From 4ae65b885e1ab716f5038814c8fe24621d3c8003 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 1 Apr 2026 22:15:00 +0800 Subject: [PATCH] refactor(watch): support configurable directory watch depth --- .../runner/mcp/DynamicActionMcpManager.java | 4 +- .../action/runner/mcp/McpConfigWatcher.java | 4 +- .../action/runner/mcp/McpDescWatcher.java | 4 +- .../action/runner/LocalRunnerClientTest.java | 3 + .../LocalRunnerClientWatchDepthTest.java | 89 ++++++++++ .../support/DirectoryWatchSupport.java | 75 ++++++-- .../support/DirectoryWatchSupportTest.java | 163 ++++++++++++++++++ 7 files changed, 322 insertions(+), 20 deletions(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientWatchDepthTest.java rename {Partner-Core/src/main/java/work/slhaf/partner/core/action/runner => Partner-Framework/src/main/java/work/slhaf/partner/api/common}/support/DirectoryWatchSupport.java (69%) create mode 100644 Partner-Framework/src/test/java/work/slhaf/partner/api/common/support/DirectoryWatchSupportTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java index a0fc92ea..a5658d8f 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java @@ -11,11 +11,11 @@ import io.modelcontextprotocol.spec.McpSchema; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import work.slhaf.partner.api.common.support.DirectoryWatchSupport; import work.slhaf.partner.common.mcp.InProcessMcpTransport; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.exception.ActionInitFailedException; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; -import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport; import java.io.File; import java.io.IOException; @@ -58,7 +58,7 @@ public class DynamicActionMcpManager implements AutoCloseable { .capabilities(serverCapabilities) .jsonMapper(McpJsonMapper.getDefault()) .build(); - this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, this::loadExisting) + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 1, this::loadExisting) .onCreate(this::handleCreate) .onModify(this::handleModify) .onDelete(this::handleDelete) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java index 983f90b9..31ffc557 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java @@ -6,11 +6,11 @@ import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.spec.McpSchema; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; +import work.slhaf.partner.api.common.support.DirectoryWatchSupport; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.runner.LocalRunnerClient; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; import work.slhaf.partner.core.action.runner.policy.RunnerExecutionPolicyListener; -import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport; import java.io.File; import java.io.IOException; @@ -46,7 +46,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis this.mcpClientRegistry = mcpClientRegistry; this.mcpTransportFactory = mcpTransportFactory; this.mcpMetaRegistry = mcpMetaRegistry; - this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, this::loadInitial) + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 0, this::loadInitial) .onCreate(this::handleCreate) .onModify((thisDir, context) -> checkAndReload(true)) .onDelete(this::handleDelete) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpDescWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpDescWatcher.java index 5e533025..96303065 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpDescWatcher.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpDescWatcher.java @@ -1,7 +1,7 @@ package work.slhaf.partner.core.action.runner.mcp; import lombok.extern.slf4j.Slf4j; -import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport; +import work.slhaf.partner.api.common.support.DirectoryWatchSupport; import java.io.IOException; import java.nio.file.Files; @@ -18,7 +18,7 @@ public class McpDescWatcher implements AutoCloseable { public McpDescWatcher(Path root, McpMetaRegistry mcpMetaRegistry, ExecutorService executor) throws IOException { this.root = root; this.mcpMetaRegistry = mcpMetaRegistry; - this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, () -> mcpMetaRegistry.loadDirectory(root)) + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 0, () -> mcpMetaRegistry.loadDirectory(root)) .onCreate(this::handleUpsert) .onModify(this::handleUpsert) .onDelete(this::handleDelete) diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java index 6ccbb6ee..9850df46 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java @@ -328,6 +328,7 @@ public class LocalRunnerClientTest { } } + @Test void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException { ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); @@ -521,6 +522,7 @@ public class LocalRunnerClientTest { } } + @Test void testDescMcpIgnoreInvalidFileName(@TempDir Path tempDir) throws IOException, InterruptedException { ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); @@ -809,6 +811,7 @@ public class LocalRunnerClientTest { executor.shutdownNow(); } } + } @Nested diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientWatchDepthTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientWatchDepthTest.java new file mode 100644 index 00000000..e7d93a14 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientWatchDepthTest.java @@ -0,0 +1,89 @@ +package work.slhaf.partner.core.action.runner; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Await.waitForCondition; +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Common.*; +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*; + +class LocalRunnerClientWatchDepthTest { + + @Test + void testDynamicWatchIgnoresGrandchildDirectories(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path nestedActionDir = tempDir.resolve("action").resolve("dynamic").resolve("group").resolve("demo_action_nested"); + Files.createDirectories(nestedActionDir); + + writeRunFile(nestedActionDir); + writeDescJson(nestedActionDir, "nested action"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_nested"), 1000); + + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_nested")); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDescMcpIgnoresNestedDirectories(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + String actionKey = "local::desc_nested"; + existedMetaActions.put(actionKey, buildMetaActionInfo("base")); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path nestedDescDir = tempDir.resolve("action").resolve("mcp").resolve("desc").resolve("nested"); + Files.createDirectories(nestedDescDir); + + writeDescMcpJson(nestedDescDir, actionKey, "nested override"); + waitForCondition(() -> { + MetaActionInfo current = getMetaActionInfo(existedMetaActions, actionKey); + return current != null && "nested override".equals(current.getDescription()); + }, 1000); + + MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey); + Assertions.assertNotNull(info); + Assertions.assertEquals("base", info.getDescription()); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testCommonMcpIgnoresNestedDirectories(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path nestedDir = tempDir.resolve("action").resolve("mcp").resolve("nested"); + Files.createDirectories(nestedDir); + Path configFile = nestedDir.resolve("servers.json"); + + String config = buildCommonMcpConfig( + buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest") + ); + writeCommonMcpConfig(configFile, config); + waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 2000); + + Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::"))); + } finally { + executor.shutdownNow(); + } + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/support/DirectoryWatchSupport.java b/Partner-Framework/src/main/java/work/slhaf/partner/api/common/support/DirectoryWatchSupport.java similarity index 69% rename from Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/support/DirectoryWatchSupport.java rename to Partner-Framework/src/main/java/work/slhaf/partner/api/common/support/DirectoryWatchSupport.java index e6b3c87b..e02108b4 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/support/DirectoryWatchSupport.java +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/common/support/DirectoryWatchSupport.java @@ -1,4 +1,4 @@ -package work.slhaf.partner.core.action.runner.support; +package work.slhaf.partner.api.common.support; import lombok.extern.slf4j.Slf4j; @@ -17,13 +17,16 @@ public class DirectoryWatchSupport implements Closeable { private final Context ctx; private final Map, EventHandler> handlers = new HashMap<>(); private final ExecutorService executor; - private final boolean watchAll; + private final int watchDepth; private final InitLoader initLoader; - public DirectoryWatchSupport(Context ctx, ExecutorService executor, boolean watchAll, InitLoader initLoader) { + public DirectoryWatchSupport(Context ctx, ExecutorService executor, int watchDepth, InitLoader initLoader) { + if (watchDepth < -1) { + throw new IllegalArgumentException("watchDepth must be -1 or greater: " + watchDepth); + } this.ctx = ctx; this.executor = executor; - this.watchAll = watchAll; + this.watchDepth = watchDepth; this.initLoader = initLoader; } @@ -68,6 +71,10 @@ public class DirectoryWatchSupport implements Closeable { } public void registerDirectory(Path dir) throws IOException { + registerDirectoryTree(dir); + } + + private void registerDirectoryInternal(Path dir) throws IOException { if (!java.nio.file.Files.isDirectory(dir) || isWatching(dir)) { return; } @@ -78,20 +85,52 @@ public class DirectoryWatchSupport implements Closeable { private void registerPath() { try { - registerDirectory(ctx.root()); - if (!watchAll) { - return; - } - try (Stream walk = Files.list(ctx.root()).filter(Files::isDirectory)) { - for (Path dir : walk.toList()) { - registerDirectory(dir); - } - } + registerDirectoryTree(ctx.root()); } catch (IOException e) { log.error("监听目录注册失败: ", e); } } + private void registerDirectoryTree(Path dir) throws IOException { + if (!Files.isDirectory(dir) || !isWithinDepth(dir)) { + return; + } + + registerDirectoryInternal(dir); + if (!shouldTraverseChildren(dir)) { + return; + } + + try (Stream walk = Files.list(dir).filter(Files::isDirectory)) { + for (Path child : walk.toList()) { + registerDirectoryTree(child); + } + } + } + + private boolean isWithinDepth(Path dir) { + if (watchDepth == -1) { + return true; + } + return depthOf(dir) <= watchDepth; + } + + private boolean shouldTraverseChildren(Path dir) { + return watchDepth == -1 || depthOf(dir) < watchDepth; + } + + private int depthOf(Path dir) { + Path normalizedRoot = ctx.root().toAbsolutePath().normalize(); + Path normalizedDir = dir.toAbsolutePath().normalize(); + if (normalizedDir.equals(normalizedRoot)) { + return 0; + } + if (!normalizedDir.startsWith(normalizedRoot)) { + throw new IllegalArgumentException("Directory is outside watched root: " + dir); + } + return normalizedRoot.relativize(normalizedDir).getNameCount(); + } + private Runnable buildWatchTask() { return () -> { String rootStr = ctx.root().toString(); @@ -106,11 +145,19 @@ public class DirectoryWatchSupport implements Closeable { Object context = event.context(); log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context); Path thisDir = (Path) key.watchable(); + Path resolvedContext = context instanceof Path path ? thisDir.resolve(path) : null; + if (kind == ENTRY_CREATE && resolvedContext != null && Files.isDirectory(resolvedContext)) { + try { + registerDirectoryTree(resolvedContext); + } catch (IOException e) { + log.error("监听目录注册失败: {}", resolvedContext, e); + } + } EventHandler handler = handlers.get(kind); if (handler == null) { continue; } - handler.handle(thisDir, context instanceof Path path ? thisDir.resolve(path) : null); + handler.handle(thisDir, resolvedContext); } } catch (InterruptedException e) { log.info("监听线程被中断,准备退出..."); diff --git a/Partner-Framework/src/test/java/work/slhaf/partner/api/common/support/DirectoryWatchSupportTest.java b/Partner-Framework/src/test/java/work/slhaf/partner/api/common/support/DirectoryWatchSupportTest.java new file mode 100644 index 00000000..76f6b35a --- /dev/null +++ b/Partner-Framework/src/test/java/work/slhaf/partner/api/common/support/DirectoryWatchSupportTest.java @@ -0,0 +1,163 @@ +package work.slhaf.partner.api.common.support; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.BooleanSupplier; + +class DirectoryWatchSupportTest { + + @Test + void testWatchDepthRejectsInvalidValue(@TempDir Path tempDir) throws IOException { + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context context = new work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context(tempDir); + Assertions.assertThrows(IllegalArgumentException.class, + () -> new work.slhaf.partner.api.common.support.DirectoryWatchSupport(context, executor, -2, null)); + } + } + + @Test + void testWatchDepthZeroOnlyWatchesRoot(@TempDir Path tempDir) throws Exception { + Path childDir = Files.createDirectories(tempDir.resolve("child")); + + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + WatchHarness harness = createWatchSupport(tempDir, executor, 0)) { + harness.watchSupport().start(); + + Files.writeString(tempDir.resolve("root.txt"), "root"); + waitForCondition(() -> harness.events().contains("root.txt"), 2000); + + Files.writeString(childDir.resolve("child.txt"), "child"); + Thread.sleep(300); + + Assertions.assertTrue(harness.events().contains("root.txt")); + Assertions.assertFalse(harness.events().contains("child/child.txt")); + } + } + + @Test + void testWatchDepthOneWatchesDirectChildrenOnly(@TempDir Path tempDir) throws Exception { + Path childDir = Files.createDirectories(tempDir.resolve("child")); + Path grandChildDir = Files.createDirectories(childDir.resolve("grandchild")); + + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + WatchHarness harness = createWatchSupport(tempDir, executor, 1)) { + harness.watchSupport().start(); + + Files.writeString(childDir.resolve("child.txt"), "child"); + waitForCondition(() -> harness.events().contains("child/child.txt"), 2000); + + Files.writeString(grandChildDir.resolve("deep.txt"), "deep"); + Thread.sleep(300); + + Assertions.assertTrue(harness.events().contains("child/child.txt")); + Assertions.assertFalse(harness.events().contains("child/grandchild/deep.txt")); + } + } + + @Test + void testWatchDepthNegativeOneWatchesAllDescendants(@TempDir Path tempDir) throws Exception { + Path grandChildDir = Files.createDirectories(tempDir.resolve("child").resolve("grandchild")); + + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + WatchHarness harness = createWatchSupport(tempDir, executor, -1)) { + harness.watchSupport().start(); + + Files.writeString(grandChildDir.resolve("deep.txt"), "deep"); + waitForCondition(() -> harness.events().contains("child/grandchild/deep.txt"), 2000); + + Assertions.assertTrue(harness.events().contains("child/grandchild/deep.txt")); + } + } + + @Test + void testRegistersNewDirectoriesUpToConfiguredDepth(@TempDir Path tempDir) throws Exception { + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + WatchHarness harness = createWatchSupport(tempDir, executor, 1)) { + harness.watchSupport().start(); + + Path childDir = Files.createDirectories(tempDir.resolve("child")); + waitForCondition(() -> harness.watchSupport().isWatching(childDir), 2000); + + Files.writeString(childDir.resolve("child.txt"), "child"); + waitForCondition(() -> harness.events().contains("child/child.txt"), 2000); + + Assertions.assertTrue(harness.events().contains("child/child.txt")); + } + } + + @Test + void testReRegistersRootAfterRecreate(@TempDir Path tempDir) throws Exception { + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + WatchHarness harness = createWatchSupport(tempDir, executor, 0)) { + harness.watchSupport().start(); + + deleteDirectory(tempDir); + waitForCondition(() -> !harness.watchSupport().isWatching(tempDir), 2000); + Files.createDirectories(tempDir); + + waitForCondition(() -> harness.watchSupport().isWatching(tempDir), 2000); + + Files.writeString(tempDir.resolve("recreated.txt"), "ok"); + waitForCondition(() -> harness.events().contains("recreated.txt"), 3000); + + Assertions.assertTrue(harness.events().contains("recreated.txt")); + } + } + + private WatchHarness createWatchSupport(Path root, ExecutorService executor, int watchDepth) throws IOException { + work.slhaf.partner.api.common.support.DirectoryWatchSupport watchSupport = new work.slhaf.partner.api.common.support.DirectoryWatchSupport(new work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context(root), executor, watchDepth, null); + List events = new CopyOnWriteArrayList<>(); + watchSupport.onCreate((thisDir, context) -> record(root, context, events)); + watchSupport.onModify((thisDir, context) -> record(root, context, events)); + return new WatchHarness(watchSupport, events); + } + + private void record(Path root, Path context, List events) { + if (context == null || Files.isDirectory(context)) { + return; + } + events.add(root.relativize(context).toString().replace('\\', '/')); + } + + private void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException { + long start = System.currentTimeMillis(); + while (!supplier.getAsBoolean()) { + if (System.currentTimeMillis() - start > timeoutMs) { + break; + } + Thread.sleep(50); + } + } + + private void deleteDirectory(Path dir) throws IOException { + if (!Files.exists(dir)) { + return; + } + try (var stream = Files.walk(dir)) { + stream.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException ignored) { + } + }); + } + } + + private record WatchHarness(work.slhaf.partner.api.common.support.DirectoryWatchSupport watchSupport, + List events) implements AutoCloseable { + @Override + public void close() throws IOException { + watchSupport.close(); + } + } +}