diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java index 79003548..9a3e8627 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java @@ -5,13 +5,13 @@ import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; @Capability(value = "action") public interface ActionCapability { void putPreparedAction(String uuid, ActionData actionData); - List popPreparedAction(String userId); - List popPendingAction(String userId); List listPreparedAction(String userId); @@ -23,4 +23,8 @@ public interface ActionCapability { List selectTendencyCache(String input); void updateTendencyCache(CacheAdjustData data); + + ExecutorService getExecutor(ActionCore.ExecutorType type); + + Set getExistedMetaActions(); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 9497147b..9e01dd3a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -1,7 +1,6 @@ package work.slhaf.partner.core.action; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; import work.slhaf.partner.common.vector.VectorClient; @@ -11,25 +10,15 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.cache.ActionCacheData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; -import work.slhaf.partner.core.action.exception.ActionInitFailedException; -import work.slhaf.partner.core.action.exception.ActionLoadFailedException; -import java.io.File; import java.io.IOException; -import java.nio.file.*; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; - @SuppressWarnings("FieldMayBeFinal") @CapabilityCore(value = "action") @Slf4j @@ -38,7 +27,7 @@ public class ActionCore extends PartnerCore { /** * 对应本次交互即将执行或将要放置在行动池的预备任务,因此将以本次交互的uuid为键,其起到的作用相当于暂时的模块上下文 */ - private HashMap> preparedActions = new HashMap<>(); + private HashMap> actionPool = new HashMap<>(); /** * 待确认任务,以userId区分不同用户,因为需要跨请求确认 @@ -58,7 +47,7 @@ public class ActionCore extends PartnerCore { private final LinkedHashMap existedMetaActions = new LinkedHashMap<>(); public ActionCore() throws IOException, ClassNotFoundException { - new ActionWatchService().launch(); + new ActionWatchService(existedMetaActions, virtualExecutor).launch(); } @CapabilityMethod @@ -79,23 +68,19 @@ public class ActionCore extends PartnerCore { @CapabilityMethod public synchronized void putPreparedAction(String uuid, ActionData actionData) { - preparedActions.computeIfAbsent(uuid, k -> { + actionPool.computeIfAbsent(uuid, k -> { List temp = new ArrayList<>(); temp.add(actionData); return temp; }); } - @CapabilityMethod - public synchronized List popPreparedAction(String userId) { - List infos = preparedActions.get(userId); - preparedActions.remove(userId); - return infos; - } - @CapabilityMethod public List listPreparedAction(String userId) { - return preparedActions.get(userId); + List actions = actionPool.get(userId); + return actions.stream() + .filter(actionData -> actionData.getStatus().equals(ActionData.ActionStatus.PREPARE)) + .toList(); } @CapabilityMethod @@ -155,6 +140,19 @@ public class ActionCore extends PartnerCore { platformExecutor.execute(() -> adjustNotMatchPassed(notMatchPassed, inputVector, input, vectorClient)); } + @CapabilityMethod + public ExecutorService getExecutor(ExecutorType type) { + return switch (type) { + case VIRTUAL -> virtualExecutor; + case PLATFORM -> platformExecutor; + }; + } + + @CapabilityMethod + public Set getExistedMetaActions() { + return existedMetaActions.keySet(); + } + /** * 命中缓存且评估通过时 * @@ -249,168 +247,7 @@ public class ActionCore extends PartnerCore { return "action-core"; } - @SuppressWarnings("unchecked") - private class ActionWatchService { - - private HashMap registeredPaths = new HashMap<>(); - - public void launch() { - Path path = Path.of(ACTION_PROGRAM); - scanActions(path.toFile()); - launchActionDirectoryWatcher(path); - } - - private void launchActionDirectoryWatcher(Path path) { - WatchService watchService; - try { - watchService = FileSystems.getDefault().newWatchService(); - setupShutdownHook(watchService); - registerParentToWatch(path, watchService); - registerSubToWatch(path, watchService); - virtualExecutor.execute(registerWatchTask(path, watchService)); - } catch (IOException e) { - throw new ActionInitFailedException("行动程序目录监听器启动失败", e); - } - } - - private void setupShutdownHook(WatchService watchService) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - watchService.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - })); - } - - private Runnable registerWatchTask(Path path, WatchService watchService) { - return () -> { - log.info("[{}] 行动程序目录监听器已启动", getCoreKey()); - while (true) { - WatchKey key; - try { - key = watchService.take(); - List> events = key.pollEvents(); - for (WatchEvent e : events) { - WatchEvent event = (WatchEvent) e; - WatchEvent.Kind kind = event.kind(); - Path context = event.context(); - log.info("[{}] 行动程序目录变更事件: {} - {}", getCoreKey(), kind.name(), context.toString()); - Path thisDir = (Path) key.watchable(); - //根据事件发生的目录进行分流,分为父目录事件和子程序事件 - if (thisDir.equals(path)) { - handleParentDirEvent(kind, thisDir, context, watchService); - } else { - handleSubDirEvent(kind, thisDir); - } - } - } catch (InterruptedException e) { - log.info("监听线程被中断,准备退出..."); - Thread.currentThread().interrupt(); // 恢复中断标志 - break; - } catch (ClosedWatchServiceException e) { - log.info("WatchService 已关闭,监听线程退出。"); - break; - } - } - }; - } - - private void handleSubDirEvent(WatchEvent.Kind kind, Path thisDir) { - // path为触发本次行动的文件的路径(当前位于某个action目录下) - // 先判定发生的目录前缀是否匹配(action、desc),否则忽略 - if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { - // CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。 - boolean complete = checkComplete(thisDir); - if (!complete) return; - try { - MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile()); - existedMetaActions.put(thisDir.toString(), newActionInfo); - } catch (ActionLoadFailedException e) { - log.warn("[{}] 行动信息重新加载失败,触发行为: {}", getCoreKey(), kind.name()); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - // DELETE 事件将会把该 MetaActionInfo 从记录中移除 - existedMetaActions.remove(thisDir.toString()); - } - } - - private boolean checkComplete(Path thisDir) { - File[] files = thisDir.toFile().listFiles(); - if (files == null) { - log.error("[{}]当前目录无法访问: [{}]", getCoreKey(), thisDir); - return false; - } - boolean existedAction = false; - boolean existedDesc = false; - for (File file : files) { - String fileName = file.getName(); - String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.')); - if (nameWithoutExt.equals("action")) existedAction = true; - else if (nameWithoutExt.equals("desc")) existedDesc = true; - } - return existedAction && existedDesc; - } - - private void handleParentDirEvent(WatchEvent.Kind kind, Path thisDir, Path context, WatchService watchService) { - Path path = Path.of(thisDir.toString(), context.toString()); - // MODIFY 事件不进行处理 - if (kind == StandardWatchEventKinds.ENTRY_CREATE) { - try { - path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - } catch (IOException e) { - log.error("[{}] 新增行动程序目录监听失败: {}", getCoreKey(), path, e); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - WatchKey remove = registeredPaths.remove(path); - remove.cancel(); - } - } - - private void registerSubToWatch(Path path, WatchService watchService) throws IOException { - Files.walkFileTree(path, new SimpleFileVisitor<>() { - @Override - public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException { - if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE; - WatchKey key = dir.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(dir, key); - return FileVisitResult.CONTINUE; - } - }); - } - - private void registerParentToWatch(Path path, WatchService watchService) throws IOException { - WatchKey key = path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(path, key); - } - - private void scanActions(File file) { - if (!file.exists() || file.isFile()) { - throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath()); - } - File[] files = file.listFiles(); - if (files == null) { - throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath()); - } - for (File f : files) { - try { - MetaActionInfo actionInfo = new MetaActionInfo(f); - existedMetaActions.put(f.getName(), actionInfo); - log.info("[{}] 行动程序[{}]已加载", getCoreKey(), actionInfo.getKey()); - } catch (ActionLoadFailedException e) { - log.warn("[{}] 行动程序未加载: {}", getCoreKey(), e.getMessage()); - } - } - - } + public enum ExecutorType { + VIRTUAL, PLATFORM } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java new file mode 100644 index 00000000..a10cc59d --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java @@ -0,0 +1,191 @@ +package work.slhaf.partner.core.action; + +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.exception.ActionInitFailedException; +import work.slhaf.partner.core.action.exception.ActionLoadFailedException; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; + +@SuppressWarnings("unchecked") +@Slf4j +class ActionWatchService { + + private HashMap registeredPaths = new HashMap<>(); + private LinkedHashMap existedMetaActions; + private ExecutorService virtualExecutor; + + public ActionWatchService(LinkedHashMap existedMetaActions, ExecutorService virtualExecutor) { + this.existedMetaActions = existedMetaActions; + this.virtualExecutor = virtualExecutor; + } + + public void launch() { + Path path = Path.of(ACTION_PROGRAM); + scanActions(path.toFile()); + launchActionDirectoryWatcher(path); + } + + private void launchActionDirectoryWatcher(Path path) { + WatchService watchService; + try { + watchService = FileSystems.getDefault().newWatchService(); + setupShutdownHook(watchService); + registerParentToWatch(path, watchService); + registerSubToWatch(path, watchService); + virtualExecutor.execute(registerWatchTask(path, watchService)); + } catch (IOException e) { + throw new ActionInitFailedException("行动程序目录监听器启动失败", e); + } + } + + private void setupShutdownHook(WatchService watchService) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + watchService.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + private Runnable registerWatchTask(Path path, WatchService watchService) { + return () -> { + log.info("行动程序目录监听器已启动"); + while (true) { + WatchKey key; + try { + key = watchService.take(); + List> events = key.pollEvents(); + for (WatchEvent e : events) { + WatchEvent event = (WatchEvent) e; + WatchEvent.Kind kind = event.kind(); + Path context = event.context(); + log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString()); + Path thisDir = (Path) key.watchable(); + //根据事件发生的目录进行分流,分为父目录事件和子程序事件 + if (thisDir.equals(path)) { + handleParentDirEvent(kind, thisDir, context, watchService); + } else { + handleSubDirEvent(kind, thisDir); + } + } + } catch (InterruptedException e) { + log.info("监听线程被中断,准备退出..."); + Thread.currentThread().interrupt(); // 恢复中断标志 + break; + } catch (ClosedWatchServiceException e) { + log.info("WatchService 已关闭,监听线程退出。"); + break; + } + } + }; + } + + private void handleSubDirEvent(WatchEvent.Kind kind, Path thisDir) { + // path为触发本次行动的文件的路径(当前位于某个action目录下) + // 先判定发生的目录前缀是否匹配(action、desc),否则忽略 + if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { + // CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。 + boolean complete = checkComplete(thisDir); + if (!complete) return; + try { + MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile()); + existedMetaActions.put(thisDir.toString(), newActionInfo); + } catch (ActionLoadFailedException e) { + log.warn("行动信息重新加载失败,触发行为: {}", kind.name()); + } + } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { + // DELETE 事件将会把该 MetaActionInfo 从记录中移除 + existedMetaActions.remove(thisDir.toString()); + } + } + + private boolean checkComplete(Path thisDir) { + File[] files = thisDir.toFile().listFiles(); + if (files == null) { + log.error("[{}]当前目录无法访问: [{}]", thisDir); + return false; + } + boolean existedAction = false; + boolean existedDesc = false; + for (File file : files) { + String fileName = file.getName(); + String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.')); + if (nameWithoutExt.equals("action")) existedAction = true; + else if (nameWithoutExt.equals("desc")) existedDesc = true; + } + return existedAction && existedDesc; + } + + private void handleParentDirEvent(WatchEvent.Kind kind, Path thisDir, Path context, WatchService watchService) { + Path path = Path.of(thisDir.toString(), context.toString()); + // MODIFY 事件不进行处理 + if (kind == StandardWatchEventKinds.ENTRY_CREATE) { + try { + path.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + } catch (IOException e) { + log.error("新增行动程序目录监听失败: {}", path, e); + } + } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { + WatchKey remove = registeredPaths.remove(path); + remove.cancel(); + } + } + + private void registerSubToWatch(Path path, WatchService watchService) throws IOException { + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException { + if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE; + WatchKey key = dir.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + registeredPaths.put(dir, key); + return FileVisitResult.CONTINUE; + } + }); + } + + private void registerParentToWatch(Path path, WatchService watchService) throws IOException { + WatchKey key = path.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + registeredPaths.put(path, key); + } + + private void scanActions(File file) { + if (!file.exists() || file.isFile()) { + throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath()); + } + File[] files = file.listFiles(); + if (files == null) { + throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath()); + } + for (File f : files) { + try { + MetaActionInfo actionInfo = new MetaActionInfo(f); + existedMetaActions.put(f.getName(), actionInfo); + log.info("行动程序[{}]已加载", actionInfo.getKey()); + } catch (ActionLoadFailedException e) { + log.warn("行动程序未加载: {}", e.getMessage()); + } + } + + } +} \ No newline at end of file diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java index 18ddcddc..3bf7aea2 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java @@ -13,7 +13,7 @@ public abstract class ActionData { protected String tendency; protected ActionStatus status; protected List actionChain; - protected String Result; + protected String result; protected String reason; protected String description; diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java index 349bebb8..73587070 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java @@ -6,7 +6,6 @@ import work.slhaf.partner.common.Constant; import java.io.File; import java.nio.file.Path; -import java.util.concurrent.Callable; import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; @@ -14,7 +13,7 @@ import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; * 行动链中的单一元素,实现{@link Runnable}接口,封装了调用外部行动程序的必要信息,可被执行 */ @Data -public class MetaAction implements Comparable, Callable { +public class MetaAction implements Comparable, Runnable { /** * 行动key,用于标识与定位行动程序 @@ -47,13 +46,12 @@ public class MetaAction implements Comparable, Callable { } @Override - public Void call() { + public void run() { File action = loadFromFile(); if (!action.exists()) { result = new Result(); result.setSuccess(false); result.setData("Action file not found: " + action.getAbsolutePath()); - return null; } try { switch (type) { @@ -66,7 +64,6 @@ public class MetaAction implements Comparable, Callable { result.setSuccess(false); result.setData(e.getMessage()); } - return null; } private File loadFromFile() { diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCapability.java index 81c4b290..4f9ac125 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCapability.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCapability.java @@ -13,7 +13,6 @@ import java.util.concurrent.locks.Lock; public interface CognationCapability { List getChatMessages(); - void setChatMessages(List chatMessages); void cleanMessage(List messages); Lock getMessageLock(); void addMetaMessage(String userId, MetaMessage metaMessage); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java index 1157c1a8..675a7e6a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java @@ -63,11 +63,6 @@ public class CognationCore extends PartnerCore { return currentMemoryId; } - @CapabilityMethod - public void setChatMessages(List chatMessages) { - this.chatMessages = chatMessages; - } - @CapabilityMethod public void cleanMessage(List messages) { messageLock.lock(); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java index d07b95f3..307afe38 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java @@ -42,7 +42,7 @@ public class ActionDispatcher extends PostRunningModule { //对于将触发的PLANNING action,理想做法是将执行工具做成执行链的形式,模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力,避免绑定固定流程 executor.execute(() -> { String userId = context.getUserId(); - List preparedActions = actionCapability.popPreparedAction(userId); + List preparedActions = actionCapability.listPreparedAction(userId); //分类成PLANNING和IMMEDIATE两类 List scheduledActions = new ArrayList<>(); List immediateActions = new ArrayList<>(); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java index 8f97e60e..a870ae0c 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java @@ -1,22 +1,120 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor; +import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; +import work.slhaf.partner.api.agent.factory.module.annotation.Init; +import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore; +import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.ImmediateActionData; +import work.slhaf.partner.core.action.entity.MetaAction; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +@Slf4j @AgentSubModule -public class ActionExecutor extends AgentRunningSubModule, Void> { +public class ActionExecutor extends AgentRunningSubModule, Void> implements ActivateModel { @InjectCapability private ActionCapability actionCapability; + private ExecutorService virtualExecutor; + private ExecutorService platformExecutor; + + @Init + public void init() { + virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL); + platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM); + } + @Override public Void execute(List immediateActions) { - + for (ImmediateActionData actionData : immediateActions) { + handleActionData(actionData); + } return null; } + + private void handleActionData(ImmediateActionData actionData) { + virtualExecutor.execute(() -> { + actionData.setStatus(ActionData.ActionStatus.EXECUTING); + List actionChain = actionData.getActionChain(); + actionChain.sort(MetaAction::compareTo); + List virtual = new ArrayList<>(); + List platform = new ArrayList<>(); + int order; + for (int index = 0; index < actionChain.size(); index++) { + MetaAction metaAction = actionChain.get(index); + // 根据io类型放入合适的列表 + if (metaAction.isIo()) { + virtual.add(metaAction); + } else { + platform.add(metaAction); + } + // 记录当前order + order = metaAction.getOrder(); + // 如果下一个行动单元的order与当前不同,则执行并清空当前组内容 + if (actionChain.size() <= (index + 1) || actionChain.get(index + 1).getOrder() != order) { + runGroupAction(virtual, platform, actionChain); + } + } + }); + } + + //TODO 考虑是否使用phaser来承担同组的动态任务新增 + private void runGroupAction(List virtual, List platform, List actionChain) { + boolean first = true; + do { + CountDownLatch latch = new CountDownLatch(virtual.size() + platform.size()); + runGroupAction(virtual, virtualExecutor, actionChain, latch, first); + runGroupAction(platform, platformExecutor, actionChain, latch, first); + try { + latch.await(); + } catch (InterruptedException e) { + log.error("[{}] CountDownLatch被中断", modelKey()); + } + first = false; + } while (!virtual.isEmpty() || !platform.isEmpty()); + } + + private void runGroupAction(List actions, ExecutorService executor, List actionChain, CountDownLatch latch, boolean first) { + if (!first && !new HashSet<>(actionChain).containsAll(actions)) { + // 该部分对应LLM新增本组执行单元时,将其添加至actionChain记录。对于后续组级别的新增,将直接在上一级调用处体现,除了注意并发安全外无需额外处理 + int index = actionChain.indexOf(actions.getLast()); + actionChain.addAll(index, actions); + } + for (MetaAction action : actions) { + executor.execute(() -> { + boolean success = true; + do { + // 该循环对应LLM的调整参数后重试 + if (!success) { + //TODO LLM决策是重构参数、执行自对话反思、还是选择向用户求助(通过cognationCore暴露方法,可能需要修改其他模块以进行适应) + + } + action.run(); + success = action.getResult().isSuccess(); + } while (!success); + latch.countDown(); + //TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法) + }); + } + } + + @Override + public String modelKey() { + return "action_executor"; + } + + @Override + public boolean withBasicPrompt() { + return false; + } }