refactor(watch): support configurable directory watch depth

This commit is contained in:
2026-04-01 22:15:00 +08:00
parent 632e47ec13
commit 4ae65b885e
7 changed files with 322 additions and 20 deletions

View File

@@ -11,11 +11,11 @@ import io.modelcontextprotocol.spec.McpSchema;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import work.slhaf.partner.api.common.support.DirectoryWatchSupport;
import work.slhaf.partner.common.mcp.InProcessMcpTransport; import work.slhaf.partner.common.mcp.InProcessMcpTransport;
import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.exception.ActionInitFailedException; import work.slhaf.partner.core.action.exception.ActionInitFailedException;
import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService;
import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@@ -58,7 +58,7 @@ public class DynamicActionMcpManager implements AutoCloseable {
.capabilities(serverCapabilities) .capabilities(serverCapabilities)
.jsonMapper(McpJsonMapper.getDefault()) .jsonMapper(McpJsonMapper.getDefault())
.build(); .build();
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, this::loadExisting) this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 1, this::loadExisting)
.onCreate(this::handleCreate) .onCreate(this::handleCreate)
.onModify(this::handleModify) .onModify(this::handleModify)
.onDelete(this::handleDelete) .onDelete(this::handleDelete)

View File

@@ -6,11 +6,11 @@ import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import work.slhaf.partner.api.common.support.DirectoryWatchSupport;
import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.runner.LocalRunnerClient; import work.slhaf.partner.core.action.runner.LocalRunnerClient;
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy;
import work.slhaf.partner.core.action.runner.policy.RunnerExecutionPolicyListener; import work.slhaf.partner.core.action.runner.policy.RunnerExecutionPolicyListener;
import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@@ -46,7 +46,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
this.mcpClientRegistry = mcpClientRegistry; this.mcpClientRegistry = mcpClientRegistry;
this.mcpTransportFactory = mcpTransportFactory; this.mcpTransportFactory = mcpTransportFactory;
this.mcpMetaRegistry = mcpMetaRegistry; this.mcpMetaRegistry = mcpMetaRegistry;
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, this::loadInitial) this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 0, this::loadInitial)
.onCreate(this::handleCreate) .onCreate(this::handleCreate)
.onModify((thisDir, context) -> checkAndReload(true)) .onModify((thisDir, context) -> checkAndReload(true))
.onDelete(this::handleDelete) .onDelete(this::handleDelete)

View File

@@ -1,7 +1,7 @@
package work.slhaf.partner.core.action.runner.mcp; package work.slhaf.partner.core.action.runner.mcp;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport; import work.slhaf.partner.api.common.support.DirectoryWatchSupport;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@@ -18,7 +18,7 @@ public class McpDescWatcher implements AutoCloseable {
public McpDescWatcher(Path root, McpMetaRegistry mcpMetaRegistry, ExecutorService executor) throws IOException { public McpDescWatcher(Path root, McpMetaRegistry mcpMetaRegistry, ExecutorService executor) throws IOException {
this.root = root; this.root = root;
this.mcpMetaRegistry = mcpMetaRegistry; this.mcpMetaRegistry = mcpMetaRegistry;
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, () -> mcpMetaRegistry.loadDirectory(root)) this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, 0, () -> mcpMetaRegistry.loadDirectory(root))
.onCreate(this::handleUpsert) .onCreate(this::handleUpsert)
.onModify(this::handleUpsert) .onModify(this::handleUpsert)
.onDelete(this::handleDelete) .onDelete(this::handleDelete)

View File

@@ -328,6 +328,7 @@ public class LocalRunnerClientTest {
} }
} }
@Test @Test
void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException { void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>(); ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
@@ -521,6 +522,7 @@ public class LocalRunnerClientTest {
} }
} }
@Test @Test
void testDescMcpIgnoreInvalidFileName(@TempDir Path tempDir) throws IOException, InterruptedException { void testDescMcpIgnoreInvalidFileName(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>(); ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
@@ -809,6 +811,7 @@ public class LocalRunnerClientTest {
executor.shutdownNow(); executor.shutdownNow();
} }
} }
} }
@Nested @Nested

