fix(LocalRunnerClient): prevent WatchService event loss caused by concurrent consumers

Context:
Shared WatchService with multiple watch threads caused WatchKey events to be consumed by mismatched processors, leading to missed file events.
Use isolated WatchService per WatchContext to restore correct semantics.
This commit is contained in:
2026-01-12 19:46:45 +08:00
parent 9694a022c7
commit ddd999d47b
2 changed files with 115 additions and 29 deletions

View File

@@ -95,7 +95,6 @@ public class LocalRunnerClient extends RunnerClient {
* 该 MCP Server-Client 的作用为: 与 CommonMcp Clients 配合,补齐第三方 MCP 服务的描述信息
*/
private McpStatelessAsyncServer mcpDescServer;
private final WatchService watchService;
public LocalRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String baseActionPath) {
super(existedMetaActions, executor, baseActionPath);
@@ -110,18 +109,17 @@ public class LocalRunnerClient extends RunnerClient {
createPath(MCP_DESC_PATH);
try {
watchService = FileSystems.getDefault().newWatchService();
registerDescMcp();
registerDynamicActionMcp();
registerCommonMcp();
} catch (IOException e) {
throw new ActionInitFailedException("目录监听器启动失败", e);
}
registerDescMcp();
registerDynamicActionMcp();
registerCommonMcp();
setupShutdownHook();
}
private void registerCommonMcp() {
val ctx = new WatchContext(Path.of(MCP_SERVER_PATH), watchService);
private void registerCommonMcp() throws IOException {
val ctx = new WatchContext(Path.of(MCP_SERVER_PATH), FileSystems.getDefault().newWatchService());
val common = new LocalWatchEventProcessor.Common(existedMetaActions, mcpClients, ctx);
new LocalWatchServiceBuild.BuildRegistry(ctx)
.initialLoad(common.buildLoad())
@@ -133,7 +131,7 @@ public class LocalRunnerClient extends RunnerClient {
log.info("CommonMcp 文件监听注册完毕");
}
private void registerDescMcp() {
private void registerDescMcp() throws IOException {
InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair();
McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder()
.resources(true, true)
@@ -149,8 +147,8 @@ public class LocalRunnerClient extends RunnerClient {
}
private void registerDescMcpWatch() {
WatchContext ctx = new WatchContext(Path.of(MCP_DESC_PATH), watchService);
private void registerDescMcpWatch() throws IOException {
WatchContext ctx = new WatchContext(Path.of(MCP_DESC_PATH), FileSystems.getDefault().newWatchService());
LocalWatchEventProcessor.Desc desc = new LocalWatchEventProcessor.Desc(existedMetaActions, mcpDescServer, ctx);
new LocalWatchServiceBuild.BuildRegistry(ctx)
.initialLoad(desc.buildLoad())
@@ -158,10 +156,11 @@ public class LocalRunnerClient extends RunnerClient {
.registerDelete(desc.buildDelete())
.registerModify(desc.buildModify())
.registerOverflow(desc.buildOverflow())
.watchAll(true)
.commit(executor);
}
private void registerDynamicActionMcp() {
private void registerDynamicActionMcp() throws IOException {
InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair();
McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder()
.tools(true)
@@ -179,9 +178,9 @@ public class LocalRunnerClient extends RunnerClient {
log.info("DynamicActionMcp 注册完毕");
}
private void registerDynamicActionMcpWatch() {
private void registerDynamicActionMcpWatch() throws IOException {
// MODIFY、CREATE、DELETE、OVERFLOW 都需要不同的处理方式
WatchContext ctx = new WatchContext(Path.of(DYNAMIC_ACTION_PATH), watchService);
WatchContext ctx = new WatchContext(Path.of(DYNAMIC_ACTION_PATH), FileSystems.getDefault().newWatchService());
LocalWatchEventProcessor.Dynamic dynamic = new LocalWatchEventProcessor.Dynamic(existedMetaActions, dynamicActionMcpServer, ctx);
new LocalWatchServiceBuild.BuildRegistry(ctx)
.initialLoad(dynamic.buildLoad())
@@ -491,7 +490,7 @@ public class LocalRunnerClient extends RunnerClient {
String rootStr = ctx.root.toString();
log.info("行动程序目录监听器已启动,监听目录: {}", rootStr);
while (true) {
WatchKey key;
WatchKey key = null;
try {
key = ctx.watchService.take();
List<WatchEvent<?>> events = key.pollEvents();
@@ -513,6 +512,14 @@ public class LocalRunnerClient extends RunnerClient {
} catch (ClosedWatchServiceException e) {
log.info("WatchService 已关闭,监听线程退出。");
break;
} finally {
if (key != null) {
// reset 返回 false 表示该 key 已失效(目录被删、不可访问等)
boolean valid = key.reset();
if (!valid) {
log.info("WatchKey 已失效,停止监听该目录: {}", key.watchable());
}
}
}
}
};

View File

@@ -1,31 +1,32 @@
package work.slhaf.partner.core.action.runner;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.MetaActionType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
@SuppressWarnings("LoggingSimilarMessage")
@Slf4j
public class LocalRunnerClientTest {
static LocalRunnerClient runnerClient;
@BeforeAll
static void beforeAll() {
runnerClient = new LocalRunnerClient(new ConcurrentHashMap<>(), Executors.newVirtualThreadPerTaskExecutor(), "/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action");
}
@Test
void testRunOrigin() {
MetaAction metaAction = buildTmpMetaAction();
RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction);
System.out.println(runnerResponse.getData());
private static void writeRunFile(Path actionDir) throws IOException {
Path runPath = actionDir.resolve("run.py");
log.debug("写入路径: {}", runPath);
Files.writeString(runPath, "print('ok')\n");
}
private static @NotNull MetaAction buildTmpMetaAction() {
@@ -38,10 +39,88 @@ public class LocalRunnerClientTest {
return metaAction;
}
private static void writeDescJson(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(descPath, json);
}
private static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException {
long start = System.currentTimeMillis();
while (!supplier.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
break;
}
Thread.sleep(50);
}
}
@Test
void testWatch() {
void testRunOrigin(@TempDir Path tempDir) {
LocalRunnerClient runnerClient =
new LocalRunnerClient(
new ConcurrentHashMap<>(),
Executors.newVirtualThreadPerTaskExecutor(),
tempDir.toString()
);
MetaAction metaAction = buildTmpMetaAction();
RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction);
System.out.println(runnerResponse.getData());
}
@Test
void testWatch(@TempDir Path tempDir) {
LocalRunnerClient runnerClient =
new LocalRunnerClient(
new ConcurrentHashMap<>(),
Executors.newVirtualThreadPerTaskExecutor(),
tempDir.toString()
);
// 直接等待输入然后尝试触发各种文件监听事件即可
System.out.println("Press any key to continue...");
Scanner scanner = new Scanner(System.in);
scanner.next();
}
@Test
void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action");
Files.createDirectories(actionDir);
Thread.sleep(100);
writeRunFile(actionDir);
writeDescJson(actionDir, "demo action");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
// 触发一次 modify确保监听线程能够捕捉到完整的 action 结构
writeDescJson(actionDir, "demo action updated");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action"));
} finally {
executor.shutdownNow();
}
}
}