mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
fix(McpConfigWatcher): clean stale actions on client removal
This commit is contained in:
@@ -19,6 +19,7 @@ import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@@ -95,14 +96,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
|
||||
return;
|
||||
}
|
||||
for (String clientId : fileRecord.paramsCacheMap().keySet()) {
|
||||
McpSyncClient client = mcpClientRegistry.detach(clientId);
|
||||
if (client == null) {
|
||||
continue;
|
||||
}
|
||||
for (McpSchema.Tool tool : client.listTools().tools()) {
|
||||
existedMetaActions.remove(clientId + "::" + tool.name());
|
||||
}
|
||||
client.close();
|
||||
unregisterClient(clientId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,6 +120,39 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
|
||||
}
|
||||
}
|
||||
|
||||
private void unregisterClient(String clientId) {
|
||||
McpSyncClient client = mcpClientRegistry.detach(clientId);
|
||||
removeClientActions(clientId, client);
|
||||
if (client != null) {
|
||||
try {
|
||||
client.close();
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] MCP client close failed", clientId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void removeClientActions(String clientId, McpSyncClient client) {
|
||||
boolean removedByListing = false;
|
||||
if (client != null) {
|
||||
try {
|
||||
List<McpSchema.Tool> tools = client.listTools().tools();
|
||||
if (tools != null) {
|
||||
for (McpSchema.Tool tool : tools) {
|
||||
existedMetaActions.remove(clientId + "::" + tool.name());
|
||||
}
|
||||
removedByListing = true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] MCP client listTools failed during unregister, fallback to key scan", clientId, e);
|
||||
}
|
||||
}
|
||||
if (!removedByListing) {
|
||||
String prefix = clientId + "::";
|
||||
existedMetaActions.keySet().removeIf(actionKey -> actionKey.startsWith(prefix));
|
||||
}
|
||||
}
|
||||
|
||||
private cn.hutool.json.JSONObject readJson(File file) {
|
||||
try {
|
||||
return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8);
|
||||
@@ -237,13 +264,16 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
|
||||
}
|
||||
|
||||
private void updateMcpClients(HashMap<String, McpTransportConfig> changedMap, HashSet<String> existingMcpIdSet) {
|
||||
changedMap.forEach(this::registerMcpClient);
|
||||
changedMap.forEach((clientId, config) -> {
|
||||
unregisterClient(clientId);
|
||||
registerMcpClient(clientId, config);
|
||||
});
|
||||
for (String clientId : mcpClientRegistry.listIds()) {
|
||||
if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) {
|
||||
continue;
|
||||
}
|
||||
if (!existingMcpIdSet.contains(clientId)) {
|
||||
mcpClientRegistry.remove(clientId);
|
||||
unregisterClient(clientId);
|
||||
}
|
||||
}
|
||||
existedMetaActions.keySet().removeIf(actionKey -> {
|
||||
|
||||
@@ -3,16 +3,25 @@ package work.slhaf.partner.core.action.runner;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||
import work.slhaf.partner.core.action.runner.mcp.McpClientRegistry;
|
||||
import work.slhaf.partner.core.action.runner.mcp.McpConfigWatcher;
|
||||
import work.slhaf.partner.core.action.runner.mcp.McpMetaRegistry;
|
||||
import work.slhaf.partner.core.action.runner.mcp.McpTransportConfig;
|
||||
import work.slhaf.partner.core.action.runner.mcp.McpTransportFactory;
|
||||
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy;
|
||||
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
|
||||
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
|
||||
import work.slhaf.partner.module.action.builtin.BuiltinActionRegistry;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@@ -31,6 +40,22 @@ import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*;
|
||||
@Slf4j
|
||||
public class LocalRunnerClientTest {
|
||||
|
||||
private static String originalUserHome;
|
||||
|
||||
@BeforeAll
|
||||
static void prepareTestHome() throws IOException {
|
||||
originalUserHome = System.getProperty("user.home");
|
||||
Path tempHome = Files.createTempDirectory("partner-test-home");
|
||||
System.setProperty("user.home", tempHome.toString());
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void restoreUserHome() {
|
||||
if (originalUserHome != null) {
|
||||
System.setProperty("user.home", originalUserHome);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("LoggingSimilarMessage")
|
||||
static class Fs {
|
||||
static void writeRunFile(Path actionDir) throws IOException {
|
||||
@@ -839,6 +864,60 @@ public class LocalRunnerClientTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMcpConfigWatcherDeleteFallsBackWhenClientListFails(@TempDir Path tempDir) throws Exception {
|
||||
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
|
||||
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
McpClientRegistry clientRegistry = new McpClientRegistry();
|
||||
McpMetaRegistry metaRegistry = new McpMetaRegistry(existedMetaActions);
|
||||
McpConfigWatcher watcher = new McpConfigWatcher(
|
||||
tempDir,
|
||||
existedMetaActions,
|
||||
clientRegistry,
|
||||
new McpTransportFactory(),
|
||||
metaRegistry,
|
||||
executor
|
||||
);
|
||||
Path configFile = tempDir.resolve("servers.json");
|
||||
Files.writeString(configFile, "{\n}\n");
|
||||
existedMetaActions.put("demo::stale_tool", buildMetaActionInfo("stale"));
|
||||
clientRegistry.register("demo", buildThrowingMcpClient());
|
||||
|
||||
try {
|
||||
Field cacheField = McpConfigWatcher.class.getDeclaredField("mcpConfigFileCache");
|
||||
cacheField.setAccessible(true);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<java.io.File, Object> cache = (Map<java.io.File, Object>) cacheField.get(watcher);
|
||||
|
||||
Class<?> recordClass = Arrays.stream(McpConfigWatcher.class.getDeclaredClasses())
|
||||
.filter(Class::isRecord)
|
||||
.findFirst()
|
||||
.orElseThrow();
|
||||
var constructor = recordClass.getDeclaredConstructor(long.class, long.class, Map.class);
|
||||
constructor.setAccessible(true);
|
||||
Object fileRecord = constructor.newInstance(
|
||||
Files.getLastModifiedTime(configFile).toMillis(),
|
||||
Files.size(configFile),
|
||||
new HashMap<>(Map.of(
|
||||
"demo",
|
||||
new McpTransportConfig.Http(30, "http://127.0.0.1:9", "", Map.of())
|
||||
))
|
||||
);
|
||||
cache.put(configFile.toFile(), fileRecord);
|
||||
|
||||
Method handleDelete = McpConfigWatcher.class.getDeclaredMethod("handleDelete", Path.class, Path.class);
|
||||
handleDelete.setAccessible(true);
|
||||
handleDelete.invoke(watcher, tempDir, configFile);
|
||||
|
||||
Assertions.assertFalse(existedMetaActions.containsKey("demo::stale_tool"));
|
||||
Assertions.assertFalse(clientRegistry.contains("demo"));
|
||||
} finally {
|
||||
watcher.close();
|
||||
metaRegistry.close();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
@@ -979,4 +1058,15 @@ public class LocalRunnerClientTest {
|
||||
}
|
||||
}
|
||||
|
||||
private static io.modelcontextprotocol.client.McpSyncClient buildThrowingMcpClient() {
|
||||
try {
|
||||
Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
|
||||
unsafeField.setAccessible(true);
|
||||
sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null);
|
||||
return (io.modelcontextprotocol.client.McpSyncClient) unsafe.allocateInstance(io.modelcontextprotocol.client.McpSyncClient.class);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("failed to build throwing mcp client", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user