View File

@@ -0,0 +1,89 @@
package work.slhaf.partner.core.action.runner;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Await.waitForCondition;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Common.*;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*;
class LocalRunnerClientWatchDepthTest {
@Test
void testDynamicWatchIgnoresGrandchildDirectories(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path nestedActionDir = tempDir.resolve("action").resolve("dynamic").resolve("group").resolve("demo_action_nested");
Files.createDirectories(nestedActionDir);
writeRunFile(nestedActionDir);
writeDescJson(nestedActionDir, "nested action");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_nested"), 1000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_nested"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpIgnoresNestedDirectories(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_nested";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path nestedDescDir = tempDir.resolve("action").resolve("mcp").resolve("desc").resolve("nested");
Files.createDirectories(nestedDescDir);
writeDescMcpJson(nestedDescDir, actionKey, "nested override");
waitForCondition(() -> {
MetaActionInfo current = getMetaActionInfo(existedMetaActions, actionKey);
return current != null && "nested override".equals(current.getDescription());
}, 1000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("base", info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testCommonMcpIgnoresNestedDirectories(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path nestedDir = tempDir.resolve("action").resolve("mcp").resolve("nested");
Files.createDirectories(nestedDir);
Path configFile = nestedDir.resolve("servers.json");
String config = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest")
);
writeCommonMcpConfig(configFile, config);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 2000);
Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
} finally {
executor.shutdownNow();
}
}
}

View File

@@ -1,4 +1,4 @@
package work.slhaf.partner.core.action.runner.support; package work.slhaf.partner.api.common.support;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -17,13 +17,16 @@ public class DirectoryWatchSupport implements Closeable {
private final Context ctx; private final Context ctx;
private final Map<WatchEvent.Kind<?>, EventHandler> handlers = new HashMap<>(); private final Map<WatchEvent.Kind<?>, EventHandler> handlers = new HashMap<>();
private final ExecutorService executor; private final ExecutorService executor;
private final boolean watchAll; private final int watchDepth;
private final InitLoader initLoader; private final InitLoader initLoader;
public DirectoryWatchSupport(Context ctx, ExecutorService executor, boolean watchAll, InitLoader initLoader) { public DirectoryWatchSupport(Context ctx, ExecutorService executor, int watchDepth, InitLoader initLoader) {
if (watchDepth < -1) {
throw new IllegalArgumentException("watchDepth must be -1 or greater: " + watchDepth);
}
this.ctx = ctx; this.ctx = ctx;
this.executor = executor; this.executor = executor;
this.watchAll = watchAll; this.watchDepth = watchDepth;
this.initLoader = initLoader; this.initLoader = initLoader;
} }
@@ -68,6 +71,10 @@ public class DirectoryWatchSupport implements Closeable {
} }
public void registerDirectory(Path dir) throws IOException { public void registerDirectory(Path dir) throws IOException {
registerDirectoryTree(dir);
}
private void registerDirectoryInternal(Path dir) throws IOException {
if (!java.nio.file.Files.isDirectory(dir) || isWatching(dir)) { if (!java.nio.file.Files.isDirectory(dir) || isWatching(dir)) {
return; return;
} }
@@ -78,20 +85,52 @@ public class DirectoryWatchSupport implements Closeable {
private void registerPath() { private void registerPath() {
try { try {
registerDirectory(ctx.root()); registerDirectoryTree(ctx.root());
if (!watchAll) {
return;
}
try (Stream<Path> walk = Files.list(ctx.root()).filter(Files::isDirectory)) {
for (Path dir : walk.toList()) {
registerDirectory(dir);
}
}
} catch (IOException e) { } catch (IOException e) {
log.error("监听目录注册失败: ", e); log.error("监听目录注册失败: ", e);
} }
} }
private void registerDirectoryTree(Path dir) throws IOException {
if (!Files.isDirectory(dir) || !isWithinDepth(dir)) {
return;
}
registerDirectoryInternal(dir);
if (!shouldTraverseChildren(dir)) {
return;
}
try (Stream<Path> walk = Files.list(dir).filter(Files::isDirectory)) {
for (Path child : walk.toList()) {
registerDirectoryTree(child);
}
}
}
private boolean isWithinDepth(Path dir) {
if (watchDepth == -1) {
return true;
}
return depthOf(dir) <= watchDepth;
}
private boolean shouldTraverseChildren(Path dir) {
return watchDepth == -1 || depthOf(dir) < watchDepth;
}
private int depthOf(Path dir) {
Path normalizedRoot = ctx.root().toAbsolutePath().normalize();
Path normalizedDir = dir.toAbsolutePath().normalize();
if (normalizedDir.equals(normalizedRoot)) {
return 0;
}
if (!normalizedDir.startsWith(normalizedRoot)) {
throw new IllegalArgumentException("Directory is outside watched root: " + dir);
}
return normalizedRoot.relativize(normalizedDir).getNameCount();
}
private Runnable buildWatchTask() { private Runnable buildWatchTask() {
return () -> { return () -> {
String rootStr = ctx.root().toString(); String rootStr = ctx.root().toString();
@@ -106,11 +145,19 @@ public class DirectoryWatchSupport implements Closeable {
Object context = event.context(); Object context = event.context();
log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context); log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context);
Path thisDir = (Path) key.watchable(); Path thisDir = (Path) key.watchable();
Path resolvedContext = context instanceof Path path ? thisDir.resolve(path) : null;
if (kind == ENTRY_CREATE && resolvedContext != null && Files.isDirectory(resolvedContext)) {
try {
registerDirectoryTree(resolvedContext);
} catch (IOException e) {
log.error("监听目录注册失败: {}", resolvedContext, e);
}
}
EventHandler handler = handlers.get(kind); EventHandler handler = handlers.get(kind);
if (handler == null) { if (handler == null) {
continue; continue;
} }
handler.handle(thisDir, context instanceof Path path ? thisDir.resolve(path) : null); handler.handle(thisDir, resolvedContext);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.info("监听线程被中断,准备退出..."); log.info("监听线程被中断,准备退出...");

View File

@@ -0,0 +1,163 @@
package work.slhaf.partner.api.common.support;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
class DirectoryWatchSupportTest {
@Test
void testWatchDepthRejectsInvalidValue(@TempDir Path tempDir) throws IOException {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context context = new work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context(tempDir);
Assertions.assertThrows(IllegalArgumentException.class,
() -> new work.slhaf.partner.api.common.support.DirectoryWatchSupport(context, executor, -2, null));
}
}
@Test
void testWatchDepthZeroOnlyWatchesRoot(@TempDir Path tempDir) throws Exception {
Path childDir = Files.createDirectories(tempDir.resolve("child"));
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
WatchHarness harness = createWatchSupport(tempDir, executor, 0)) {
harness.watchSupport().start();
Files.writeString(tempDir.resolve("root.txt"), "root");
waitForCondition(() -> harness.events().contains("root.txt"), 2000);
Files.writeString(childDir.resolve("child.txt"), "child");
Thread.sleep(300);
Assertions.assertTrue(harness.events().contains("root.txt"));
Assertions.assertFalse(harness.events().contains("child/child.txt"));
}
}
@Test
void testWatchDepthOneWatchesDirectChildrenOnly(@TempDir Path tempDir) throws Exception {
Path childDir = Files.createDirectories(tempDir.resolve("child"));
Path grandChildDir = Files.createDirectories(childDir.resolve("grandchild"));
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
WatchHarness harness = createWatchSupport(tempDir, executor, 1)) {
harness.watchSupport().start();
Files.writeString(childDir.resolve("child.txt"), "child");
waitForCondition(() -> harness.events().contains("child/child.txt"), 2000);
Files.writeString(grandChildDir.resolve("deep.txt"), "deep");
Thread.sleep(300);
Assertions.assertTrue(harness.events().contains("child/child.txt"));
Assertions.assertFalse(harness.events().contains("child/grandchild/deep.txt"));
}
}
@Test
void testWatchDepthNegativeOneWatchesAllDescendants(@TempDir Path tempDir) throws Exception {
Path grandChildDir = Files.createDirectories(tempDir.resolve("child").resolve("grandchild"));
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
WatchHarness harness = createWatchSupport(tempDir, executor, -1)) {
harness.watchSupport().start();
Files.writeString(grandChildDir.resolve("deep.txt"), "deep");
waitForCondition(() -> harness.events().contains("child/grandchild/deep.txt"), 2000);
Assertions.assertTrue(harness.events().contains("child/grandchild/deep.txt"));
}
}
@Test
void testRegistersNewDirectoriesUpToConfiguredDepth(@TempDir Path tempDir) throws Exception {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
WatchHarness harness = createWatchSupport(tempDir, executor, 1)) {
harness.watchSupport().start();
Path childDir = Files.createDirectories(tempDir.resolve("child"));
waitForCondition(() -> harness.watchSupport().isWatching(childDir), 2000);
Files.writeString(childDir.resolve("child.txt"), "child");
waitForCondition(() -> harness.events().contains("child/child.txt"), 2000);
Assertions.assertTrue(harness.events().contains("child/child.txt"));
}
}
@Test
void testReRegistersRootAfterRecreate(@TempDir Path tempDir) throws Exception {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
WatchHarness harness = createWatchSupport(tempDir, executor, 0)) {
harness.watchSupport().start();
deleteDirectory(tempDir);
waitForCondition(() -> !harness.watchSupport().isWatching(tempDir), 2000);
Files.createDirectories(tempDir);
waitForCondition(() -> harness.watchSupport().isWatching(tempDir), 2000);
Files.writeString(tempDir.resolve("recreated.txt"), "ok");
waitForCondition(() -> harness.events().contains("recreated.txt"), 3000);
Assertions.assertTrue(harness.events().contains("recreated.txt"));
}
}
private WatchHarness createWatchSupport(Path root, ExecutorService executor, int watchDepth) throws IOException {
work.slhaf.partner.api.common.support.DirectoryWatchSupport watchSupport = new work.slhaf.partner.api.common.support.DirectoryWatchSupport(new work.slhaf.partner.api.common.support.DirectoryWatchSupport.Context(root), executor, watchDepth, null);
List<String> events = new CopyOnWriteArrayList<>();
watchSupport.onCreate((thisDir, context) -> record(root, context, events));
watchSupport.onModify((thisDir, context) -> record(root, context, events));
return new WatchHarness(watchSupport, events);
}
private void record(Path root, Path context, List<String> events) {
if (context == null || Files.isDirectory(context)) {
return;
}
events.add(root.relativize(context).toString().replace('\\', '/'));
}
private void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException {
long start = System.currentTimeMillis();
while (!supplier.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
break;
}
Thread.sleep(50);
}
}
private void deleteDirectory(Path dir) throws IOException {
if (!Files.exists(dir)) {
return;
}
try (var stream = Files.walk(dir)) {
stream.sorted(Comparator.reverseOrder()).forEach(path -> {
try {
Files.deleteIfExists(path);
} catch (IOException ignored) {
}
});
}
}
private record WatchHarness(work.slhaf.partner.api.common.support.DirectoryWatchSupport watchSupport,
List<String> events) implements AutoCloseable {
@Override
public void close() throws IOException {
watchSupport.close();
}
}
}