From ddd999d47b4613aab77d56bca41f9e1b7f9bbfa0 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 12 Jan 2026 19:46:45 +0800 Subject: [PATCH] fix(LocalRunnerClient): prevent WatchService event loss caused by concurrent consumers Context: Shared WatchService with multiple watch threads caused WatchKey events to be consumed by mismatched processors, leading to missed file events. Use isolated WatchService per WatchContext to restore correct semantics. --- .../core/action/runner/LocalRunnerClient.java | 35 +++--- .../action/runner/LocalRunnerClientTest.java | 109 +++++++++++++++--- 2 files changed, 115 insertions(+), 29 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index cafce89a..c947e2f6 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -95,7 +95,6 @@ public class LocalRunnerClient extends RunnerClient { * 该 MCP Server-Client 的作用为: 与 CommonMcp Clients 配合,补齐第三方 MCP 服务的描述信息 */ private McpStatelessAsyncServer mcpDescServer; - private final WatchService watchService; public LocalRunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor, @Nullable String baseActionPath) { super(existedMetaActions, executor, baseActionPath); @@ -110,18 +109,17 @@ public class LocalRunnerClient extends RunnerClient { createPath(MCP_DESC_PATH); try { - watchService = FileSystems.getDefault().newWatchService(); + registerDescMcp(); + registerDynamicActionMcp(); + registerCommonMcp(); } catch (IOException e) { throw new ActionInitFailedException("目录监听器启动失败", e); } - registerDescMcp(); - registerDynamicActionMcp(); - registerCommonMcp(); setupShutdownHook(); } - private void registerCommonMcp() { - val ctx = new WatchContext(Path.of(MCP_SERVER_PATH), watchService); + private void registerCommonMcp() throws IOException { + val ctx = new WatchContext(Path.of(MCP_SERVER_PATH), FileSystems.getDefault().newWatchService()); val common = new LocalWatchEventProcessor.Common(existedMetaActions, mcpClients, ctx); new LocalWatchServiceBuild.BuildRegistry(ctx) .initialLoad(common.buildLoad()) @@ -133,7 +131,7 @@ public class LocalRunnerClient extends RunnerClient { log.info("CommonMcp 文件监听注册完毕"); } - private void registerDescMcp() { + private void registerDescMcp() throws IOException { InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() .resources(true, true) @@ -149,8 +147,8 @@ public class LocalRunnerClient extends RunnerClient { } - private void registerDescMcpWatch() { - WatchContext ctx = new WatchContext(Path.of(MCP_DESC_PATH), watchService); + private void registerDescMcpWatch() throws IOException { + WatchContext ctx = new WatchContext(Path.of(MCP_DESC_PATH), FileSystems.getDefault().newWatchService()); LocalWatchEventProcessor.Desc desc = new LocalWatchEventProcessor.Desc(existedMetaActions, mcpDescServer, ctx); new LocalWatchServiceBuild.BuildRegistry(ctx) .initialLoad(desc.buildLoad()) @@ -158,10 +156,11 @@ public class LocalRunnerClient extends RunnerClient { .registerDelete(desc.buildDelete()) .registerModify(desc.buildModify()) .registerOverflow(desc.buildOverflow()) + .watchAll(true) .commit(executor); } - private void registerDynamicActionMcp() { + private void registerDynamicActionMcp() throws IOException { InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() .tools(true) @@ -179,9 +178,9 @@ public class LocalRunnerClient extends RunnerClient { log.info("DynamicActionMcp 注册完毕"); } - private void registerDynamicActionMcpWatch() { + private void registerDynamicActionMcpWatch() throws IOException { // MODIFY、CREATE、DELETE、OVERFLOW 都需要不同的处理方式 - WatchContext ctx = new WatchContext(Path.of(DYNAMIC_ACTION_PATH), watchService); + WatchContext ctx = new WatchContext(Path.of(DYNAMIC_ACTION_PATH), FileSystems.getDefault().newWatchService()); LocalWatchEventProcessor.Dynamic dynamic = new LocalWatchEventProcessor.Dynamic(existedMetaActions, dynamicActionMcpServer, ctx); new LocalWatchServiceBuild.BuildRegistry(ctx) .initialLoad(dynamic.buildLoad()) @@ -491,7 +490,7 @@ public class LocalRunnerClient extends RunnerClient { String rootStr = ctx.root.toString(); log.info("行动程序目录监听器已启动,监听目录: {}", rootStr); while (true) { - WatchKey key; + WatchKey key = null; try { key = ctx.watchService.take(); List> events = key.pollEvents(); @@ -513,6 +512,14 @@ public class LocalRunnerClient extends RunnerClient { } catch (ClosedWatchServiceException e) { log.info("WatchService 已关闭,监听线程退出。"); break; + } finally { + if (key != null) { + // reset 返回 false 表示该 key 已失效(目录被删、不可访问等) + boolean valid = key.reset(); + if (!valid) { + log.info("WatchKey 已失效,停止监听该目录: {}", key.watchable()); + } + } } } }; diff --git a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java index f9e3e462..2a0dc383 100644 --- a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java +++ b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java @@ -1,31 +1,32 @@ package work.slhaf.partner.core.action.runner; +import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionType; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; import java.util.Scanner; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.BooleanSupplier; +@SuppressWarnings("LoggingSimilarMessage") +@Slf4j public class LocalRunnerClientTest { - static LocalRunnerClient runnerClient; - - @BeforeAll - static void beforeAll() { - runnerClient = new LocalRunnerClient(new ConcurrentHashMap<>(), Executors.newVirtualThreadPerTaskExecutor(), "/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action"); - } - - @Test - void testRunOrigin() { - MetaAction metaAction = buildTmpMetaAction(); - - RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction); - System.out.println(runnerResponse.getData()); + private static void writeRunFile(Path actionDir) throws IOException { + Path runPath = actionDir.resolve("run.py"); + log.debug("写入路径: {}", runPath); + Files.writeString(runPath, "print('ok')\n"); } private static @NotNull MetaAction buildTmpMetaAction() { @@ -38,10 +39,88 @@ public class LocalRunnerClientTest { return metaAction; } + private static void writeDescJson(Path actionDir, String description) throws IOException { + Path descPath = actionDir.resolve("desc.json"); + log.debug("写入路径: {}", descPath); + String json = "{\n" + + " \"io\": false,\n" + + " \"params\": {},\n" + + " \"description\": \"" + description + "\",\n" + + " \"tags\": [],\n" + + " \"preActions\": [],\n" + + " \"postActions\": [],\n" + + " \"strictDependencies\": false,\n" + + " \"responseSchema\": {}\n" + + "}\n"; + Files.writeString(descPath, json); + } + + private static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException { + long start = System.currentTimeMillis(); + while (!supplier.getAsBoolean()) { + if (System.currentTimeMillis() - start > timeoutMs) { + break; + } + Thread.sleep(50); + } + } + @Test - void testWatch() { + void testRunOrigin(@TempDir Path tempDir) { + LocalRunnerClient runnerClient = + new LocalRunnerClient( + new ConcurrentHashMap<>(), + Executors.newVirtualThreadPerTaskExecutor(), + tempDir.toString() + ); + + MetaAction metaAction = buildTmpMetaAction(); + RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction); + System.out.println(runnerResponse.getData()); + } + + @Test + void testWatch(@TempDir Path tempDir) { + LocalRunnerClient runnerClient = + new LocalRunnerClient( + new ConcurrentHashMap<>(), + Executors.newVirtualThreadPerTaskExecutor(), + tempDir.toString() + ); // 直接等待输入然后尝试触发各种文件监听事件即可 + System.out.println("Press any key to continue..."); Scanner scanner = new Scanner(System.in); scanner.next(); } + + @Test + void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action"); + Files.createDirectories(actionDir); + Thread.sleep(100); + + writeRunFile(actionDir); + writeDescJson(actionDir, "demo action"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); + + // 触发一次 modify,确保监听线程能够捕捉到完整的 action 结构 + writeDescJson(actionDir, "demo action updated"); + + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); + + Files.deleteIfExists(actionDir.resolve("run.py")); + waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action")); + } finally { + executor.shutdownNow(); + } + } }