Merge branch 'codex-review-localrunnerclient-v2'

This commit is contained in:
2026-04-20 14:06:03 +08:00
18 changed files with 837 additions and 40 deletions

View File

@@ -176,6 +176,7 @@ public class LocalRunnerClient extends RunnerClient {
if (!closed.compareAndSet(false, true)) {
return;
}
mcpConfigWatcher.unregisterPolicyListener();
closeQuietly(mcpConfigWatcher);
closeQuietly(dynamicActionMcpManager);
closeQuietly(mcpDescWatcher);

View File

@@ -1,23 +1,21 @@
package work.slhaf.partner.core.action.runner.execution;
import lombok.Data;
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class CommandExecutionService {
public static final CommandExecutionService INSTANCE = new CommandExecutionService();
private final ExecutorService readerExecutor = Executors.newVirtualThreadPerTaskExecutor();
private CommandExecutionService() {
}
@@ -37,17 +35,15 @@ public class CommandExecutionService {
return exec(commands.toArray(new String[0]));
}
public Result exec(String... commands) {
public Result exec(WrappedLaunchSpec launchSpec) {
Result result = new Result();
List<String> output = new ArrayList<>();
List<String> error = new ArrayList<>();
try {
Process process = new ProcessBuilder(commands)
.redirectErrorStream(false)
.start();
Process process = startProcess(launchSpec);
Thread stdoutThread = new Thread(() -> {
Thread stdoutThread = Thread.startVirtualThread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
@@ -57,7 +53,7 @@ public class CommandExecutionService {
}
});
Thread stderrThread = new Thread(() -> {
Thread stderrThread = Thread.startVirtualThread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
String line;
while ((line = reader.readLine()) != null) {
@@ -67,9 +63,6 @@ public class CommandExecutionService {
}
});
readerExecutor.execute(stdoutThread);
readerExecutor.execute(stderrThread);
int exitCode = process.waitFor();
stdoutThread.join();
stderrThread.join();
@@ -85,15 +78,17 @@ public class CommandExecutionService {
return result;
}
public Result exec(String... commands) {
return exec(defaultLaunchSpec(commands));
}
public CommandSession createSessionTask(List<String> commands) {
return createSessionTask(commands.toArray(new String[0]));
}
public CommandSession createSessionTask(String... commands) {
public CommandSession createSessionTask(WrappedLaunchSpec launchSpec) {
try {
Process process = new ProcessBuilder(commands)
.redirectErrorStream(false)
.start();
Process process = startProcess(launchSpec);
CommandSession session = new CommandSession();
StringBuilder stdoutBuffer = new StringBuilder();
StringBuilder stderrBuffer = new StringBuilder();
@@ -101,8 +96,8 @@ public class CommandExecutionService {
session.setStdoutBuffer(stdoutBuffer);
session.setStderrBuffer(stderrBuffer);
readerExecutor.execute(() -> readToBuffer(process.getInputStream(), stdoutBuffer));
readerExecutor.execute(() -> readToBuffer(process.getErrorStream(), stderrBuffer));
Thread.startVirtualThread(() -> readToBuffer(process.getInputStream(), stdoutBuffer));
Thread.startVirtualThread(() -> readToBuffer(process.getErrorStream(), stderrBuffer));
return session;
} catch (Exception e) {
@@ -110,6 +105,10 @@ public class CommandExecutionService {
}
}
public CommandSession createSessionTask(String... commands) {
return createSessionTask(defaultLaunchSpec(commands));
}
private void readToBuffer(java.io.InputStream inputStream, StringBuilder buffer) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
@@ -125,6 +124,31 @@ public class CommandExecutionService {
}
}
private Process startProcess(WrappedLaunchSpec launchSpec) throws Exception {
ProcessBuilder processBuilder = new ProcessBuilder();
List<String> command = new ArrayList<>();
command.add(launchSpec.getCommand());
command.addAll(launchSpec.getArgs());
processBuilder.command(command);
processBuilder.redirectErrorStream(false);
if (launchSpec.getWorkingDirectory() != null && !launchSpec.getWorkingDirectory().isBlank()) {
processBuilder.directory(new File(launchSpec.getWorkingDirectory()));
}
Map<String, String> environment = processBuilder.environment();
environment.clear();
environment.putAll(launchSpec.getEnvironment());
return processBuilder.start();
}
private WrappedLaunchSpec defaultLaunchSpec(String... commands) {
return new WrappedLaunchSpec(
commands[0],
List.of(commands).subList(1, commands.length),
null,
System.getenv()
);
}
@Data
public static class Result {
private boolean ok;

View File

@@ -29,7 +29,14 @@ public class McpActionExecutor {
.name(metaAction.getName())
.arguments(metaAction.getParams())
.build();
McpSchema.CallToolResult callToolResult = mcpClient.callTool(callToolRequest);
McpSchema.CallToolResult callToolResult;
try {
callToolResult = mcpClient.callTool(callToolRequest);
} catch (Exception e) {
response.setOk(false);
response.setData("MCP tool call failed: " + e.getMessage());
return response;
}
Boolean error = callToolResult.isError();
response.setOk(error == null || !error);
response.setData(extractResponseData(callToolResult));

View File

@@ -6,9 +6,7 @@ import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static work.slhaf.partner.core.action.ActionCore.ORIGIN_LOCATION;
@@ -22,10 +20,7 @@ public class OriginExecutionService {
File file = new File(resolveOriginPath(metaAction));
String[] commands = CommandExecutionService.INSTANCE.buildFileExecutionCommands(metaAction.getLauncher(), metaAction.getParams(), file.getAbsolutePath());
WrappedLaunchSpec wrapped = ExecutionPolicyRegistry.INSTANCE.prepare(Arrays.stream(commands).toList());
List<String> wrappedCommands = new ArrayList<>();
wrappedCommands.add(wrapped.getCommand());
wrappedCommands.addAll(wrapped.getArgs());
CommandExecutionService.Result execResult = CommandExecutionService.INSTANCE.exec(wrappedCommands);
CommandExecutionService.Result execResult = CommandExecutionService.INSTANCE.exec(wrapped);
response.setOk(execResult.isOk());
response.setData(execResult.getTotal());
return response;

View File

@@ -15,6 +15,7 @@ import work.slhaf.partner.common.mcp.InProcessMcpTransport;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.exception.ActionInfrastructureStartupException;
import work.slhaf.partner.core.action.runner.execution.CommandExecutionService;
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
import work.slhaf.partner.framework.agent.support.DirectoryWatchSupport;
import java.io.File;
@@ -331,7 +332,9 @@ public class DynamicActionMcpManager implements AutoCloseable {
.build());
}
return Mono.fromCallable(() -> {
CommandExecutionService.Result execResult = commandExecutionService.exec(commands);
CommandExecutionService.Result execResult = commandExecutionService.exec(
ExecutionPolicyRegistry.INSTANCE.prepare(List.of(commands))
);
McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder()
.isError(!execResult.isOk());
List<String> resultList = execResult.getResultList();

View File

@@ -19,6 +19,7 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,14 +96,7 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
return;
}
for (String clientId : fileRecord.paramsCacheMap().keySet()) {
McpSyncClient client = mcpClientRegistry.detach(clientId);
if (client == null) {
continue;
}
for (McpSchema.Tool tool : client.listTools().tools()) {
existedMetaActions.remove(clientId + "::" + tool.name());
}
client.close();
unregisterClient(clientId);
}
}
@@ -126,6 +120,39 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
}
}
private void unregisterClient(String clientId) {
McpSyncClient client = mcpClientRegistry.detach(clientId);
removeClientActions(clientId, client);
if (client != null) {
try {
client.close();
} catch (Exception e) {
log.warn("[{}] MCP client close failed", clientId, e);
}
}
}
private void removeClientActions(String clientId, McpSyncClient client) {
boolean removedByListing = false;
if (client != null) {
try {
List<McpSchema.Tool> tools = client.listTools().tools();
if (tools != null) {
for (McpSchema.Tool tool : tools) {
existedMetaActions.remove(clientId + "::" + tool.name());
}
removedByListing = true;
}
} catch (Exception e) {
log.warn("[{}] MCP client listTools failed during unregister, fallback to key scan", clientId, e);
}
}
if (!removedByListing) {
String prefix = clientId + "::";
existedMetaActions.keySet().removeIf(actionKey -> actionKey.startsWith(prefix));
}
}
private cn.hutool.json.JSONObject readJson(File file) {
try {
return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8);
@@ -237,13 +264,16 @@ public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyLis
}
private void updateMcpClients(HashMap<String, McpTransportConfig> changedMap, HashSet<String> existingMcpIdSet) {
changedMap.forEach(this::registerMcpClient);
changedMap.forEach((clientId, config) -> {
unregisterClient(clientId);
registerMcpClient(clientId, config);
});
for (String clientId : mcpClientRegistry.listIds()) {
if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) {
continue;
}
if (!existingMcpIdSet.contains(clientId)) {
mcpClientRegistry.remove(clientId);
unregisterClient(clientId);
}
}
existedMetaActions.keySet().removeIf(actionKey -> {

View File

@@ -150,4 +150,8 @@ interface RunnerExecutionPolicyListener {
fun registerPolicyListener() {
ExecutionPolicyRegistry.addListener(this)
}
fun unregisterPolicyListener() {
ExecutionPolicyRegistry.removeListener(this)
}
}

View File

@@ -4,8 +4,11 @@ import com.alibaba.fastjson2.JSONObject;
import lombok.AllArgsConstructor;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.runner.execution.CommandExecutionService;
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
import java.time.Instant;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@@ -19,6 +22,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
private static final int DEFAULT_READ_LIMIT = 4096;
private static final int SUMMARY_MAX_LINES = 5;
private static final int SUMMARY_MAX_LENGTH = 2048;
private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10);
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
@@ -58,7 +62,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
);
Function<Map<String, Object>, String> invoker = params -> {
List<String> commands = requireCommandArguments(params);
CommandExecutionService.Result result = commandExecutionService.exec(commands);
CommandExecutionService.Result result = commandExecutionService.exec(wrapCommands(commands));
return JSONObject.of("result", result.getTotal()).toJSONString();
};
return new BuiltinActionRegistry.BuiltinActionDefinition(
@@ -91,9 +95,10 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
JSONObject.of("executionId", "Command execution session id.")
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc");
List<String> commands = requireCommandArguments(params);
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(commands);
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands));
String executionId = UUID.randomUUID().toString();
CommandHandle handle = new CommandHandle(
executionId,
@@ -143,6 +148,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
)
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
return JSONObject.of(
"executionId", handle.executionId,
@@ -194,6 +200,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
)
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
@@ -254,6 +261,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
)
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
if (handle.process.isAlive()) {
handle.process.destroy();
@@ -306,6 +314,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
)
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
List<JSONObject> items = commandHandles.values().stream()
.sorted(Comparator.comparing(handle -> handle.startAt))
.map(handle -> JSONObject.of(
@@ -336,6 +345,20 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
});
}
private void cleanupExpiredHandles() {
Instant now = Instant.now();
commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now));
}
private boolean isExpired(CommandHandle handle, Instant now) {
Instant exitTime = handle.exitAt;
return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now);
}
private WrappedLaunchSpec wrapCommands(List<String> commands) {
return ExecutionPolicyRegistry.INSTANCE.prepare(commands);
}
private CommandHandle requireHandle(String id) {
CommandHandle handle = commandHandles.get(id);
if (handle == null) {