From 12368ded53875949efa9113b8ff9132128e125aa Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 18 Mar 2026 23:00:56 +0800 Subject: [PATCH] feat(runner): implement builtin command session actions with start/inspect/read/cancel/overview --- .../execution/CommandExecutionService.java | 46 ++- .../builtin/BuiltinCommandActionManager.java | 338 +++++++++++++++++- .../CommandExecutionServiceTest.java | 27 ++ .../BuiltinCommandActionManagerTest.java | 84 +++++ 4 files changed, 478 insertions(+), 17 deletions(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManagerTest.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java index 8ecf32d6..23c1ccee 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionService.java @@ -4,6 +4,7 @@ import lombok.Data; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -84,6 +85,46 @@ public class CommandExecutionService { return result; } + public CommandSession createSessionTask(List commands) { + return createSessionTask(commands.toArray(new String[0])); + } + + public CommandSession createSessionTask(String... commands) { + try { + Process process = new ProcessBuilder(commands) + .redirectErrorStream(false) + .start(); + CommandSession session = new CommandSession(); + StringBuilder stdoutBuffer = new StringBuilder(); + StringBuilder stderrBuffer = new StringBuilder(); + session.setProcess(process); + session.setStdoutBuffer(stdoutBuffer); + session.setStderrBuffer(stderrBuffer); + + readerExecutor.execute(() -> readToBuffer(process.getInputStream(), stdoutBuffer)); + readerExecutor.execute(() -> readToBuffer(process.getErrorStream(), stderrBuffer)); + + return session; + } catch (Exception e) { + throw new IllegalStateException("创建命令会话失败", e); + } + } + + private void readToBuffer(java.io.InputStream inputStream, StringBuilder buffer) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + synchronized (buffer) { + if (!buffer.isEmpty()) { + buffer.append('\n'); + } + buffer.append(line); + } + } + } catch (Exception ignored) { + } + } + @Data public static class Result { private boolean ok; @@ -92,6 +133,9 @@ public class CommandExecutionService { } @Data - public static class CommandSessionResult { + public static class CommandSession { + private Process process; + private StringBuilder stdoutBuffer; + private StringBuilder stderrBuffer; } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManager.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManager.java index f4deeb68..cf94f784 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManager.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManager.java @@ -6,10 +6,7 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; import java.time.Instant; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -18,11 +15,15 @@ import static work.slhaf.partner.core.action.ActionCore.BUILTIN_LOCATION; class BuiltinCommandActionManager { private static final String COMMAND_LOCATION = BUILTIN_LOCATION + "::" + "command"; + private static final String COMMAND_ARG_PREFIX = "arg"; + private static final int DEFAULT_READ_LIMIT = 4096; + private static final int SUMMARY_MAX_LINES = 5; + private static final int SUMMARY_MAX_LENGTH = 2048; private final Set basicTags = Set.of("Builtin MetaAction", "System Command Tool"); - private ConcurrentHashMap commandHandles = new ConcurrentHashMap<>(); - private CommandExecutionService commandExecutionService = CommandExecutionService.INSTANCE; + private final ConcurrentHashMap commandHandles = new ConcurrentHashMap<>(); + private final CommandExecutionService commandExecutionService = CommandExecutionService.INSTANCE; /** * 用于直接执行的 Builtin MetaAction @@ -35,7 +36,7 @@ class BuiltinCommandActionManager { MetaActionInfo info = new MetaActionInfo( false, null, - Map.of("Command Arguments", "Command Arguments"), + Map.of("arg / argN", "Command arguments. Use arg for first argument, arg1/arg2... for remaining arguments."), "Execute any allowed system commands and get result instantly, the number of arguments is not limited.", tags, Set.of(), @@ -44,9 +45,7 @@ class BuiltinCommandActionManager { JSONObject.of("result", "Command execution result.") ); Function, String> invoker = params -> { - List commands = params.keySet().stream() - .map(paramKey -> BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, paramKey)) - .toList(); + List commands = requireCommandArguments(params); CommandExecutionService.Result result = commandExecutionService.exec(commands); return JSONObject.of("result", result.getTotal()).toJSONString(); }; @@ -63,7 +62,43 @@ class BuiltinCommandActionManager { * @return 内建 MetaAction 定义数据,参数为命令列表及进程描述,返回值为进程句柄 id */ BuiltinActionRegistry.BuiltinActionDefinition buildCommandStartDefinition() { - return null; + Set tags = new HashSet<>(basicTags); + tags.add("Command Session"); + MetaActionInfo info = new MetaActionInfo( + false, + null, + Map.of( + "desc", "Command session description.", + "arg / argN", "Command arguments. Use arg for first argument, arg1/arg2... for remaining arguments." + ), + "Start a background command session and return execution id.", + tags, + Set.of(), + Set.of(createActionKey("inspect")), + false, + JSONObject.of("executionId", "Command execution session id.") + ); + Function, String> invoker = params -> { + String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc"); + List commands = requireCommandArguments(params); + CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(commands); + String executionId = UUID.randomUUID().toString(); + CommandHandle handle = new CommandHandle( + executionId, + desc, + commands, + Instant.now(), + session.getProcess(), + session.getStdoutBuffer(), + session.getStderrBuffer(), + null, + null + ); + commandHandles.put(executionId, handle); + monitorProcess(handle); + return JSONObject.of("executionId", executionId).toJSONString(); + }; + return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("start"), info, invoker); } /** @@ -72,7 +107,35 @@ class BuiltinCommandActionManager { * @return 内建 MetaAction 定义数据,参数为进程 id,返回值为摘要内容(CommandInspectData) */ BuiltinActionRegistry.BuiltinActionDefinition buildCommandInspectDefinition() { - return null; + Set tags = new HashSet<>(basicTags); + tags.add("Command Session"); + MetaActionInfo info = new MetaActionInfo( + false, + null, + Map.of("id", "Command session id."), + "Inspect a background command session.", + tags, + Set.of(createActionKey("overview")), + Set.of(), + false, + JSONObject.of("result", "CommandInspectData") + ); + Function, String> invoker = params -> { + CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + CommandInspectData data = new CommandInspectData( + handle.executionId, + handle.desc, + handle.exitCode, + bufferLength(handle.stdoutBuffer), + bufferLength(handle.stderrBuffer), + summarizeBuffer(handle.stdoutBuffer), + summarizeBuffer(handle.stderrBuffer), + handle.startAt, + handle.exitAt + ); + return toJson(data); + }; + return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("inspect"), info, invoker); } /** @@ -81,7 +144,61 @@ class BuiltinCommandActionManager { * @return 内建 MetaAction 定义数据,参数为进程 id 与读取流(stdout/stderr),返回值为读取内容(CommandReadData) */ BuiltinActionRegistry.BuiltinActionDefinition buildCommandReadDefinition() { - return null; + Set tags = new HashSet<>(basicTags); + tags.add("Command Session"); + tags.add("Command Read"); + MetaActionInfo info = new MetaActionInfo( + false, + null, + Map.of( + "id", "Command execution session id.", + "stream", "Target stream, stdout or stderr. Default stdout.", + "offset", "Read start offset. Default 0.", + "limit", "Read max length. Default 4096." + ), + "Read output from a background command session.", + tags, + Set.of(createActionKey("overview")), + Set.of(), + false, + JSONObject.of("result", "CommandReadData") + ); + Function, String> invoker = params -> { + CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout"); + if (!"stdout".equals(stream) && !"stderr".equals(stream)) { + throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr"); + } + int offset = BuiltinActionRegistry.BuiltinActionDefinition.optionalInt(params, "offset", 0); + int limit = BuiltinActionRegistry.BuiltinActionDefinition.optionalInt(params, "limit", DEFAULT_READ_LIMIT); + if (offset < 0) { + throw new IllegalArgumentException("参数 offset 必须大于等于 0"); + } + if (limit <= 0) { + throw new IllegalArgumentException("参数 limit 必须大于 0"); + } + + StringBuilder buffer = "stderr".equals(stream) ? handle.stderrBuffer : handle.stdoutBuffer; + String snapshot = bufferSnapshot(buffer); + int safeOffset = Math.min(offset, snapshot.length()); + int nextOffset = Math.min(safeOffset + limit, snapshot.length()); + String content = snapshot.substring(safeOffset, nextOffset); + boolean truncated = nextOffset < snapshot.length(); + boolean eof = !handle.isRunning() && nextOffset >= snapshot.length(); + + CommandReadData data = new CommandReadData( + handle.executionId, + handle.desc, + stream, + content, + truncated, + safeOffset, + nextOffset, + eof + ); + return toJson(data); + }; + return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("read"), info, invoker); } /** @@ -90,7 +207,45 @@ class BuiltinCommandActionManager { * @return 内建 MetaAction 定义数据,参数为进程 id,返回值为是否成功取消 */ BuiltinActionRegistry.BuiltinActionDefinition buildCommandCancelDefinition() { - return null; + Set tags = new HashSet<>(basicTags); + tags.add("Command Session"); + tags.add("Command Cancel"); + MetaActionInfo info = new MetaActionInfo( + false, + null, + Map.of("id", "Command session id."), + "Cancel a background command session.", + tags, + Set.of(), + Set.of(createActionKey("overview")), + false, + JSONObject.of("ok", "Whether the command has been cancelled.", "executionId", "Command execution session id.") + ); + Function, String> invoker = params -> { + CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + if (handle.process.isAlive()) { + handle.process.destroy(); + waitProcessExit(handle.process, 200); + if (handle.process.isAlive()) { + handle.process.destroyForcibly(); + waitProcessExit(handle.process, 200); + } + } + if (!handle.process.isAlive()) { + try { + handle.exitCode = handle.process.exitValue(); + } catch (IllegalThreadStateException ignored) { + } + if (handle.exitAt == null) { + handle.exitAt = Instant.now(); + } + } + return JSONObject.of( + "ok", !handle.process.isAlive(), + "executionId", handle.executionId + ).toJSONString(); + }; + return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("cancel"), info, invoker); } /** @@ -99,13 +254,164 @@ class BuiltinCommandActionManager { * @return 内建 MetaAction 定义数据,无参数,返回值为后台进程集合(CommandOverviewItem) */ BuiltinActionRegistry.BuiltinActionDefinition buildCommandOverviewDefinition() { - return null; + Set tags = new HashSet<>(basicTags); + tags.add("Command Session"); + tags.add("Command Overview"); + MetaActionInfo info = new MetaActionInfo( + false, + null, + Map.of(), + "List all background command sessions.", + tags, + Set.of(createActionKey("start")), + Set.of(createActionKey("inspect"), createActionKey("read"), createActionKey("cancel")), + false, + JSONObject.of("result", "CommandOverviewItem[]") + ); + Function, String> invoker = params -> { + List items = commandHandles.values().stream() + .sorted(Comparator.comparing(handle -> handle.startAt)) + .map(handle -> { + CommandOverviewItem item = new CommandOverviewItem( + handle.executionId, + handle.desc, + handle.exitCode + ); + JSONObject json = new JSONObject(); + json.put("executionId", item.executionId); + json.put("desc", item.desc); + json.put("exitCode", item.exitCode); + return json; + }) + .toList(); + return JSONObject.of("result", items).toJSONString(); + }; + return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("overview"), info, invoker); } private String createActionKey(String actionName) { return COMMAND_LOCATION + "::" + actionName; } + private void monitorProcess(CommandHandle handle) { + Thread.startVirtualThread(() -> { + try { + handle.exitCode = handle.process.waitFor(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } finally { + handle.exitAt = Instant.now(); + } + }); + } + + private CommandHandle requireHandle(String id) { + CommandHandle handle = commandHandles.get(id); + if (handle == null) { + throw new IllegalArgumentException("未找到对应命令会话: " + id); + } + return handle; + } + + private List requireCommandArguments(Map params) { + List> entries = params.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(COMMAND_ARG_PREFIX)) + .sorted(Comparator.comparingInt(entry -> commandArgIndex(entry.getKey()))) + .toList(); + if (entries.isEmpty()) { + throw new IllegalArgumentException("缺少命令参数"); + } + List commands = new ArrayList<>(); + for (Map.Entry entry : entries) { + commands.add(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, entry.getKey())); + } + return commands; + } + + private int commandArgIndex(String key) { + String suffix = key.substring(COMMAND_ARG_PREFIX.length()).trim(); + if (suffix.isEmpty()) { + return 0; + } + try { + return Integer.parseInt(suffix); + } catch (NumberFormatException ignored) { + return Integer.MAX_VALUE; + } + } + + private int bufferLength(StringBuilder buffer) { + synchronized (buffer) { + return buffer.length(); + } + } + + private String bufferSnapshot(StringBuilder buffer) { + synchronized (buffer) { + return buffer.toString(); + } + } + + private String summarizeBuffer(StringBuilder buffer) { + String snapshot = bufferSnapshot(buffer); + if (snapshot.isBlank()) { + return ""; + } + List lines = snapshot.lines().toList(); + if (lines.size() <= SUMMARY_MAX_LINES * 2) { + return trimSummary(snapshot); + } + + List head = lines.subList(0, SUMMARY_MAX_LINES); + List tail = lines.subList(Math.max(lines.size() - SUMMARY_MAX_LINES, SUMMARY_MAX_LINES), lines.size()); + String summary = String.join("\n", head) + + "\n...\n" + + String.join("\n", tail); + return trimSummary(summary); + } + + private String trimSummary(String content) { + if (content.length() <= SUMMARY_MAX_LENGTH) { + return content; + } + return content.substring(0, SUMMARY_MAX_LENGTH); + } + + private void waitProcessExit(Process process, long millis) { + try { + process.waitFor(millis, java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + + private String toJson(CommandInspectData data) { + JSONObject result = new JSONObject(); + result.put("executionId", data.executionId); + result.put("desc", data.desc); + result.put("exitCode", data.exitCode); + result.put("stdoutSize", data.stdoutSize); + result.put("stderrSize", data.stderrSize); + result.put("stdoutSummary", data.stdoutSummary); + result.put("stderrSummary", data.stderrSummary); + result.put("startAt", data.startAt); + result.put("endAt", data.endAt); + return result.toJSONString(); + } + + private String toJson(CommandReadData data) { + JSONObject result = new JSONObject(); + result.put("executionId", data.executionId); + result.put("desc", data.desc); + result.put("stream", data.stream); + result.put("content", data.content); + result.put("contentTruncated", data.contentTruncated); + result.put("offset", data.offset); + result.put("nextOffset", data.nextOffset); + result.put("eof", data.eof); + return result.toJSONString(); + } + @AllArgsConstructor private static class CommandHandle { private String executionId; @@ -190,4 +496,4 @@ class BuiltinCommandActionManager { private String desc; private Integer exitCode; } -} +} \ No newline at end of file diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java index 3e4f1eb0..53e7f189 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/execution/CommandExecutionServiceTest.java @@ -90,4 +90,31 @@ class CommandExecutionServiceTest { Assertions.assertEquals(List.of("out"), result.getResultList()); Assertions.assertEquals("out", result.getTotal()); } + + @Test + void testCreateSessionTaskCollectsStdoutAndStderr() throws Exception { + CommandExecutionService.CommandSession session = service.createSessionTask( + "sh", "-lc", "printf 'hello\\nworld\\n'; printf 'oops\\n' >&2" + ); + + session.getProcess().waitFor(); + waitForBufferContains(session.getStdoutBuffer(), "world"); + waitForBufferContains(session.getStderrBuffer(), "oops"); + + Assertions.assertEquals("hello\nworld", session.getStdoutBuffer().toString()); + Assertions.assertEquals("oops", session.getStderrBuffer().toString()); + } + + private void waitForBufferContains(StringBuilder buffer, String expected) throws InterruptedException { + long deadline = System.currentTimeMillis() + 2000; + while (System.currentTimeMillis() < deadline) { + synchronized (buffer) { + if (buffer.toString().contains(expected)) { + return; + } + } + Thread.sleep(20); + } + Assertions.fail("buffer did not contain expected text: " + expected); + } } diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManagerTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManagerTest.java new file mode 100644 index 00000000..d2a40f10 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/builtin/BuiltinCommandActionManagerTest.java @@ -0,0 +1,84 @@ +package work.slhaf.partner.module.modules.action.builtin; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +class BuiltinCommandActionManagerTest { + + @Test + void testStartInspectReadAndOverview() throws Exception { + BuiltinCommandActionManager manager = new BuiltinCommandActionManager(); + + String startResult = manager.buildCommandStartDefinition().invoker().apply(Map.of( + "desc", "demo-session", + "arg", "sh", + "arg1", "-lc", + "arg2", "printf 'hello\\nworld\\n'; printf 'oops\\n' >&2" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + Assertions.assertNotNull(executionId); + + JSONObject inspect = waitForInspectExit(manager, executionId); + Assertions.assertEquals("demo-session", inspect.getString("desc")); + Assertions.assertEquals(0, inspect.getInteger("exitCode")); + Assertions.assertTrue(inspect.getInteger("stdoutSize") > 0); + Assertions.assertTrue(inspect.getInteger("stderrSize") > 0); + Assertions.assertTrue(inspect.getString("stdoutSummary").contains("hello")); + Assertions.assertTrue(inspect.getString("stderrSummary").contains("oops")); + + JSONObject read = JSONObject.parseObject(manager.buildCommandReadDefinition().invoker().apply(Map.of( + "id", executionId, + "limit", 5 + ))); + Assertions.assertEquals("stdout", read.getString("stream")); + Assertions.assertEquals(0, read.getIntValue("offset")); + Assertions.assertEquals(5, read.getIntValue("nextOffset")); + Assertions.assertTrue(read.getBooleanValue("contentTruncated")); + Assertions.assertEquals("hello", read.getString("content")); + + JSONObject overview = JSONObject.parseObject(manager.buildCommandOverviewDefinition().invoker().apply(Map.of())); + JSONArray result = overview.getJSONArray("result"); + Assertions.assertTrue(result.stream().map(item -> (JSONObject) item) + .anyMatch(item -> executionId.equals(item.getString("executionId")))); + } + + @Test + void testCancelStopsBackgroundCommand() throws Exception { + BuiltinCommandActionManager manager = new BuiltinCommandActionManager(); + + String startResult = manager.buildCommandStartDefinition().invoker().apply(Map.of( + "desc", "sleep-session", + "arg", "sh", + "arg1", "-lc", + "arg2", "sleep 5" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + + JSONObject cancel = JSONObject.parseObject(manager.buildCommandCancelDefinition().invoker().apply(Map.of( + "id", executionId + ))); + Assertions.assertEquals(executionId, cancel.getString("executionId")); + Assertions.assertTrue(cancel.getBooleanValue("ok")); + + JSONObject inspect = waitForInspectExit(manager, executionId); + Assertions.assertNotNull(inspect.get("endAt")); + } + + private JSONObject waitForInspectExit(BuiltinCommandActionManager manager, String executionId) throws Exception { + long deadline = System.currentTimeMillis() + 3000; + while (System.currentTimeMillis() < deadline) { + JSONObject inspect = JSONObject.parseObject(manager.buildCommandInspectDefinition().invoker().apply(Map.of( + "id", executionId + ))); + if (inspect.get("exitCode") != null) { + return inspect; + } + Thread.sleep(20); + } + throw new AssertionError("command session did not exit in time"); + } +}