From 14df95fc5985b62205315c726349bd5617597e50 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:13:04 +0800 Subject: [PATCH 1/9] fix(McpConfigWatcher): clean stale actions on client removal --- .../action/runner/mcp/McpConfigWatcher.java | 50 ++++++++--- .../action/runner/LocalRunnerClientTest.java | 90 +++++++++++++++++++ 2 files changed, 130 insertions(+), 10 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java index e1bfdaf2..26ef8044 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/McpConfigWatcher.java @@ -19,6 +19,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -95,14 +96,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis return; } for (String clientId : fileRecord.paramsCacheMap().keySet()) { - McpSyncClient client = mcpClientRegistry.detach(clientId); - if (client == null) { - continue; - } - for (McpSchema.Tool tool : client.listTools().tools()) { - existedMetaActions.remove(clientId + "::" + tool.name()); - } - client.close(); + unregisterClient(clientId); } } @@ -126,6 +120,39 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis } } + private void unregisterClient(String clientId) { + McpSyncClient client = mcpClientRegistry.detach(clientId); + removeClientActions(clientId, client); + if (client != null) { + try { + client.close(); + } catch (Exception e) { + log.warn("[{}] MCP client close failed", clientId, e); + } + } + } + + private void removeClientActions(String clientId, McpSyncClient client) { + boolean removedByListing = false; + if (client != null) { + try { + List tools = client.listTools().tools(); + if (tools != null) { + for (McpSchema.Tool tool : tools) { + existedMetaActions.remove(clientId + "::" + tool.name()); + } + removedByListing = true; + } + } catch (Exception e) { + log.warn("[{}] MCP client listTools failed during unregister, fallback to key scan", clientId, e); + } + } + if (!removedByListing) { + String prefix = clientId + "::"; + existedMetaActions.keySet().removeIf(actionKey -> actionKey.startsWith(prefix)); + } + } + private cn.hutool.json.JSONObject readJson(File file) { try { return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8); @@ -237,13 +264,16 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis } private void updateMcpClients(HashMap changedMap, HashSet existingMcpIdSet) { - changedMap.forEach(this::registerMcpClient); + changedMap.forEach((clientId, config) -> { + unregisterClient(clientId); + registerMcpClient(clientId, config); + }); for (String clientId : mcpClientRegistry.listIds()) { if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) { continue; } if (!existingMcpIdSet.contains(clientId)) { - mcpClientRegistry.remove(clientId); + unregisterClient(clientId); } } existedMetaActions.keySet().removeIf(actionKey -> { diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java index 4d3c7da1..0f55eb92 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java @@ -3,16 +3,25 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.runner.mcp.McpClientRegistry; +import work.slhaf.partner.core.action.runner.mcp.McpConfigWatcher; +import work.slhaf.partner.core.action.runner.mcp.McpMetaRegistry; +import work.slhaf.partner.core.action.runner.mcp.McpTransportConfig; +import work.slhaf.partner.core.action.runner.mcp.McpTransportFactory; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import work.slhaf.partner.module.action.builtin.BuiltinActionRegistry; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -31,6 +40,22 @@ import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*; @Slf4j public class LocalRunnerClientTest { + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + @SuppressWarnings("LoggingSimilarMessage") static class Fs { static void writeRunFile(Path actionDir) throws IOException { @@ -839,6 +864,60 @@ public class LocalRunnerClientTest { } } + @Test + void testMcpConfigWatcherDeleteFallsBackWhenClientListFails(@TempDir Path tempDir) throws Exception { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + McpClientRegistry clientRegistry = new McpClientRegistry(); + McpMetaRegistry metaRegistry = new McpMetaRegistry(existedMetaActions); + McpConfigWatcher watcher = new McpConfigWatcher( + tempDir, + existedMetaActions, + clientRegistry, + new McpTransportFactory(), + metaRegistry, + executor + ); + Path configFile = tempDir.resolve("servers.json"); + Files.writeString(configFile, "{\n}\n"); + existedMetaActions.put("demo::stale_tool", buildMetaActionInfo("stale")); + clientRegistry.register("demo", buildThrowingMcpClient()); + + try { + Field cacheField = McpConfigWatcher.class.getDeclaredField("mcpConfigFileCache"); + cacheField.setAccessible(true); + @SuppressWarnings("unchecked") + Map cache = (Map) cacheField.get(watcher); + + Class recordClass = Arrays.stream(McpConfigWatcher.class.getDeclaredClasses()) + .filter(Class::isRecord) + .findFirst() + .orElseThrow(); + var constructor = recordClass.getDeclaredConstructor(long.class, long.class, Map.class); + constructor.setAccessible(true); + Object fileRecord = constructor.newInstance( + Files.getLastModifiedTime(configFile).toMillis(), + Files.size(configFile), + new HashMap<>(Map.of( + "demo", + new McpTransportConfig.Http(30, "http://127.0.0.1:9", "", Map.of()) + )) + ); + cache.put(configFile.toFile(), fileRecord); + + Method handleDelete = McpConfigWatcher.class.getDeclaredMethod("handleDelete", Path.class, Path.class); + handleDelete.setAccessible(true); + handleDelete.invoke(watcher, tempDir, configFile); + + Assertions.assertFalse(existedMetaActions.containsKey("demo::stale_tool")); + Assertions.assertFalse(clientRegistry.contains("demo")); + } finally { + watcher.close(); + metaRegistry.close(); + executor.shutdownNow(); + } + } + } @Nested @@ -979,4 +1058,15 @@ public class LocalRunnerClientTest { } } + private static io.modelcontextprotocol.client.McpSyncClient buildThrowingMcpClient() { + try { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + return (io.modelcontextprotocol.client.McpSyncClient) unsafe.allocateInstance(io.modelcontextprotocol.client.McpSyncClient.class); + } catch (Exception e) { + throw new IllegalStateException("failed to build throwing mcp client", e); + } + } + } From 9b97fffc5ce4039f5ad7842511edb5cc3366eea7 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:13:40 +0800 Subject: [PATCH 2/9] fix(LocalRunnerClient): unregister policy listener on close --- .../core/action/runner/LocalRunnerClient.java | 1 + .../action/runner/policy/ExecutionPolicy.kt | 4 ++ .../runner/LocalRunnerClientCloseTest.java | 54 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientCloseTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 6c6586fb..9b9fa02f 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -176,6 +176,7 @@ public class LocalRunnerClient extends RunnerClient { if (!closed.compareAndSet(false, true)) { return; } + mcpConfigWatcher.unregisterPolicyListener(); closeQuietly(mcpConfigWatcher); closeQuietly(dynamicActionMcpManager); closeQuietly(mcpDescWatcher); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/policy/ExecutionPolicy.kt b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/policy/ExecutionPolicy.kt index 90434ed8..7dbf0082 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/policy/ExecutionPolicy.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/policy/ExecutionPolicy.kt @@ -150,4 +150,8 @@ interface RunnerExecutionPolicyListener { fun registerPolicyListener() { ExecutionPolicyRegistry.addListener(this) } + + fun unregisterPolicyListener() { + ExecutionPolicyRegistry.removeListener(this) + } } diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientCloseTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientCloseTest.java new file mode 100644 index 00000000..2c848894 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientCloseTest.java @@ -0,0 +1,54 @@ +package work.slhaf.partner.core.action.runner; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +class LocalRunnerClientCloseTest { + + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + + @Test + void testLocalRunnerClientCloseUnregistersPolicyListener(@TempDir Path tempDir) throws Exception { + Field listenersField = ExecutionPolicyRegistry.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + @SuppressWarnings("unchecked") + CopyOnWriteArraySet listeners = (CopyOnWriteArraySet) listenersField.get(ExecutionPolicyRegistry.INSTANCE); + int before = listeners.size(); + + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { + LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + Assertions.assertEquals(before + 1, listeners.size()); + client.close(); + Assertions.assertEquals(before, listeners.size()); + } + } +} From 657023694c55120f5a431052acfd7f22692a9fb4 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:19:39 +0800 Subject: [PATCH 3/9] fix(OriginExecution): apply execution policy to process launch --- .../execution/CommandExecutionService.java | 47 +++++++++-- .../execution/OriginExecutionService.java | 7 +- .../CommandExecutionServiceTest.java | 27 +++++++ .../execution/OriginExecutionServiceTest.java | 78 +++++++++++++++++++ 4 files changed, 145 insertions(+), 14 deletions(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionServiceTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java index 23c1ccee..b6439263 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java @@ -1,9 +1,11 @@ package work.slhaf.partner.core.action.runner.execution; import lombok.Data; +import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -37,15 +39,13 @@ public class CommandExecutionService { return exec(commands.toArray(new String[0])); } - public Result exec(String... commands) { + public Result exec(WrappedLaunchSpec launchSpec) { Result result = new Result(); List output = new ArrayList<>(); List error = new ArrayList<>(); try { - Process process = new ProcessBuilder(commands) - .redirectErrorStream(false) - .start(); + Process process = startProcess(launchSpec); Thread stdoutThread = new Thread(() -> { try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { @@ -85,15 +85,17 @@ public class CommandExecutionService { return result; } + public Result exec(String... commands) { + return exec(defaultLaunchSpec(commands)); + } + public CommandSession createSessionTask(List commands) { return createSessionTask(commands.toArray(new String[0])); } - public CommandSession createSessionTask(String... commands) { + public CommandSession createSessionTask(WrappedLaunchSpec launchSpec) { try { - Process process = new ProcessBuilder(commands) - .redirectErrorStream(false) - .start(); + Process process = startProcess(launchSpec); CommandSession session = new CommandSession(); StringBuilder stdoutBuffer = new StringBuilder(); StringBuilder stderrBuffer = new StringBuilder(); @@ -110,6 +112,10 @@ public class CommandExecutionService { } } + public CommandSession createSessionTask(String... commands) { + return createSessionTask(defaultLaunchSpec(commands)); + } + private void readToBuffer(java.io.InputStream inputStream, StringBuilder buffer) { try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { String line; @@ -125,6 +131,31 @@ public class CommandExecutionService { } } + private Process startProcess(WrappedLaunchSpec launchSpec) throws Exception { + ProcessBuilder processBuilder = new ProcessBuilder(); + List command = new ArrayList<>(); + command.add(launchSpec.getCommand()); + command.addAll(launchSpec.getArgs()); + processBuilder.command(command); + processBuilder.redirectErrorStream(false); + if (launchSpec.getWorkingDirectory() != null && !launchSpec.getWorkingDirectory().isBlank()) { + processBuilder.directory(new File(launchSpec.getWorkingDirectory())); + } + Map environment = processBuilder.environment(); + environment.clear(); + environment.putAll(launchSpec.getEnvironment()); + return processBuilder.start(); + } + + private WrappedLaunchSpec defaultLaunchSpec(String... commands) { + return new WrappedLaunchSpec( + commands[0], + List.of(commands).subList(1, commands.length), + null, + System.getenv() + ); + } + @Data public static class Result { private boolean ok; diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionService.java index eba315e2..5a841315 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionService.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionService.java @@ -6,9 +6,7 @@ import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.io.File; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import static work.slhaf.partner.core.action.ActionCore.ORIGIN_LOCATION; @@ -22,10 +20,7 @@ public class OriginExecutionService { File file = new File(resolveOriginPath(metaAction)); String[] commands = CommandExecutionService.INSTANCE.buildFileExecutionCommands(metaAction.getLauncher(), metaAction.getParams(), file.getAbsolutePath()); WrappedLaunchSpec wrapped = ExecutionPolicyRegistry.INSTANCE.prepare(Arrays.stream(commands).toList()); - List wrappedCommands = new ArrayList<>(); - wrappedCommands.add(wrapped.getCommand()); - wrappedCommands.addAll(wrapped.getArgs()); - CommandExecutionService.Result execResult = CommandExecutionService.INSTANCE.exec(wrappedCommands); + CommandExecutionService.Result execResult = CommandExecutionService.INSTANCE.exec(wrapped); response.setOk(execResult.isOk()); response.setData(execResult.getTotal()); return response; diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java index 53e7f189..d90bcf06 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java @@ -2,6 +2,7 @@ package work.slhaf.partner.core.action.runner.execution; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.io.IOException; import java.nio.file.Files; @@ -105,6 +106,32 @@ class CommandExecutionServiceTest { Assertions.assertEquals("oops", session.getStderrBuffer().toString()); } + @Test + void testExecWrappedLaunchSpecAppliesWorkingDirectory(@org.junit.jupiter.api.io.TempDir Path tempDir) { + CommandExecutionService.Result result = service.exec(new WrappedLaunchSpec( + "sh", + List.of("-lc", "pwd"), + tempDir.toString(), + System.getenv() + )); + + Assertions.assertTrue(result.isOk()); + Assertions.assertEquals(tempDir.toString(), result.getTotal()); + } + + @Test + void testExecWrappedLaunchSpecAppliesEnvironmentOverride() { + CommandExecutionService.Result result = service.exec(new WrappedLaunchSpec( + "sh", + List.of("-lc", "printf '%s' \"$PARTNER_TEST_ENV\""), + null, + Map.of("PARTNER_TEST_ENV", "applied") + )); + + Assertions.assertTrue(result.isOk()); + Assertions.assertEquals("applied", result.getTotal()); + } + private void waitForBufferContains(StringBuilder buffer, String expected) throws InterruptedException { long deadline = System.currentTimeMillis() + 2000; while (System.currentTimeMillis() < deadline) { diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionServiceTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionServiceTest.java new file mode 100644 index 00000000..b3c73df6 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/OriginExecutionServiceTest.java @@ -0,0 +1,78 @@ +package work.slhaf.partner.core.action.runner.execution; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Set; + +class OriginExecutionServiceTest { + + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + + @Test + void testOriginExecutionServiceAppliesExecutionPolicyEnvironment(@TempDir Path tempDir) throws IOException { + Path script = tempDir.resolve("print_env.py"); + Files.writeString(script, "import os\nprint(os.getenv('PARTNER_ORIGIN_TEST', ''), end='')\n"); + + ExecutionPolicy originalPolicy = new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + true, + Map.of(), + null, + Set.of(), + Set.of() + ); + ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + false, + Map.of("PARTNER_ORIGIN_TEST", "origin-applied"), + null, + Set.of(), + Set.of() + )); + + try { + var prepared = ExecutionPolicyRegistry.INSTANCE.prepare(List.of("python3", script.toString())); + Assertions.assertEquals("origin-applied", prepared.getEnvironment().get("PARTNER_ORIGIN_TEST")); + var directExec = CommandExecutionService.INSTANCE.exec(prepared); + Assertions.assertTrue(directExec.isOk()); + Assertions.assertEquals("origin-applied", directExec.getTotal()); + OriginExecutionService service = new OriginExecutionService(); + MetaAction metaAction = new MetaAction("run", false, "python3", MetaAction.Type.ORIGIN, script.toString()); + var response = service.run(metaAction); + Assertions.assertTrue(response.isOk()); + Assertions.assertEquals("origin-applied", response.getData()); + } finally { + ExecutionPolicyRegistry.INSTANCE.updatePolicy(originalPolicy); + } + } +} From 15d7eb68507a2165d46143627e5e8c20d26bb84b Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:21:37 +0800 Subject: [PATCH 4/9] fix(DynamicActionMcp): apply execution policy to tool handler --- .../runner/mcp/DynamicActionMcpManager.java | 5 +- .../DynamicActionMcpManagerPolicyTest.java | 97 +++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManagerPolicyTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java index 3c9e4676..b11e33d1 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManager.java @@ -15,6 +15,7 @@ import work.slhaf.partner.common.mcp.InProcessMcpTransport; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.exception.ActionInfrastructureStartupException; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; import work.slhaf.partner.framework.agent.support.DirectoryWatchSupport; import java.io.File; @@ -331,7 +332,9 @@ public class DynamicActionMcpManager implements AutoCloseable { .build()); } return Mono.fromCallable(() -> { - CommandExecutionService.Result execResult = commandExecutionService.exec(commands); + CommandExecutionService.Result execResult = commandExecutionService.exec( + ExecutionPolicyRegistry.INSTANCE.prepare(List.of(commands)) + ); McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder() .isError(!execResult.isOk()); List resultList = execResult.getResultList(); diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManagerPolicyTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManagerPolicyTest.java new file mode 100644 index 00000000..010dd38b --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/mcp/DynamicActionMcpManagerPolicyTest.java @@ -0,0 +1,97 @@ +package work.slhaf.partner.core.action.runner.mcp; + +import io.modelcontextprotocol.spec.McpSchema; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import reactor.core.publisher.Mono; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.BiFunction; + +class DynamicActionMcpManagerPolicyTest { + + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + + @Test + @SuppressWarnings("unchecked") + void testDynamicActionHandlerAppliesExecutionPolicyEnvironment(@TempDir Path tempDir) throws Exception { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + DynamicActionMcpManager manager = new DynamicActionMcpManager(tempDir, existedMetaActions, executor)) { + Path script = tempDir.resolve("run.py"); + Files.writeString(script, "import os\nprint(os.getenv('PARTNER_DYNAMIC_TEST', ''), end='')\n"); + + ExecutionPolicy originalPolicy = new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + true, + Map.of(), + null, + Set.of(), + Set.of() + ); + ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + false, + Map.of("PARTNER_DYNAMIC_TEST", "dynamic-applied"), + null, + Set.of(), + Set.of() + )); + + try { + Method method = DynamicActionMcpManager.class.getDeclaredMethod("buildToolHandler", File.class, String.class); + method.setAccessible(true); + BiFunction> handler = + (BiFunction>) method.invoke( + manager, + script.toFile(), + "python3" + ); + + McpSchema.CallToolResult result = handler.apply( + null, + McpSchema.CallToolRequest.builder().name("demo").arguments(Map.of()).build() + ).block(); + + Assertions.assertNotNull(result); + Assertions.assertFalse(Boolean.TRUE.equals(result.isError())); + Assertions.assertEquals("[dynamic-applied]", String.valueOf(result.structuredContent())); + } finally { + ExecutionPolicyRegistry.INSTANCE.updatePolicy(originalPolicy); + } + } + } +} From dd8e20838d5dc141fb4341b287a1526081d45ef5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:23:04 +0800 Subject: [PATCH 5/9] fix(BuiltinCommand): apply execution policy to command tools --- .../builtin/BuiltinCommandActionProvider.java | 10 +- ...uiltinCommandActionProviderPolicyTest.java | 145 ++++++++++++++++++ 2 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderPolicyTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java index 9ae44125..409e6176 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java @@ -4,6 +4,8 @@ import com.alibaba.fastjson2.JSONObject; import lombok.AllArgsConstructor; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; +import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.time.Instant; import java.util.*; @@ -58,7 +60,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ); Function, String> invoker = params -> { List commands = requireCommandArguments(params); - CommandExecutionService.Result result = commandExecutionService.exec(commands); + CommandExecutionService.Result result = commandExecutionService.exec(wrapCommands(commands)); return JSONObject.of("result", result.getTotal()).toJSONString(); }; return new BuiltinActionRegistry.BuiltinActionDefinition( @@ -93,7 +95,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { Function, String> invoker = params -> { String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc"); List commands = requireCommandArguments(params); - CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(commands); + CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands)); String executionId = UUID.randomUUID().toString(); CommandHandle handle = new CommandHandle( executionId, @@ -336,6 +338,10 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { }); } + private WrappedLaunchSpec wrapCommands(List commands) { + return ExecutionPolicyRegistry.INSTANCE.prepare(commands); + } + private CommandHandle requireHandle(String id) { CommandHandle handle = commandHandles.get(id); if (handle == null) { diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderPolicyTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderPolicyTest.java new file mode 100644 index 00000000..31272694 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderPolicyTest.java @@ -0,0 +1,145 @@ +package work.slhaf.partner.module.action.builtin; + +import com.alibaba.fastjson2.JSONObject; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Set; + +class BuiltinCommandActionProviderPolicyTest { + + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + + @Test + void testExecuteAppliesExecutionPolicyEnvironment() { + BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider(); + BuiltinActionRegistry.BuiltinActionDefinition execute = requireDefinition( + provider.provideBuiltinActions(), + "builtin::command::execute" + ); + + ExecutionPolicy originalPolicy = new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + true, + Map.of(), + null, + Set.of(), + Set.of() + ); + ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + false, + Map.of("PARTNER_BUILTIN_TEST", "builtin-applied"), + null, + Set.of(), + Set.of() + )); + + try { + String result = execute.invoker().apply(Map.of( + "arg", "sh", + "arg1", "-lc", + "arg2", "printf '%s' \"$PARTNER_BUILTIN_TEST\"" + )); + Assertions.assertEquals("builtin-applied", JSONObject.parseObject(result).getString("result")); + } finally { + ExecutionPolicyRegistry.INSTANCE.updatePolicy(originalPolicy); + } + } + + @Test + void testStartAppliesExecutionPolicyEnvironment() throws Exception { + BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider(); + List definitions = provider.provideBuiltinActions(); + BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start"); + BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect"); + + ExecutionPolicy originalPolicy = new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + true, + Map.of(), + null, + Set.of(), + Set.of() + ); + ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + false, + Map.of("PARTNER_BUILTIN_TEST", "builtin-session"), + null, + Set.of(), + Set.of() + )); + + try { + String startResult = start.invoker().apply(Map.of( + "desc", "policy-session", + "arg", "sh", + "arg1", "-lc", + "arg2", "printf '%s' \"$PARTNER_BUILTIN_TEST\"" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + Assertions.assertNotNull(executionId); + + JSONObject inspectResult = waitForInspectExit(inspect, executionId); + Assertions.assertTrue(inspectResult.getString("stdoutSummary").contains("builtin-session")); + } finally { + ExecutionPolicyRegistry.INSTANCE.updatePolicy(originalPolicy); + } + } + + private BuiltinActionRegistry.BuiltinActionDefinition requireDefinition( + List definitions, + String actionKey + ) { + return definitions.stream() + .filter(definition -> actionKey.equals(definition.actionKey())) + .findFirst() + .orElseThrow(() -> new AssertionError("definition not found: " + actionKey)); + } + + private JSONObject waitForInspectExit(BuiltinActionRegistry.BuiltinActionDefinition inspectDefinition, String executionId) throws Exception { + long deadline = System.currentTimeMillis() + 3000; + while (System.currentTimeMillis() < deadline) { + JSONObject inspect = JSONObject.parseObject(inspectDefinition.invoker().apply(Map.of( + "id", executionId + ))); + if (inspect.get("exitCode") != null) { + return inspect; + } + Thread.sleep(20); + } + throw new AssertionError("command session did not exit in time"); + } +} From 8c8b0883bb9ad3fb49ac5c28e324377112bf8f0c Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:26:20 +0800 Subject: [PATCH 6/9] fix(BuiltinCommand): expire finished command sessions by ttl --- .../builtin/BuiltinCommandActionProvider.java | 17 +++ .../BuiltinCommandActionProviderTtlTest.java | 139 ++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java index 409e6176..8385a8b9 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java @@ -8,6 +8,7 @@ import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.time.Instant; +import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -21,6 +22,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { private static final int DEFAULT_READ_LIMIT = 4096; private static final int SUMMARY_MAX_LINES = 5; private static final int SUMMARY_MAX_LENGTH = 2048; + private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10); private final Set basicTags = Set.of("Builtin MetaAction", "System Command Tool"); @@ -93,6 +95,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { JSONObject.of("executionId", "Command execution session id.") ); Function, String> invoker = params -> { + cleanupExpiredHandles(); String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc"); List commands = requireCommandArguments(params); CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands)); @@ -145,6 +148,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ) ); Function, String> invoker = params -> { + cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); return JSONObject.of( "executionId", handle.executionId, @@ -196,6 +200,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ) ); Function, String> invoker = params -> { + cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout"); if (!"stdout".equals(stream) && !"stderr".equals(stream)) { @@ -256,6 +261,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ) ); Function, String> invoker = params -> { + cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); if (handle.process.isAlive()) { handle.process.destroy(); @@ -308,6 +314,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ) ); Function, String> invoker = params -> { + cleanupExpiredHandles(); List items = commandHandles.values().stream() .sorted(Comparator.comparing(handle -> handle.startAt)) .map(handle -> JSONObject.of( @@ -338,6 +345,16 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { }); } + private void cleanupExpiredHandles() { + Instant now = Instant.now(); + commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now)); + } + + private boolean isExpired(CommandHandle handle, Instant now) { + Instant exitTime = handle.exitAt; + return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now); + } + private WrappedLaunchSpec wrapCommands(List commands) { return ExecutionPolicyRegistry.INSTANCE.prepare(commands); } diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java new file mode 100644 index 00000000..266df02a --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java @@ -0,0 +1,139 @@ +package work.slhaf.partner.module.action.builtin; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy; +import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry; + +import java.lang.reflect.Field; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +class BuiltinCommandActionProviderTtlTest { + + private static String originalUserHome; + + @BeforeAll + static void prepareTestHome() throws IOException { + originalUserHome = System.getProperty("user.home"); + Path tempHome = Files.createTempDirectory("partner-test-home"); + System.setProperty("user.home", tempHome.toString()); + ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy( + ExecutionPolicy.Mode.DIRECT, + "direct", + ExecutionPolicy.Network.ENABLE, + true, + Map.of(), + null, + Set.of(), + Set.of() + )); + } + + @AfterAll + static void restoreUserHome() { + if (originalUserHome != null) { + System.setProperty("user.home", originalUserHome); + } + } + + @Test + void testOverviewRemovesExpiredFinishedSessions() throws Exception { + BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider(); + List definitions = provider.provideBuiltinActions(); + BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start"); + BuiltinActionRegistry.BuiltinActionDefinition overview = requireDefinition(definitions, "builtin::command::overview"); + BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect"); + + String startResult = start.invoker().apply(Map.of( + "desc", "ttl-session", + "arg", "sh", + "arg1", "-lc", + "arg2", "printf 'done'" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + waitForInspectExit(inspect, executionId); + + expireHandle(provider, executionId); + + JSONObject overviewResult = JSONObject.parseObject(overview.invoker().apply(Map.of())); + JSONArray result = overviewResult.getJSONArray("result"); + Assertions.assertTrue(result.stream().map(item -> (JSONObject) item) + .noneMatch(item -> executionId.equals(item.getString("executionId")))); + } + + @Test + void testInspectRejectsExpiredFinishedSession() throws Exception { + BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider(); + List definitions = provider.provideBuiltinActions(); + BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start"); + BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect"); + + String startResult = start.invoker().apply(Map.of( + "desc", "ttl-session-inspect", + "arg", "sh", + "arg1", "-lc", + "arg2", "printf 'done'" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + waitForInspectExit(inspect, executionId); + + expireHandle(provider, executionId); + + Assertions.assertThrows(IllegalArgumentException.class, () -> inspect.invoker().apply(Map.of( + "id", executionId + ))); + } + + private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception { + Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles"); + handlesField.setAccessible(true); + @SuppressWarnings("unchecked") + ConcurrentHashMap handles = (ConcurrentHashMap) handlesField.get(provider); + Object handle = handles.get(executionId); + Assertions.assertNotNull(handle); + + Field exitCodeField = handle.getClass().getDeclaredField("exitCode"); + exitCodeField.setAccessible(true); + exitCodeField.set(handle, 0); + + Field exitAtField = handle.getClass().getDeclaredField("exitAt"); + exitAtField.setAccessible(true); + exitAtField.set(handle, Instant.now().minus(11, ChronoUnit.MINUTES)); + } + + private BuiltinActionRegistry.BuiltinActionDefinition requireDefinition( + List definitions, + String actionKey + ) { + return definitions.stream() + .filter(definition -> actionKey.equals(definition.actionKey())) + .findFirst() + .orElseThrow(() -> new AssertionError("definition not found: " + actionKey)); + } + + private JSONObject waitForInspectExit(BuiltinActionRegistry.BuiltinActionDefinition inspectDefinition, String executionId) throws Exception { + long deadline = System.currentTimeMillis() + 3000; + while (System.currentTimeMillis() < deadline) { + JSONObject inspect = JSONObject.parseObject(inspectDefinition.invoker().apply(Map.of( + "id", executionId + ))); + if (inspect.get("exitCode") != null) { + return inspect; + } + Thread.sleep(20); + } + throw new AssertionError("command session did not exit in time"); + } +} From c5aa5583195e61da4b69ab171650f160b78096ee Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:27:27 +0800 Subject: [PATCH 7/9] fix(McpActionExecutor): handle client call failures gracefully --- .../runner/execution/McpActionExecutor.java | 9 ++++- .../execution/McpActionExecutorTest.java | 38 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutorTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutor.java index c4add8d1..26dde909 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutor.java @@ -29,7 +29,14 @@ public class McpActionExecutor { .name(metaAction.getName()) .arguments(metaAction.getParams()) .build(); - McpSchema.CallToolResult callToolResult = mcpClient.callTool(callToolRequest); + McpSchema.CallToolResult callToolResult; + try { + callToolResult = mcpClient.callTool(callToolRequest); + } catch (Exception e) { + response.setOk(false); + response.setData("MCP tool call failed: " + e.getMessage()); + return response; + } Boolean error = callToolResult.isError(); response.setOk(error == null || !error); response.setData(extractResponseData(callToolResult)); diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutorTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutorTest.java new file mode 100644 index 00000000..3956adb3 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/McpActionExecutorTest.java @@ -0,0 +1,38 @@ +package work.slhaf.partner.core.action.runner.execution; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.runner.mcp.McpClientRegistry; + +import java.lang.reflect.Field; +import java.util.Map; + +class McpActionExecutorTest { + + @Test + void testRunReturnsFailureWhenClientThrows() { + McpClientRegistry clientRegistry = new McpClientRegistry(); + clientRegistry.register("broken", buildThrowingMcpClient()); + McpActionExecutor executor = new McpActionExecutor(clientRegistry); + + MetaAction metaAction = new MetaAction("demo-tool", false, null, MetaAction.Type.MCP, "broken"); + metaAction.getParams().putAll(Map.of("value", "demo")); + + var response = executor.run(metaAction); + + Assertions.assertFalse(response.isOk()); + Assertions.assertTrue(response.getData().startsWith("MCP tool call failed:")); + } + + private io.modelcontextprotocol.client.McpSyncClient buildThrowingMcpClient() { + try { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + sun.misc.Unsafe unsafe = (sun.misc.Unsafe) unsafeField.get(null); + return (io.modelcontextprotocol.client.McpSyncClient) unsafe.allocateInstance(io.modelcontextprotocol.client.McpSyncClient.class); + } catch (Exception e) { + throw new IllegalStateException("failed to build throwing mcp client", e); + } + } +} From 137b1ee91784336f376b39956ca1dcdd26bab0e5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:30:16 +0800 Subject: [PATCH 8/9] fix(CommandExecutionService): avoid persistent reader executor --- .../runner/execution/CommandExecutionService.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java index b6439263..16b2dde6 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java @@ -10,16 +10,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class CommandExecutionService { public static final CommandExecutionService INSTANCE = new CommandExecutionService(); - private final ExecutorService readerExecutor = Executors.newVirtualThreadPerTaskExecutor(); - private CommandExecutionService() { } @@ -47,7 +43,7 @@ public class CommandExecutionService { try { Process process = startProcess(launchSpec); - Thread stdoutThread = new Thread(() -> { + Thread stdoutThread = Thread.startVirtualThread(() -> { try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = reader.readLine()) != null) { @@ -57,7 +53,7 @@ public class CommandExecutionService { } }); - Thread stderrThread = new Thread(() -> { + Thread stderrThread = Thread.startVirtualThread(() -> { try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { String line; while ((line = reader.readLine()) != null) { @@ -67,9 +63,6 @@ public class CommandExecutionService { } }); - readerExecutor.execute(stdoutThread); - readerExecutor.execute(stderrThread); - int exitCode = process.waitFor(); stdoutThread.join(); stderrThread.join(); @@ -103,8 +96,8 @@ public class CommandExecutionService { session.setStdoutBuffer(stdoutBuffer); session.setStderrBuffer(stderrBuffer); - readerExecutor.execute(() -> readToBuffer(process.getInputStream(), stdoutBuffer)); - readerExecutor.execute(() -> readToBuffer(process.getErrorStream(), stderrBuffer)); + Thread.startVirtualThread(() -> readToBuffer(process.getInputStream(), stdoutBuffer)); + Thread.startVirtualThread(() -> readToBuffer(process.getErrorStream(), stderrBuffer)); return session; } catch (Exception e) { From eea72c747cfd8e26d8f031989bdee21fb8611386 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 19 Apr 2026 17:31:36 +0800 Subject: [PATCH 9/9] fix(DirectoryWatchSupport): isolate handler failures --- .../agent/support/DirectoryWatchSupport.java | 6 ++- .../support/DirectoryWatchSupportTest.java | 38 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/support/DirectoryWatchSupport.java b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/support/DirectoryWatchSupport.java index 631129b1..f6e13935 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/support/DirectoryWatchSupport.java +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/support/DirectoryWatchSupport.java @@ -157,7 +157,11 @@ public class DirectoryWatchSupport implements Closeable { if (handler == null) { continue; } - handler.handle(thisDir, resolvedContext); + try { + handler.handle(thisDir, resolvedContext); + } catch (Exception e) { + log.error("监听事件处理失败: dir={}, kind={}, context={}", thisDir, kind.name(), resolvedContext, e); + } } } catch (InterruptedException e) { log.info("监听线程被中断,准备退出..."); diff --git a/Partner-Framework/src/test/java/work/slhaf/partner/framework/common/support/DirectoryWatchSupportTest.java b/Partner-Framework/src/test/java/work/slhaf/partner/framework/common/support/DirectoryWatchSupportTest.java index a12e5ded..ce73b417 100644 --- a/Partner-Framework/src/test/java/work/slhaf/partner/framework/common/support/DirectoryWatchSupportTest.java +++ b/Partner-Framework/src/test/java/work/slhaf/partner/framework/common/support/DirectoryWatchSupportTest.java @@ -11,8 +11,11 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; class DirectoryWatchSupportTest { @@ -115,6 +118,41 @@ class DirectoryWatchSupportTest { } } + @Test + void testHandlerExceptionDoesNotStopWatching(@TempDir Path tempDir) throws Exception { + AtomicBoolean shouldThrow = new AtomicBoolean(true); + CountDownLatch goodEventLatch = new CountDownLatch(1); + + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + DirectoryWatchSupport watchSupport = new DirectoryWatchSupport( + new DirectoryWatchSupport.Context(tempDir), + executor, + 0, + null + )) { + List events = new CopyOnWriteArrayList<>(); + watchSupport.onCreate((thisDir, context) -> { + if (context == null || Files.isDirectory(context)) { + return; + } + String relative = tempDir.relativize(context).toString().replace('\\', '/'); + if (shouldThrow.getAndSet(false)) { + throw new IllegalStateException("boom"); + } + events.add(relative); + goodEventLatch.countDown(); + }); + + watchSupport.start(); + + Files.writeString(tempDir.resolve("first.txt"), "first"); + Files.writeString(tempDir.resolve("second.txt"), "second"); + + Assertions.assertTrue(goodEventLatch.await(2, TimeUnit.SECONDS)); + Assertions.assertEquals(List.of("second.txt"), events); + } + } + private WatchHarness createWatchSupport(Path root, ExecutorService executor, int watchDepth) throws IOException { DirectoryWatchSupport watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, watchDepth, null); List events = new CopyOnWriteArrayList<>();