From 4b852e0049868e6f7d49b33ff2509b3001b94b62 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 15 Dec 2025 21:54:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E8=BF=9B=20ActionExecutor=20=E4=B8=8B?= =?UTF-8?q?=E7=9A=84=E2=80=98=E8=A1=8C=E5=8A=A8=E7=94=9F=E6=88=90=E4=B8=8E?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E2=80=99=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 RunnerClient 抽象类,并划分 SandboxRunnerClient、LocalRunnerClient两个子类(内容待完善)。前者负责对接 SandboxRunner 模块,后者直接使用本地作为执行环境(但不推荐)。 - 将 ActionWatchService 划为 LocalRunnerClient 的内部类,负责采用本地执行环境时,监听行动程序变化 - 完善 ActionRepairer 处的修复逻辑 - 调整 MetaAction 中路径获取逻辑 这提交方式真该调整一下了,这阶段推进容易攒太多,但又不好停手。或许阶段目标可以保留,但推进点应该可以细化🤔 --- .../work/slhaf/partner/common/Constant.java | 1 + .../partner/core/action/ActionCapability.java | 3 +- .../slhaf/partner/core/action/ActionCore.java | 11 +- .../core/action/ActionWatchService.java | 192 ----------- .../core/action/SandboxRunnerClient.java | 36 -- .../core/action/entity/GeneratedData.java | 15 + .../core/action/entity/MetaAction.java | 18 +- .../core/action/runner/LocalRunnerClient.java | 320 ++++++++++++++++++ .../core/action/runner/RunnerClient.java | 108 ++++++ .../action/runner/SandboxRunnerClient.java | 54 +++ .../dispatcher/executor/ActionExecutor.java | 7 +- .../dispatcher/executor/ActionRepairer.java | 29 +- .../executor/DynamicActionGenerator.java | 70 ++-- .../dispatcher/executor/ParamsExtractor.java | 7 +- .../executor/entity/ExtractorResult.java | 4 +- .../executor/entity/GeneratorInput.java | 4 +- .../handler/InterventionHandler.java | 22 +- Partner-Main/src/test/java/SystemTest.java | 47 +++ 18 files changed, 652 insertions(+), 296 deletions(-) delete mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java delete mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/GeneratedData.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java create mode 100644 Partner-Main/src/test/java/SystemTest.java diff --git a/Partner-Main/src/main/java/work/slhaf/partner/common/Constant.java b/Partner-Main/src/main/java/work/slhaf/partner/common/Constant.java index 0707b3d3..fd7d3985 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/common/Constant.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/common/Constant.java @@ -6,6 +6,7 @@ public final class Constant { public static final String DATA = "./data"; public static final String MEMORY_DATA = DATA + "/memory"; public static final String ACTION_PROGRAM = DATA + "/action"; + public static final String TMP_ACTION_DIR_LOCAL = DATA + "/tmp"; } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java index 7c021c06..27b55d4c 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java @@ -7,6 +7,7 @@ import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.PhaserRecord; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; +import work.slhaf.partner.core.action.runner.SandboxRunnerClient; import java.util.List; import java.util.concurrent.ExecutorService; @@ -46,5 +47,5 @@ public interface ActionCapability { boolean checkExists(String... actionKeys); - void execute(MetaAction metaAction); + SandboxRunnerClient runnerClient(); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index f1a34332..0dde618a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -16,6 +16,8 @@ import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; import work.slhaf.partner.core.action.exception.ActionDataNotFoundException; import work.slhaf.partner.core.action.exception.MetaActionNotFoundException; +import work.slhaf.partner.core.action.runner.RunnerClient; +import work.slhaf.partner.core.action.runner.SandboxRunnerClient; import java.io.IOException; import java.util.*; @@ -57,10 +59,11 @@ public class ActionCore extends PartnerCore { */ private final Map existedMetaActions = new HashMap<>(); private final List phaserRecords = new ArrayList<>(); - private final SandboxRunnerClient sandboxRunnerClient = new SandboxRunnerClient(); + private RunnerClient runnerClient; public ActionCore() throws IOException, ClassNotFoundException { - new ActionWatchService(existedMetaActions, virtualExecutor).launch(); + // TODO 通过 AgentConfigManager指定 + runnerClient = new SandboxRunnerClient(existedMetaActions, virtualExecutor); setupShutdownHook(); } @@ -248,8 +251,8 @@ public class ActionCore extends PartnerCore { } @CapabilityMethod - public void execute(MetaAction metaAction) { - sandboxRunnerClient.run(metaAction); + public RunnerClient runnerClient() { + return runnerClient; } /** diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java deleted file mode 100644 index 9b265f0e..00000000 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionWatchService.java +++ /dev/null @@ -1,192 +0,0 @@ -package work.slhaf.partner.core.action; - -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import work.slhaf.partner.core.action.entity.MetaActionInfo; -import work.slhaf.partner.core.action.exception.ActionInitFailedException; -import work.slhaf.partner.core.action.exception.ActionLoadFailedException; - -import java.io.File; -import java.io.IOException; -import java.nio.file.*; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; - -// TODO 后续需迁移至 SandboxRunner,作为容器内的监听逻辑 -@SuppressWarnings("unchecked") -@Slf4j -class ActionWatchService { - - private final HashMap registeredPaths = new HashMap<>(); - private final Map existedMetaActions; - private final ExecutorService virtualExecutor; - - public ActionWatchService(Map existedMetaActions, ExecutorService virtualExecutor) { - this.existedMetaActions = existedMetaActions; - this.virtualExecutor = virtualExecutor; - } - - public void launch() { - Path path = Path.of(ACTION_PROGRAM); - scanActions(path.toFile()); - launchActionDirectoryWatcher(path); - } - - private void launchActionDirectoryWatcher(Path path) { - WatchService watchService; - try { - watchService = FileSystems.getDefault().newWatchService(); - setupShutdownHook(watchService); - registerParentToWatch(path, watchService); - registerSubToWatch(path, watchService); - virtualExecutor.execute(registerWatchTask(path, watchService)); - } catch (IOException e) { - throw new ActionInitFailedException("行动程序目录监听器启动失败", e); - } - } - - private void setupShutdownHook(WatchService watchService) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - watchService.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - })); - } - - private Runnable registerWatchTask(Path path, WatchService watchService) { - return () -> { - log.info("行动程序目录监听器已启动"); - while (true) { - WatchKey key; - try { - key = watchService.take(); - List> events = key.pollEvents(); - for (WatchEvent e : events) { - WatchEvent event = (WatchEvent) e; - WatchEvent.Kind kind = event.kind(); - Path context = event.context(); - log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString()); - Path thisDir = (Path) key.watchable(); - //根据事件发生的目录进行分流,分为父目录事件和子程序事件 - if (thisDir.equals(path)) { - handleParentDirEvent(kind, thisDir, context, watchService); - } else { - handleSubDirEvent(kind, thisDir); - } - } - } catch (InterruptedException e) { - log.info("监听线程被中断,准备退出..."); - Thread.currentThread().interrupt(); // 恢复中断标志 - break; - } catch (ClosedWatchServiceException e) { - log.info("WatchService 已关闭,监听线程退出。"); - break; - } - } - }; - } - - private void handleSubDirEvent(WatchEvent.Kind kind, Path thisDir) { - // path为触发本次行动的文件的路径(当前位于某个action目录下) - // 先判定发生的目录前缀是否匹配(action、desc),否则忽略 - if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { - // CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。 - boolean complete = checkComplete(thisDir); - if (!complete) return; - try { - MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile()); - existedMetaActions.put(thisDir.toString(), newActionInfo); - } catch (ActionLoadFailedException e) { - log.warn("行动信息重新加载失败,触发行为: {}", kind.name()); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - // DELETE 事件将会把该 MetaActionInfo 从记录中移除 - existedMetaActions.remove(thisDir.toString()); - } - } - - private boolean checkComplete(Path thisDir) { - File[] files = thisDir.toFile().listFiles(); - if (files == null) { - log.error("当前目录无法访问: [{}]", thisDir); - return false; - } - boolean existedAction = false; - boolean existedDesc = false; - for (File file : files) { - String fileName = file.getName(); - String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.')); - if (nameWithoutExt.equals("action")) existedAction = true; - else if (nameWithoutExt.equals("desc")) existedDesc = true; - } - return existedAction && existedDesc; - } - - private void handleParentDirEvent(WatchEvent.Kind kind, Path thisDir, Path context, WatchService watchService) { - Path path = Path.of(thisDir.toString(), context.toString()); - // MODIFY 事件不进行处理 - if (kind == StandardWatchEventKinds.ENTRY_CREATE) { - try { - path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - } catch (IOException e) { - log.error("新增行动程序目录监听失败: {}", path, e); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - WatchKey remove = registeredPaths.remove(path); - remove.cancel(); - } - } - - private void registerSubToWatch(Path path, WatchService watchService) throws IOException { - Files.walkFileTree(path, new SimpleFileVisitor<>() { - @Override - public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException { - if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE; - WatchKey key = dir.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(dir, key); - return FileVisitResult.CONTINUE; - } - }); - } - - private void registerParentToWatch(Path path, WatchService watchService) throws IOException { - WatchKey key = path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(path, key); - } - - private void scanActions(File file) { - if (!file.exists() || file.isFile()) { - throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath()); - } - File[] files = file.listFiles(); - if (files == null) { - throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath()); - } - for (File f : files) { - try { - MetaActionInfo actionInfo = new MetaActionInfo(f); - existedMetaActions.put(f.getName(), actionInfo); - log.info("行动程序[{}]已加载", actionInfo.getKey()); - } catch (ActionLoadFailedException e) { - log.warn("行动程序未加载: {}", e.getMessage()); - } - } - - } -} \ No newline at end of file diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java deleted file mode 100644 index c13f7539..00000000 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java +++ /dev/null @@ -1,36 +0,0 @@ -package work.slhaf.partner.core.action; - -import work.slhaf.partner.core.action.entity.MetaAction; - -import java.nio.file.Path; - -/** - * 基于 Http 与 WebSocket 的沙盒执行器客户端,负责: - *
    - *
  • - * 发送行动单元数据 - *
  • - *
  • - * 实时更新获取已存在行动列表 - *
  • - *
  • - * 向传入的 MetaAction 回写执行结果 - *
  • - *
- */ -class SandboxRunnerClient { - - public SandboxRunnerClient() { - // 连接沙盒执行器(websocket) - } - - public void run(MetaAction metaAction) { - // 获取已存在行动列表 - Path path = metaAction.checkAndGetPath(); - if (!metaAction.getResult().isSuccess()) { - return; - } - // 调用沙盒执行器 - } - -} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/GeneratedData.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/GeneratedData.java new file mode 100644 index 00000000..512e33c8 --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/GeneratedData.java @@ -0,0 +1,15 @@ +package work.slhaf.partner.core.action.entity; + +import com.alibaba.fastjson2.JSONObject; +import lombok.Data; + +import java.util.List; + +@Data +public class GeneratedData { + private List dependencies; + private String code; + private String codeType; + private boolean serialize; + private JSONObject responseSchema; +} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java index c1935052..14bbe1c0 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/MetaAction.java @@ -1,14 +1,12 @@ package work.slhaf.partner.core.action.entity; -import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; +import lombok.Data; +import org.jetbrains.annotations.NotNull; -import java.io.File; import java.nio.file.Path; import java.util.Map; -import org.jetbrains.annotations.NotNull; - -import lombok.Data; +import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; /** * 行动链中的单一元素,封装了调用外部行动程序的必要信息与结果容器,可被{@link work.slhaf.partner.core.action.ActionCapability}执行 @@ -27,7 +25,7 @@ public class MetaAction implements Comparable { /** * 行动结果,包括执行状态和相应内容(执行结果或者错误信息) */ - private final Result result = new Result(); + private Result result = new Result(); /** * 执行顺序,升序排列 */ @@ -44,18 +42,12 @@ public class MetaAction implements Comparable { private Path path; - public Path checkAndGetPath() { + public void resetPath() { path = switch (type) { case PLUGIN -> Path.of(ACTION_PROGRAM, key, "action.jar"); case SCRIPT -> Path.of(ACTION_PROGRAM, key, "action.py"); case MCP -> Path.of(ACTION_PROGRAM, key, "action.json"); }; - File action = path.toFile(); - if (!action.exists()) { - result.setStatus(ResultStatus.FAILED); - result.setData("Action file not found: " + action.getAbsolutePath()); - } - return path; } @Override diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java new file mode 100644 index 00000000..46d11aed --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -0,0 +1,320 @@ +package work.slhaf.partner.core.action.runner; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.entity.MetaActionType; +import work.slhaf.partner.core.action.exception.ActionInitFailedException; +import work.slhaf.partner.core.action.exception.ActionLoadFailedException; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; +import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL; + +@Slf4j +public class LocalRunnerClient extends RunnerClient { + + public LocalRunnerClient(Map existedMetaActions, ExecutorService executor) { + super(existedMetaActions, executor); + ActionWatchService watchService = new ActionWatchService(); + watchService.launch(); + } + + @Override + protected RunnerResponse doRun(MetaAction metaAction) { + RunnerResponse response; + try { + // 由于三种方式返回的内容结构变化太大,所以选择油具体执行逻辑返回真正的 Response 对象 + response = switch (metaAction.getType()) { + case MetaActionType.SCRIPT -> doRunWithScript(metaAction); + case MetaActionType.MCP -> doRunWithMcp(metaAction); + case MetaActionType.PLUGIN -> doRunWithPlugin(metaAction); + }; + } catch (Exception e) { + response = new RunnerResponse(); + response.setOk(false); + response.setData(e.getLocalizedMessage()); + } + return response; + } + + private RunnerResponse doRunWithMcp(MetaAction metaAction) { + RunnerResponse response = new RunnerResponse(); + + return response; + } + + private RunnerResponse doRunWithPlugin(MetaAction metaAction) { + RunnerResponse response = new RunnerResponse(); + + return response; + } + + private RunnerResponse doRunWithScript(MetaAction metaAction) { + RunnerResponse response = new RunnerResponse(); + + return response; + } + + @Override + protected Path doBuildTempPath(MetaAction tempAction, String codeType) { + return Path.of(TMP_ACTION_DIR_LOCAL, System.currentTimeMillis() + "-" + tempAction.getKey() + codeType); + } + + @Override + protected void doSerialize(MetaAction tempAction, String code, String codeType) throws IOException { + Path path = tempAction.getPath(); + File file = path.toFile(); + file.createNewFile(); + Files.writeString(path, code); + } + + @Override + public JSONObject listSysDependencies() { + // 先只列出系统/环境的 Python 依赖 + // TODO 在 AgentConfigManager 内配置启用的脚本语言及对应的扩展名 + // 这里的逻辑后续需要替换为“根据 AgentConfigManager 读取到的脚本语言启用情况,遍历并列出当前系统环境依赖” + // 还需要将返回值调整为相应的数据类 + // 后续还需要将不同语言的处理逻辑分散到不同方法内,这里为了验证,先写死在当前方法 + JSONObject sysDependencies = new JSONObject(); + sysDependencies.put("language", "Python"); + JSONArray dependencies = sysDependencies.putArray("dependencies"); + SystemExecResult pyResult = exec("pip", "li", "--format=feeze"); + System.out.println(pyResult); + if (pyResult.isOk()) { + List resultList = pyResult.getResultList(); + for (String result : resultList) { + JSONObject element = dependencies.addObject(); + String[] split = result.split("=="); + element.put("name", split[0]); + element.put("version", split[1]); + } + } else { + JSONObject element = dependencies.addObject(); + element.put("error", pyResult.getTotal()); + } + return sysDependencies; + } + + private SystemExecResult exec(String... command) { + SystemExecResult result = new SystemExecResult(); + + List resultList = new ArrayList<>(); + result.setResultList(resultList); + StringBuilder s = new StringBuilder(); + + try { + Process process = Runtime.getRuntime().exec(command); + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + String line; + while ((line = reader.readLine()) != null) { + s.append(line); + resultList.add(line); + } + int exitCode = process.waitFor(); + result.setOk(exitCode == 0); + result.setTotal(s.toString().isEmpty() ? "响应为空" : s.toString()); + } catch (Exception e) { + result.setOk(false); + result.setTotal(e.getLocalizedMessage()); + } + if (result.getTotal().isEmpty()) { + result.setOk(false); + } + return result; + } + + @Data + private static class SystemExecResult { + private boolean ok; + private String total; + private List resultList; + } + + private class ActionWatchService { + + private final HashMap registeredPaths = new HashMap<>(); + + private void launch() { + Path path = Path.of(ACTION_PROGRAM); + scanActions(path.toFile()); + launchActionDirectoryWatcher(path); + } + + private void launchActionDirectoryWatcher(Path path) { + WatchService watchService; + try { + watchService = FileSystems.getDefault().newWatchService(); + setupShutdownHook(watchService); + registerParentToWatch(path, watchService); + registerSubToWatch(path, watchService); + executor.execute(registerWatchTask(path, watchService)); + } catch (IOException e) { + throw new ActionInitFailedException("行动程序目录监听器启动失败", e); + } + } + + private void setupShutdownHook(WatchService watchService) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + watchService.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + private Runnable registerWatchTask(Path path, WatchService watchService) { + return () -> { + log.info("行动程序目录监听器已启动"); + while (true) { + WatchKey key; + try { + key = watchService.take(); + List> events = key.pollEvents(); + for (WatchEvent e : events) { + @SuppressWarnings("unchecked") + WatchEvent event = (WatchEvent) e; + WatchEvent.Kind kind = event.kind(); + Path context = event.context(); + log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString()); + Path thisDir = (Path) key.watchable(); + // 根据事件发生的目录进行分流,分为父目录事件和子程序事件 + if (thisDir.equals(path)) { + handleParentDirEvent(kind, thisDir, context, watchService); + } else { + handleSubDirEvent(kind, thisDir); + } + } + } catch (InterruptedException e) { + log.info("监听线程被中断,准备退出..."); + Thread.currentThread().interrupt(); // 恢复中断标志 + break; + } catch (ClosedWatchServiceException e) { + log.info("WatchService 已关闭,监听线程退出。"); + break; + } + } + }; + } + + private void handleSubDirEvent(WatchEvent.Kind kind, Path thisDir) { + // path为触发本次行动的文件的路径(当前位于某个action目录下) + // 先判定发生的目录前缀是否匹配(action、desc),否则忽略 + if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { + // CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。 + boolean complete = checkComplete(thisDir); + if (!complete) + return; + try { + MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile()); + existedMetaActions.put(thisDir.toString(), newActionInfo); + } catch (ActionLoadFailedException e) { + log.warn("行动信息重新加载失败,触发行为: {}", kind.name()); + } + } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { + // DELETE 事件将会把该 MetaActionInfo 从记录中移除 + existedMetaActions.remove(thisDir.toString()); + } + } + + private boolean checkComplete(Path thisDir) { + File[] files = thisDir.toFile().listFiles(); + if (files == null) { + log.error("当前目录无法访问: [{}]", thisDir); + return false; + } + boolean existedAction = false; + boolean existedDesc = false; + for (File file : files) { + String fileName = file.getName(); + String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.')); + if (nameWithoutExt.equals("action")) + existedAction = true; + else if (nameWithoutExt.equals("desc")) + existedDesc = true; + } + return existedAction && existedDesc; + } + + private void handleParentDirEvent(WatchEvent.Kind kind, Path thisDir, Path context, + WatchService watchService) { + Path path = Path.of(thisDir.toString(), context.toString()); + // MODIFY 事件不进行处理 + if (kind == StandardWatchEventKinds.ENTRY_CREATE) { + try { + path.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + } catch (IOException e) { + log.error("新增行动程序目录监听失败: {}", path, e); + } + } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { + WatchKey remove = registeredPaths.remove(path); + remove.cancel(); + } + } + + private void registerSubToWatch(Path path, WatchService watchService) throws IOException { + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) + throws IOException { + if (dir.getFileName().startsWith(".")) + return FileVisitResult.CONTINUE; + WatchKey key = dir.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + registeredPaths.put(dir, key); + return FileVisitResult.CONTINUE; + } + }); + } + + private void registerParentToWatch(Path path, WatchService watchService) throws IOException { + WatchKey key = path.register(watchService, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + registeredPaths.put(path, key); + } + + private void scanActions(File file) { + if (!file.exists() || file.isFile()) { + throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath()); + } + File[] files = file.listFiles(); + if (files == null) { + throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath()); + } + for (File f : files) { + try { + MetaActionInfo actionInfo = new MetaActionInfo(f); + existedMetaActions.put(f.getName(), actionInfo); + log.info("行动程序[{}]已加载", actionInfo.getKey()); + } catch (ActionLoadFailedException e) { + log.warn("行动程序加载失败", e); + } + } + + } + + } +} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java new file mode 100644 index 00000000..92c5799f --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java @@ -0,0 +1,108 @@ +package work.slhaf.partner.core.action.runner; + +import com.alibaba.fastjson2.JSONObject; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import work.slhaf.partner.core.action.entity.GeneratedData; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaAction.Result; +import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +@Slf4j +public abstract class RunnerClient { + + protected final Map existedMetaActions; + protected final ExecutorService executor; + + /** + * ActionCore 将注入虚拟线程池 + */ + public RunnerClient(Map existedMetaActions, ExecutorService executor) { + this.existedMetaActions = existedMetaActions; + this.executor = executor; + } + + /** + * 执行行动程序 + */ + public void run(MetaAction metaAction) { + // 获取已存在行动列表 + Result result = metaAction.getResult(); + if (!result.getStatus().equals(ResultStatus.WAITING)) { + return; + } + RunnerResponse response = doRun(metaAction); + result.setData(response.getData()); + result.setStatus(response.isOk() ? ResultStatus.SUCCESS : ResultStatus.FAILED); + } + + //TODO 将执行划分为 MCP、OriginalScript两种类型,SCRIPT、PLUGIN、MCP的分类不再必要 + protected abstract RunnerResponse doRun(MetaAction metaAction); + + /** + * 将临时行动程序放入等待队列,根据其是否需要持久序列化,监听其执行状态,执行成功则持久序列化 + * + * @throws IOException + */ + public Path getPathAndSerialize(MetaAction tempAction, GeneratedData generatedData) throws IOException { + String code = generatedData.getCode(); + String codeType = generatedData.getCodeType(); + + Path path = doBuildTempPath(tempAction, codeType); + tempAction.setPath(path); + doSerialize(tempAction, code, codeType); + if (generatedData.isSerialize()) { + waitingSerialize(tempAction, code, codeType, generatedData.getResponseSchema()); + } + return path; + } + + private void waitingSerialize(MetaAction tempAction, String code, String codeType, JSONObject jsonObject) { + executor.execute(() -> { + Result result = tempAction.getResult(); + while (true) { + switch (result.getStatus()) { + case ResultStatus.WAITING -> { + try { + Thread.sleep(300); + } catch (InterruptedException ignored) { + } + } + case ResultStatus.FAILED -> { + break; + } + case ResultStatus.SUCCESS -> { + tempAction.resetPath(); + try { + doSerialize(tempAction, code, codeType); + } catch (IOException e) { + log.error("行动程序序列化出错: {}", tempAction.getKey(), e); + } + } + } + } + }); + + } + + protected abstract Path doBuildTempPath(MetaAction tempAction, String codeType); + + protected abstract void doSerialize(MetaAction tempAction, String code, String codeType) throws IOException; + + /** + * 列出执行环境下的系统依赖情况 + */ + public abstract JSONObject listSysDependencies(); + + @Data + protected static class RunnerResponse { + private boolean ok; + private String data; + } +} \ No newline at end of file diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java new file mode 100644 index 00000000..4013a696 --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java @@ -0,0 +1,54 @@ +package work.slhaf.partner.core.action.runner; + +import com.alibaba.fastjson2.JSONObject; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * 基于 Http 与 WebSocket 的沙盒执行器客户端,负责: + *
    + *
  • + * 发送行动单元数据 + *
  • + *
  • + * 实时更新获取已存在行动列表 + *
  • + *
  • + * 向传入的 MetaAction 回写执行结果 + *
  • + *
+ */ +public class SandboxRunnerClient extends RunnerClient { + + @Override + protected Path doBuildTempPath(MetaAction tempAction, String codeType) { + // TODO Auto-generated method stub + return null; + } + + @Override + protected void doSerialize(MetaAction tempAction, String code, String codeType) { + // TODO Auto-generated method stub + + } + + public SandboxRunnerClient(Map existedMetaActions, ExecutorService executor) { // 连接沙盒执行器(websocket) + super(existedMetaActions, executor); + } + + public RunnerResponse doRun(MetaAction metaAction) { + // 调用沙盒执行器 + return null; + } + + @Override + public JSONObject listSysDependencies() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java index 5d3138bd..bd01e0f4 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java @@ -11,6 +11,7 @@ import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.action.entity.*; import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus; +import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*; @@ -43,6 +44,7 @@ public class ActionExecutor extends AgentRunningSubModule { try { - actionCapability.execute(action); + runnerClient.run(action); result.getFixedData().add(action.getResult().getData()); } catch (Exception e) { log.error("行动单元执行失败: {}", key, e); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/DynamicActionGenerator.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/DynamicActionGenerator.java index f31038de..d9d6e854 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/DynamicActionGenerator.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/DynamicActionGenerator.java @@ -1,19 +1,22 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor; -import java.util.List; - import com.alibaba.fastjson2.JSONObject; - -import lombok.Data; +import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; +import work.slhaf.partner.api.agent.factory.module.annotation.Init; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.chat.pojo.ChatResponse; -import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput; -import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult; import work.slhaf.partner.common.util.ExtractUtil; +import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.entity.GeneratedData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionType; +import work.slhaf.partner.core.action.runner.RunnerClient; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult; + +import java.nio.file.Path; /** * 负责依据输入内容生成可执行的动态行动单元,并选择是否持久化至 SandboxRunner 容器内 @@ -22,27 +25,38 @@ import work.slhaf.partner.core.action.entity.MetaActionType; public class DynamicActionGenerator extends AgentRunningSubModule implements ActivateModel { + @InjectCapability + private ActionCapability actionCapability; + + private RunnerClient runnerClient; + + @Init + void init() { + runnerClient = actionCapability.runnerClient(); + } + @Override public GeneratorResult execute(GeneratorInput input) { GeneratorResult result = new GeneratorResult(); - // 由于 SCRIPT 类型程序都是在 SandboxRunner 内部的磁盘上加载然后执行的, - // 所以此处的输入内容也只需要指定输入参数、临时key、是否持久化即可,路径将按照指定规则统一构建,不可交给LLM生成 - String prompt = buildPrompt(input); - // 响应结果需要包含几个特殊数据: 依赖项、代码内容、是否序列化、响应数据释义 - ChatResponse response = this.singleChat(prompt); - GeneratorResponseData generatorData = JSONObject - .parseObject(ExtractUtil.extractJson(response.getMessage()), GeneratorResponseData.class); - MetaAction tempAction = buildAction(input); - waitingSerialize(tempAction, generatorData); - result.setTempAction(tempAction); - return null; - } - - /** - * 将临时行动单元序列化至临时文件夹,并设置程序路径、放置在队列中,等待执行状态变化,并根据序列化选项选择是否补充 MetaActionInfo 并持久序列化 - */ - private void waitingSerialize(MetaAction tempAction, GeneratorResponseData generatorData) { - + try { + // 由于 SCRIPT 类型程序都是在 SandboxRunner 内部的磁盘上加载然后执行的, + // 所以此处的输入内容也只需要指定输入参数、临时key、是否持久化即可,路径将按照指定规则统一构建,不可交给LLM生成 + String prompt = buildPrompt(input); + // 响应结果需要包含几个特殊数据: 依赖项、代码内容、是否序列化、响应数据释义 + ChatResponse response = this.singleChat(prompt); + GeneratedData generatorData = JSONObject + .parseObject(ExtractUtil.extractJson(response.getMessage()), GeneratedData.class); + MetaAction tempAction = buildAction(input); + // 将临时行动单元序列化至临时文件夹,并设置程序路径、放置在队列中,等待执行状态变化,并根据序列化选项选择是否补充 MetaActionInfo 并持久序列化 + // 通过 ActionCapability 暴露的接口,序列化至临时文件夹,同时返回Path对象并设置。队列建议交给 SandboxRunner + // 持有,包括监听与序列化线程 + Path path = runnerClient.getPathAndSerialize(tempAction, generatorData); + tempAction.setPath(path); + result.setTempAction(tempAction); + } catch (Exception e) { + result.setTempAction(null); + } + return result; } private MetaAction buildAction(GeneratorInput input) { @@ -72,12 +86,4 @@ public class DynamicActionGenerator extends AgentRunningSubModule dependencies; - private String code; - private boolean serialize; - private JSONObject responseSchema; - } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java index ff527d21..25a8d5cc 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java @@ -1,11 +1,7 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor; -import java.util.HashMap; -import java.util.List; - import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; - import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; @@ -16,6 +12,9 @@ import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.Extra import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction; +import java.util.HashMap; +import java.util.List; + /** * 负责依据输入内容进行行动单元的参数信息提取 */ diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorResult.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorResult.java index 9ddf9060..f36bf561 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorResult.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorResult.java @@ -1,9 +1,9 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; -import java.util.Map; - import lombok.Data; +import java.util.Map; + @Data public class ExtractorResult { private boolean ok; diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorInput.java index 0d84f40f..15cc1cb7 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorInput.java @@ -1,9 +1,9 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; -import java.util.Map; - import lombok.Data; +import java.util.Map; + @Data public class GeneratorInput { private String key; diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java index 01eba94e..e30897a1 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java @@ -3,6 +3,7 @@ package work.slhaf.partner.module.modules.action.interventor.handler; import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; +import work.slhaf.partner.api.agent.factory.module.annotation.Init; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore.ExecutorType; @@ -10,6 +11,7 @@ import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.PhaserRecord; +import work.slhaf.partner.core.action.runner.SandboxRunnerClient; import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType; import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput; @@ -30,6 +32,13 @@ public class InterventionHandler extends AgentRunningSubModule actionCapability.loadMetaAction(actionKey)) .toList(); - //TODO 需要将干预逻辑下放至 ActionCapability ,因为 ActionExecutor 中也存在干预操作 + // TODO 需要将干预逻辑下放至 ActionCapability ,因为 ActionExecutor 中也存在干预操作 switch (intervention.getType()) { case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); @@ -118,7 +127,8 @@ public class InterventionHandler extends AgentRunningSubModule actions) { - if (order <= actionData.getExecutingStage()) return; + if (order <= actionData.getExecutingStage()) + return; actionData.getActionChain().put(order, actions); } @@ -127,7 +137,8 @@ public class InterventionHandler extends AgentRunningSubModule actions, Phaser phaser) { - if (order < actionData.getExecutingStage()) return; + if (order < actionData.getExecutingStage()) + return; phaser.register(); try { @@ -144,7 +155,7 @@ public class InterventionHandler extends AgentRunningSubModule { try { - actionCapability.execute(action); + runnerClient.run(action); } finally { phaser.arriveAndDeregister(); } @@ -158,7 +169,8 @@ public class InterventionHandler extends AgentRunningSubModule actions) { - if (order <= actionData.getExecutingStage()) return; + if (order <= actionData.getExecutingStage()) + return; Map> actionChain = actionData.getActionChain(); if (actionChain.containsKey(order)) { diff --git a/Partner-Main/src/test/java/SystemTest.java b/Partner-Main/src/test/java/SystemTest.java new file mode 100644 index 00000000..b618dae8 --- /dev/null +++ b/Partner-Main/src/test/java/SystemTest.java @@ -0,0 +1,47 @@ +import com.alibaba.fastjson2.JSONObject; +import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.runner.LocalRunnerClient; +import work.slhaf.partner.core.action.runner.RunnerClient; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class SystemTest { + @Test + void execTest() { + // exec("pwd"); + // exec("ls", "-la"); + String r = exec("pip", "st", "--format=freeze"); + System.out.println(r); + } + + private String exec(String... command) { + StringBuilder s = new StringBuilder(); + ProcessBuilder processBuilder = new ProcessBuilder(command); + try { + Process process = processBuilder.start(); + java.io.InputStream inputStream = process.getInputStream(); + java.util.Scanner scanner = new java.util.Scanner(inputStream).useDelimiter("\\A"); + if (scanner.hasNext()) { + s.append(scanner.next()); + } + } catch (IOException e) { + e.printStackTrace(); + } + return s.toString(); + } + + @Test + void localRunnerClientTest() { + Map existedMetaActions = new HashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + RunnerClient client = new LocalRunnerClient(existedMetaActions, executor); + JSONObject res = client.listSysDependencies(); + System.out.println(res.toString()); + } + +}