From 64b907707a4aa295382295c57e19db6893a69ef1 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 31 Dec 2025 23:11:15 +0800 Subject: [PATCH] refactor(LocalRunnerClient): introduce WatchContext and decouple build/processor state --- .../core/action/runner/LocalRunnerClient.java | 107 +++++++++--------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 0f65160e..20caec3b 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import java.util.stream.Stream; import static work.slhaf.partner.common.util.PathUtil.buildPathStr; @@ -280,10 +281,6 @@ public class LocalRunnerClient extends RunnerClient { })); } - private LocalWatchServiceBuild registerWatchService(Path path) { - return new LocalWatchServiceBuild.BuildRegistry(path, watchService, executor); - } - private interface LocalWatchServiceBuild { LocalWatchServiceBuild registerCreate(EventHandler handler); @@ -295,28 +292,25 @@ public class LocalRunnerClient extends RunnerClient { LocalWatchServiceBuild initialLoad(InitLoader loader); - void commit(); + LocalWatchServiceBuild watchAll(boolean watchAll); + + void commit(ExecutorService executor); interface EventHandler { void handle(Path thisDir, Path context); } interface InitLoader { - void load(Path path); + void load(); } class BuildRegistry implements LocalWatchServiceBuild { private final Map, EventHandler> handlers = new HashMap<>(); - private final Path path; - private final WatchService watchService; - private final ExecutorService executor; private InitLoader initLoader; - private BuildRegistry(Path path, WatchService watchService, ExecutorService executor) { - this.path = path; - this.watchService = watchService; - this.executor = executor; + private BuildRegistry(WatchContext ctx) { + this.ctx = ctx; } @Override @@ -357,12 +351,12 @@ public class LocalRunnerClient extends RunnerClient { private Runnable buildWatchTask() { return () -> { - String pathStr = path.toString(); + String pathStr = ctx.path.toString(); log.info("行动程序目录监听器已启动,监听目录: {}", pathStr); while (true) { WatchKey key; try { - key = watchService.take(); + key = ctx.watchService.take(); List> events = key.pollEvents(); for (WatchEvent e : events) { WatchEvent.Kind kind = e.kind(); @@ -394,12 +388,20 @@ public class LocalRunnerClient extends RunnerClient { } } - private sealed static abstract class LocalWatchServiceHelper permits LocalWatchServiceHelper.Dynamic, LocalWatchServiceHelper.Desc, LocalWatchServiceHelper.Common { + private record WatchContext(Path path, WatchService watchService, Map watchKeys) { + private WatchContext(Path path, WatchService watchService) { + this(path, watchService, new HashMap<>()); + } + } + + private sealed static abstract class LocalWatchEventProcessor permits LocalWatchEventProcessor.Dynamic, LocalWatchEventProcessor.Desc, LocalWatchEventProcessor.Common { protected final ConcurrentHashMap existedMetaActions; + protected final WatchContext ctx; - private LocalWatchServiceHelper(ConcurrentHashMap existedMetaActions) { + private LocalWatchEventProcessor(ConcurrentHashMap existedMetaActions, WatchContext ctx) { this.existedMetaActions = existedMetaActions; + this.ctx = ctx; } protected abstract @NotNull LocalWatchServiceBuild.InitLoader buildLoad(); @@ -412,12 +414,12 @@ public class LocalRunnerClient extends RunnerClient { protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildOverflow(); - private static final class Dynamic extends LocalWatchServiceHelper { + private static final class Dynamic extends LocalWatchEventProcessor { private final McpStatelessAsyncServer dynamicActionMcpServer; - private Dynamic(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer dynamicActionMcpServer) { - super(existedMetaActions); + private Dynamic(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer dynamicActionMcpServer, WatchContext ctx) { + super(existedMetaActions, ctx); this.dynamicActionMcpServer = dynamicActionMcpServer; } @@ -494,33 +496,36 @@ public class LocalRunnerClient extends RunnerClient { @Override @NotNull protected LocalWatchServiceBuild.InitLoader buildLoad() { - return path -> { - // 从该路径列出已存在的目录,每个目录对应不同的行动程序及描述文件,从描述文件加载程序信息 - File file = path.toFile(); - if (file.isFile()) { - throw new ActionInitFailedException("未找到目录: " + path); - } - File[] files = file.listFiles(); - if (files == null) { - throw new ActionInitFailedException("未正常读取目录: " + path); - } - for (File dir : files) { - if (!normalPath(dir.toPath())) { - continue; - } - File meta = new File(dir, "desc.json"); - File program = null; - //noinspection DataFlowIssue - for (File f : dir.listFiles()) { - if (f.getName().startsWith("run.")) { - program = f; - } - } + // 从该路径列出已存在的目录,每个目录对应不同的行动程序及描述文件,从描述文件加载程序信息 + return this::load; + } - MetaActionInfo info = JSONUtil.readJSONObject(meta, StandardCharsets.UTF_8).toBean(MetaActionInfo.class); - addAction(dir.getName(), info, program); + private void load() { + Path path = ctx.path; + File file = path.toFile(); + if (file.isFile()) { + throw new ActionInitFailedException("未找到目录: " + path); + } + File[] files = file.listFiles(); + if (files == null) { + throw new ActionInitFailedException("未正常读取目录: " + path); + } + for (File dir : files) { + if (!normalPath(dir.toPath())) { + continue; } - }; + File meta = new File(dir, "desc.json"); + File program = null; + //noinspection DataFlowIssue + for (File f : dir.listFiles()) { + if (f.getName().startsWith("run.")) { + program = f; + } + } + + MetaActionInfo info = JSONUtil.readJSONObject(meta, StandardCharsets.UTF_8).toBean(MetaActionInfo.class); + addAction(dir.getName(), info, program); + } } private McpStatelessServerFeatures.AsyncToolSpecification buildAsyncToolSpecification(MetaActionInfo info, File program, String actionKey, String name) { @@ -626,12 +631,12 @@ public class LocalRunnerClient extends RunnerClient { } } - private static final class Desc extends LocalWatchServiceHelper { + private static final class Desc extends LocalWatchEventProcessor { private final McpStatelessAsyncServer mcpDescServer; - private Desc(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer mcpDescServer) { - super(existedMetaActions); + private Desc(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer mcpDescServer, WatchContext ctx) { + super(existedMetaActions, ctx); this.mcpDescServer = mcpDescServer; } @@ -666,12 +671,12 @@ public class LocalRunnerClient extends RunnerClient { } } - private static final class Common extends LocalWatchServiceHelper { + private static final class Common extends LocalWatchEventProcessor { private final Map mcpClients; - private Common(ConcurrentHashMap existedMetaActions, Map mcpClients) { - super(existedMetaActions); + private Common(ConcurrentHashMap existedMetaActions, Map mcpClients, WatchContext ctx) { + super(existedMetaActions, ctx); this.mcpClients = mcpClients; }