From 14df95fc5985b62205315c726349bd5617597e50 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:13:04 +0800 Subject: [PATCH] fix(McpConfigWatcher): clean stale actions on client removal --- .../action/runner/mcp/McpConfigWatcher.java | 50 ++++++++--- .../action/runner/LocalRunnerClientTest.java | 90 +++++++++++++++++++ 2 files changed, 130 insertions(+), 10 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java index e1bfdaf2..26ef8044 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java @@ -19,6 +19,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -95,14 +96,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis return; } for (String clientId : fileRecord.paramsCacheMap().keySet()) { - McpSyncClient client = mcpClientRegistry.detach(clientId); - if (client == null) { - continue; - } - for (McpSchema.Tool tool : client.listTools().tools()) { - existedMetaActions.remove(clientId + "::" + tool.name()); - } - client.close(); + unregisterClient(clientId); } } @@ -126,6 +120,39 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis } } + private void unregisterClient(String clientId) { + McpSyncClient client = mcpClientRegistry.detach(clientId); + removeClientActions(clientId, client); + if (client != null) { + try { + client.close(); + } catch (Exception e) { + log.warn("[{}] MCP client close failed", clientId, e); + } + } + } + + private void removeClientActions(String clientId, McpSyncClient client) { + boolean removedByListing = false; + if (client != null) { + try { + List tools = client.listTools().tools(); + if (tools != null) { + for (McpSchema.Tool tool : tools) { + existedMetaActions.remove(clientId + "::" + tool.name()); + } + removedByListing = true; + } + } catch (Exception e) { + log.warn("[{}] MCP client listTools failed during unregister, fallback to key scan", clientId, e); + } + } + if (!removedByListing) { + String prefix = clientId + "::"; + existedMetaActions.keySet().removeIf(actionKey -> actionKey.startsWith(prefix)); + } + } + private cn.hutool.json.JSONObject readJson(File file) { try { return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8); @@ -237,13 +264,16 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis } private void updateMcpClients(HashMap changedMap, HashSet existingMcpIdSet) { - changedMap.forEach(this::registerMcpClient); + changedMap.forEach((clientId, config) -> { + unregisterClient(clientId); + registerMcpClient(clientId, config); + }); for (String clientId : mcpClientRegistry.listIds()) { if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) { continue; } if (!existingMcpIdSet.contains(clientId)) { - mcpClientRegistry.remove(clientId); + unregisterClient(clientId); } } existedMetaActions.keySet().removeIf(actionKey -> { diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java index 4d3c7da1..0f55eb92 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java @@ -3,16 +3,25 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.runner.mcp.McpClientRegistry; +import work.slhaf.partner.core.action.runner.mcp.McpConfigWatcher; +import work.slhaf.partner.core.action.runner.mcp.McpMetaRegistry; +import work.slhaf.partner.core.action.runner.mcp.McpTransportConfig; +import work.slhaf.partner.core.action.runner.mcp.McpTransportFactory; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import work.slhaf.partner.module.action.builtin.BuiltinActionRegistry; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -31,6 +40,22 @@ import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*; @Slf4j public class LocalRunnerClientTest { + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + @SuppressWarnings("LoggingSimilarMessage") static class Fs { static void writeRunFile(Path actionDir) throws IOException { @@ -839,6 +864,60 @@ public class LocalRunnerClientTest { } } + @Test + void testMcpConfigWatcherDeleteFallsBackWhenClientListFails(@TempDir Path tempDir) throws Exception { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + McpClientRegistry clientRegistry = new McpClientRegistry(); + McpMetaRegistry metaRegistry = new McpMetaRegistry(existedMetaActions); + McpConfigWatcher watcher = new McpConfigWatcher( + tempDir, + existedMetaActions, + clientRegistry, + new McpTransportFactory(), + metaRegistry, + executor + ); + Path configFile = tempDir.resolve("servers.json"); + Files.writeString(configFile, "{\n}\n"); + existedMetaActions.put("demo::stale_tool", buildMetaActionInfo("stale")); + clientRegistry.register("demo", buildThrowingMcpClient()); + + try { + Field cacheField = McpConfigWatcher.class.getDeclaredField("mcpConfigFileCache"); + cacheField.setAccessible(true); + @SuppressWarnings("unchecked") + Map cache = (Map) cacheField.get(watcher); + + Class recordClass = Arrays.stream(McpConfigWatcher.class.getDeclaredClasses()) + .filter(Class::isRecord) + .findFirst() + .orElseThrow(); + var constructor = recordClass.getDeclaredConstructor(long.class, long.class, Map.class); + constructor.setAccessible(true); + Object fileRecord = constructor.newInstance( + Files.getLastModifiedTime(configFile).toMillis(), + Files.size(configFile), + new HashMap<>(Map.of( + "demo", + new McpTransportConfig.Http(30, "http://127.0.0.1:9", "", Map.of()) + )) + ); + cache.put(configFile.toFile(), fileRecord); + + Method handleDelete = McpConfigWatcher.class.getDeclaredMethod("handleDelete", Path.class, Path.class); + handleDelete.setAccessible(true); + handleDelete.invoke(watcher, tempDir, configFile); + + Assertions.assertFalse(existedMetaActions.containsKey("demo::stale_tool")); + Assertions.assertFalse(clientRegistry.contains("demo")); + } finally { + watcher.close(); + metaRegistry.close(); + executor.shutdownNow(); + } + } + } @Nested @@ -979,4 +1058,15 @@ public class LocalRunnerClientTest { } } + private static io.modelcontextprotocol.client.McpSyncClient buildThrowingMcpClient() { + try { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + return (io.modelcontextprotocol.client.McpSyncClient) unsafe.allocateInstance(io.modelcontextprotocol.client.McpSyncClient.class); + } catch (Exception e) { + throw new IllegalStateException("failed to build throwing mcp client", e); + } + } + }