diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 00343e35..5929bf25 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -25,7 +25,6 @@ import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionType; import work.slhaf.partner.core.action.exception.ActionInitFailedException; -import work.slhaf.partner.core.action.exception.ActionLoadFailedException; import java.io.BufferedReader; import java.io.File; @@ -41,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM; import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL; @Slf4j @@ -52,11 +50,15 @@ public class LocalRunnerClient extends RunnerClient { * 动态生成的行动程序都将挂载至该 McpServer */ private McpStatelessAsyncServer dynamicActionMcpServer; + private final WatchService watchService; public LocalRunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) { super(existedMetaActions, executor); - ActionWatchService watchService = new ActionWatchService(actionWatchPath); - watchService.launch(); + try { + watchService = FileSystems.getDefault().newWatchService(); + } catch (IOException e) { + throw new ActionInitFailedException("目录监听器启动失败", e); + } registerDynamicActionMcp(); setupShutdownHook(); } @@ -308,11 +310,127 @@ public class LocalRunnerClient extends RunnerClient { } private void setupShutdownHook() { - dynamicActionMcpServer.close(); - this.mcpClients.forEach((id, client) -> { - client.close(); - log.info("[{}] MCP-Client 已关闭", id); - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + dynamicActionMcpServer.close(); + this.mcpClients.forEach((id, client) -> { + client.close(); + log.info("[{}] MCP-Client 已关闭", id); + }); + })); + } + + private WatchServiceBuild registerWatchService(Path path) { + return new LocalWatchServiceHelper(path, watchService, executor); + } + + private interface WatchServiceBuild { + WatchServiceBuild registerCreate(WatchEventHandler handler); + + WatchServiceBuild registerModify(WatchEventHandler handler); + + WatchServiceBuild registerDelete(WatchEventHandler handler); + + WatchServiceBuild registerOverflow(WatchEventHandler handler); + + WatchServiceBuild initialLoad(WatchInitLoader loader); + + void commit(); + } + + private interface WatchEventHandler { + void handle(Path thisDir, Path context); + } + + private interface WatchInitLoader { + void load(Path path); + } + + private static class LocalWatchServiceHelper implements WatchServiceBuild { + + private final Map, WatchEventHandler> handlers = new HashMap<>(); + private final Path path; + private final WatchService watchService; + private final ExecutorService executor; + private WatchInitLoader initLoader; + + private LocalWatchServiceHelper(Path path, WatchService watchService, ExecutorService executor) { + this.path = path; + this.watchService = watchService; + this.executor = executor; + } + + @Override + public WatchServiceBuild registerCreate(WatchEventHandler handler) { + handlers.put(StandardWatchEventKinds.ENTRY_CREATE, handler); + return this; + } + + @Override + public WatchServiceBuild registerModify(WatchEventHandler handler) { + handlers.put(StandardWatchEventKinds.ENTRY_MODIFY, handler); + return this; + } + + @Override + public WatchServiceBuild registerDelete(WatchEventHandler handler) { + handlers.put(StandardWatchEventKinds.ENTRY_DELETE, handler); + return this; + } + + @Override + public WatchServiceBuild registerOverflow(WatchEventHandler handler) { + handlers.put(StandardWatchEventKinds.OVERFLOW, handler); + return this; + } + + @Override + public WatchServiceBuild initialLoad(WatchInitLoader loader) { + initLoader = loader; + return this; + } + + @Override + public void commit() { + if (initLoader != null) initLoader.load(path); + executor.execute(buildWatchTask()); + } + + private Runnable buildWatchTask() { + return () -> { + String pathStr = path.toString(); + log.info("行动程序目录监听器已启动,监听目录: {}", pathStr); + while (true) { + WatchKey key; + try { + key = watchService.take(); + List> events = key.pollEvents(); + for (WatchEvent e : events) { + WatchEvent.Kind kind = e.kind(); + Object context = e.context(); + log.info("行动程序目录变更事件: {} - {} - {}", pathStr, kind.name(), context); + Path thisDir = (Path) key.watchable(); + if (!thisDir.equals(path)) { + // 若事件所在目录不为为 path,忽略并步入下一轮循环 + continue; + } + WatchEventHandler handler = handlers.get(kind); + if (handler == null) { + continue; + } + handler.handle(thisDir, context instanceof Path ? (Path) context : null); + } + } catch (InterruptedException e) { + log.info("监听线程被中断,准备退出..."); + Thread.currentThread().interrupt(); // 恢复中断标志 + break; + } catch (ClosedWatchServiceException e) { + log.info("WatchService 已关闭,监听线程退出。"); + break; + } + } + }; + } + } private sealed abstract static class McpClientTransportParams permits HttpMcpClientTransportParams, StdioMcpClientTransportParams { @@ -355,182 +473,4 @@ public class LocalRunnerClient extends RunnerClient { private String total; private List resultList; } - - //TODO 逻辑待更新,用以适配 MCP 服务的及时发现与注册 - private class ActionWatchService { - - private final HashMap registeredPaths = new HashMap<>(); - private final String actionWatchPath; - - private ActionWatchService(String actionWatchPath) { - this.actionWatchPath = actionWatchPath; - } - - private void launch() { - Path path = Path.of(actionWatchPath != null ? actionWatchPath : ACTION_PROGRAM); - scanActions(path.toFile()); - launchActionDirectoryWatcher(path); - } - - private void launchActionDirectoryWatcher(Path path) { - WatchService watchService; - try { - watchService = FileSystems.getDefault().newWatchService(); - setupShutdownHook(watchService); - registerParentToWatch(path, watchService); - registerSubToWatch(path, watchService); - executor.execute(registerWatchTask(path, watchService)); - } catch (IOException e) { - throw new ActionInitFailedException("行动程序目录监听器启动失败", e); - } - } - - private void setupShutdownHook(WatchService watchService) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - watchService.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - })); - } - - private Runnable registerWatchTask(Path path, WatchService watchService) { - return () -> { - log.info("行动程序目录监听器已启动"); - while (true) { - WatchKey key; - try { - key = watchService.take(); - List> events = key.pollEvents(); - for (WatchEvent e : events) { - @SuppressWarnings("unchecked") - WatchEvent event = (WatchEvent) e; - WatchEvent.Kind kind = event.kind(); - Path context = event.context(); - log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString()); - Path thisDir = (Path) key.watchable(); - // 根据事件发生的目录进行分流,分为父目录事件和子程序事件 - if (thisDir.equals(path)) { - handleParentDirEvent(kind, thisDir, context, watchService); - } else { - handleSubDirEvent(kind, thisDir); - } - } - } catch (InterruptedException e) { - log.info("监听线程被中断,准备退出..."); - Thread.currentThread().interrupt(); // 恢复中断标志 - break; - } catch (ClosedWatchServiceException e) { - log.info("WatchService 已关闭,监听线程退出。"); - break; - } - } - }; - } - - private void handleSubDirEvent(WatchEvent.Kind kind, Path thisDir) { - // path为触发本次行动的文件的路径(当前位于某个action目录下) - // 先判定发生的目录前缀是否匹配(action、desc),否则忽略 - if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) { - // CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。 - boolean complete = checkComplete(thisDir); - if (!complete) - return; - try { - MetaActionInfo newActionInfo = new MetaActionInfo(); - existedMetaActions.put(thisDir.toString(), newActionInfo); - } catch (ActionLoadFailedException e) { - log.warn("行动信息重新加载失败,触发行为: {}", kind.name()); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - // DELETE 事件将会把该 MetaActionInfo 从记录中移除 - existedMetaActions.remove(thisDir.toString()); - } - } - - private boolean checkComplete(Path thisDir) { - File[] files = thisDir.toFile().listFiles(); - if (files == null) { - log.error("当前目录无法访问: [{}]", thisDir); - return false; - } - boolean existedAction = false; - boolean existedDesc = false; - for (File file : files) { - String fileName = file.getName(); - String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.')); - if (nameWithoutExt.equals("action")) - existedAction = true; - else if (nameWithoutExt.equals("desc")) - existedDesc = true; - } - return existedAction && existedDesc; - } - - private void handleParentDirEvent(WatchEvent.Kind kind, Path thisDir, Path context, - WatchService watchService) { - Path path = Path.of(thisDir.toString(), context.toString()); - // MODIFY 事件不进行处理 - if (kind == StandardWatchEventKinds.ENTRY_CREATE) { - try { - path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - } catch (IOException e) { - log.error("新增行动程序目录监听失败: {}", path, e); - } - } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) { - WatchKey remove = registeredPaths.remove(path); - remove.cancel(); - } - } - - private void registerSubToWatch(Path path, WatchService watchService) throws IOException { - Files.walkFileTree(path, new SimpleFileVisitor<>() { - @Override - public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) - throws IOException { - if (dir.getFileName().startsWith(".")) - return FileVisitResult.CONTINUE; - WatchKey key = dir.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(dir, key); - return FileVisitResult.CONTINUE; - } - }); - } - - private void registerParentToWatch(Path path, WatchService watchService) throws IOException { - WatchKey key = path.register(watchService, - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY); - registeredPaths.put(path, key); - } - - private void scanActions(File file) { - if (!file.exists() || file.isFile()) { - throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath()); - } - File[] files = file.listFiles(); - if (files == null) { - throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath()); - } - for (File f : files) { - try { - MetaActionInfo actionInfo = new MetaActionInfo(); - existedMetaActions.put(f.getName(), actionInfo); -// log.info("行动程序[{}]已加载", actionInfo.getKey()); - } catch (ActionLoadFailedException e) { - log.warn("行动程序加载失败", e); - } - } - - } - } - }