feat(runner): implement builtin command session actions with start/inspect/read/cancel/overview

This commit is contained in:
2026-03-18 23:00:56 +08:00
parent 7d9ec976e3
commit 12368ded53
4 changed files with 478 additions and 17 deletions

View File

@@ -4,6 +4,7 @@ import lombok.Data;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -84,6 +85,46 @@ public class CommandExecutionService {
return result; return result;
} }
public CommandSession createSessionTask(List<String> commands) {
return createSessionTask(commands.toArray(new String[0]));
}
public CommandSession createSessionTask(String... commands) {
try {
Process process = new ProcessBuilder(commands)
.redirectErrorStream(false)
.start();
CommandSession session = new CommandSession();
StringBuilder stdoutBuffer = new StringBuilder();
StringBuilder stderrBuffer = new StringBuilder();
session.setProcess(process);
session.setStdoutBuffer(stdoutBuffer);
session.setStderrBuffer(stderrBuffer);
readerExecutor.execute(() -> readToBuffer(process.getInputStream(), stdoutBuffer));
readerExecutor.execute(() -> readToBuffer(process.getErrorStream(), stderrBuffer));
return session;
} catch (Exception e) {
throw new IllegalStateException("创建命令会话失败", e);
}
}
private void readToBuffer(java.io.InputStream inputStream, StringBuilder buffer) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
synchronized (buffer) {
if (!buffer.isEmpty()) {
buffer.append('\n');
}
buffer.append(line);
}
}
} catch (Exception ignored) {
}
}
@Data @Data
public static class Result { public static class Result {
private boolean ok; private boolean ok;
@@ -92,6 +133,9 @@ public class CommandExecutionService {
} }
@Data @Data
public static class CommandSessionResult { public static class CommandSession {
private Process process;
private StringBuilder stdoutBuffer;
private StringBuilder stderrBuffer;
} }
} }

View File

@@ -6,10 +6,7 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.runner.execution.CommandExecutionService; import work.slhaf.partner.core.action.runner.execution.CommandExecutionService;
import java.time.Instant; import java.time.Instant;
import java.util.HashSet; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
@@ -18,11 +15,15 @@ import static work.slhaf.partner.core.action.ActionCore.BUILTIN_LOCATION;
class BuiltinCommandActionManager { class BuiltinCommandActionManager {
private static final String COMMAND_LOCATION = BUILTIN_LOCATION + "::" + "command"; private static final String COMMAND_LOCATION = BUILTIN_LOCATION + "::" + "command";
private static final String COMMAND_ARG_PREFIX = "arg";
private static final int DEFAULT_READ_LIMIT = 4096;
private static final int SUMMARY_MAX_LINES = 5;
private static final int SUMMARY_MAX_LENGTH = 2048;
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool"); private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
private ConcurrentHashMap<String, CommandHandle> commandHandles = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CommandHandle> commandHandles = new ConcurrentHashMap<>();
private CommandExecutionService commandExecutionService = CommandExecutionService.INSTANCE; private final CommandExecutionService commandExecutionService = CommandExecutionService.INSTANCE;
/** /**
* 用于直接执行的 Builtin MetaAction * 用于直接执行的 Builtin MetaAction
@@ -35,7 +36,7 @@ class BuiltinCommandActionManager {
MetaActionInfo info = new MetaActionInfo( MetaActionInfo info = new MetaActionInfo(
false, false,
null, null,
Map.of("Command Arguments", "Command Arguments"), Map.of("arg / argN", "Command arguments. Use arg for first argument, arg1/arg2... for remaining arguments."),
"Execute any allowed system commands and get result instantly, the number of arguments is not limited.", "Execute any allowed system commands and get result instantly, the number of arguments is not limited.",
tags, tags,
Set.of(), Set.of(),
@@ -44,9 +45,7 @@ class BuiltinCommandActionManager {
JSONObject.of("result", "Command execution result.") JSONObject.of("result", "Command execution result.")
); );
Function<Map<String, Object>, String> invoker = params -> { Function<Map<String, Object>, String> invoker = params -> {
List<String> commands = params.keySet().stream() List<String> commands = requireCommandArguments(params);
.map(paramKey -> BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, paramKey))
.toList();
CommandExecutionService.Result result = commandExecutionService.exec(commands); CommandExecutionService.Result result = commandExecutionService.exec(commands);
return JSONObject.of("result", result.getTotal()).toJSONString(); return JSONObject.of("result", result.getTotal()).toJSONString();
}; };
@@ -63,7 +62,43 @@ class BuiltinCommandActionManager {
* @return 内建 MetaAction 定义数据,参数为命令列表及进程描述,返回值为进程句柄 id * @return 内建 MetaAction 定义数据,参数为命令列表及进程描述,返回值为进程句柄 id
*/ */
BuiltinActionRegistry.BuiltinActionDefinition buildCommandStartDefinition() { BuiltinActionRegistry.BuiltinActionDefinition buildCommandStartDefinition() {
return null; Set<String> tags = new HashSet<>(basicTags);
tags.add("Command Session");
MetaActionInfo info = new MetaActionInfo(
false,
null,
Map.of(
"desc", "Command session description.",
"arg / argN", "Command arguments. Use arg for first argument, arg1/arg2... for remaining arguments."
),
"Start a background command session and return execution id.",
tags,
Set.of(),
Set.of(createActionKey("inspect")),
false,
JSONObject.of("executionId", "Command execution session id.")
);
Function<Map<String, Object>, String> invoker = params -> {
String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc");
List<String> commands = requireCommandArguments(params);
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(commands);
String executionId = UUID.randomUUID().toString();
CommandHandle handle = new CommandHandle(
executionId,
desc,
commands,
Instant.now(),
session.getProcess(),
session.getStdoutBuffer(),
session.getStderrBuffer(),
null,
null
);
commandHandles.put(executionId, handle);
monitorProcess(handle);
return JSONObject.of("executionId", executionId).toJSONString();
};
return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("start"), info, invoker);
} }
/** /**
@@ -72,7 +107,35 @@ class BuiltinCommandActionManager {
* @return 内建 MetaAction 定义数据,参数为进程 id返回值为摘要内容(CommandInspectData) * @return 内建 MetaAction 定义数据,参数为进程 id返回值为摘要内容(CommandInspectData)
*/ */
BuiltinActionRegistry.BuiltinActionDefinition buildCommandInspectDefinition() { BuiltinActionRegistry.BuiltinActionDefinition buildCommandInspectDefinition() {
return null; Set<String> tags = new HashSet<>(basicTags);
tags.add("Command Session");
MetaActionInfo info = new MetaActionInfo(
false,
null,
Map.of("id", "Command session id."),
"Inspect a background command session.",
tags,
Set.of(createActionKey("overview")),
Set.of(),
false,
JSONObject.of("result", "CommandInspectData")
);
Function<Map<String, Object>, String> invoker = params -> {
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
CommandInspectData data = new CommandInspectData(
handle.executionId,
handle.desc,
handle.exitCode,
bufferLength(handle.stdoutBuffer),
bufferLength(handle.stderrBuffer),
summarizeBuffer(handle.stdoutBuffer),
summarizeBuffer(handle.stderrBuffer),
handle.startAt,
handle.exitAt
);
return toJson(data);
};
return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("inspect"), info, invoker);
} }
/** /**
@@ -81,7 +144,61 @@ class BuiltinCommandActionManager {
* @return 内建 MetaAction 定义数据,参数为进程 id 与读取流(stdout/stderr),返回值为读取内容(CommandReadData) * @return 内建 MetaAction 定义数据,参数为进程 id 与读取流(stdout/stderr),返回值为读取内容(CommandReadData)
*/ */
BuiltinActionRegistry.BuiltinActionDefinition buildCommandReadDefinition() { BuiltinActionRegistry.BuiltinActionDefinition buildCommandReadDefinition() {
return null; Set<String> tags = new HashSet<>(basicTags);
tags.add("Command Session");
tags.add("Command Read");
MetaActionInfo info = new MetaActionInfo(
false,
null,
Map.of(
"id", "Command execution session id.",
"stream", "Target stream, stdout or stderr. Default stdout.",
"offset", "Read start offset. Default 0.",
"limit", "Read max length. Default 4096."
),
"Read output from a background command session.",
tags,
Set.of(createActionKey("overview")),
Set.of(),
false,
JSONObject.of("result", "CommandReadData")
);
Function<Map<String, Object>, String> invoker = params -> {
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr");
}
int offset = BuiltinActionRegistry.BuiltinActionDefinition.optionalInt(params, "offset", 0);
int limit = BuiltinActionRegistry.BuiltinActionDefinition.optionalInt(params, "limit", DEFAULT_READ_LIMIT);
if (offset < 0) {
throw new IllegalArgumentException("参数 offset 必须大于等于 0");
}
if (limit <= 0) {
throw new IllegalArgumentException("参数 limit 必须大于 0");
}
StringBuilder buffer = "stderr".equals(stream) ? handle.stderrBuffer : handle.stdoutBuffer;
String snapshot = bufferSnapshot(buffer);
int safeOffset = Math.min(offset, snapshot.length());
int nextOffset = Math.min(safeOffset + limit, snapshot.length());
String content = snapshot.substring(safeOffset, nextOffset);
boolean truncated = nextOffset < snapshot.length();
boolean eof = !handle.isRunning() && nextOffset >= snapshot.length();
CommandReadData data = new CommandReadData(
handle.executionId,
handle.desc,
stream,
content,
truncated,
safeOffset,
nextOffset,
eof
);
return toJson(data);
};
return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("read"), info, invoker);
} }
/** /**
@@ -90,7 +207,45 @@ class BuiltinCommandActionManager {
* @return 内建 MetaAction 定义数据,参数为进程 id返回值为是否成功取消 * @return 内建 MetaAction 定义数据,参数为进程 id返回值为是否成功取消
*/ */
BuiltinActionRegistry.BuiltinActionDefinition buildCommandCancelDefinition() { BuiltinActionRegistry.BuiltinActionDefinition buildCommandCancelDefinition() {
return null; Set<String> tags = new HashSet<>(basicTags);
tags.add("Command Session");
tags.add("Command Cancel");
MetaActionInfo info = new MetaActionInfo(
false,
null,
Map.of("id", "Command session id."),
"Cancel a background command session.",
tags,
Set.of(),
Set.of(createActionKey("overview")),
false,
JSONObject.of("ok", "Whether the command has been cancelled.", "executionId", "Command execution session id.")
);
Function<Map<String, Object>, String> invoker = params -> {
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
if (handle.process.isAlive()) {
handle.process.destroy();
waitProcessExit(handle.process, 200);
if (handle.process.isAlive()) {
handle.process.destroyForcibly();
waitProcessExit(handle.process, 200);
}
}
if (!handle.process.isAlive()) {
try {
handle.exitCode = handle.process.exitValue();
} catch (IllegalThreadStateException ignored) {
}
if (handle.exitAt == null) {
handle.exitAt = Instant.now();
}
}
return JSONObject.of(
"ok", !handle.process.isAlive(),
"executionId", handle.executionId
).toJSONString();
};
return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("cancel"), info, invoker);
} }
/** /**
@@ -99,13 +254,164 @@ class BuiltinCommandActionManager {
* @return 内建 MetaAction 定义数据,无参数,返回值为后台进程集合(CommandOverviewItem) * @return 内建 MetaAction 定义数据,无参数,返回值为后台进程集合(CommandOverviewItem)
*/ */
BuiltinActionRegistry.BuiltinActionDefinition buildCommandOverviewDefinition() { BuiltinActionRegistry.BuiltinActionDefinition buildCommandOverviewDefinition() {
return null; Set<String> tags = new HashSet<>(basicTags);
tags.add("Command Session");
tags.add("Command Overview");
MetaActionInfo info = new MetaActionInfo(
false,
null,
Map.of(),
"List all background command sessions.",
tags,
Set.of(createActionKey("start")),
Set.of(createActionKey("inspect"), createActionKey("read"), createActionKey("cancel")),
false,
JSONObject.of("result", "CommandOverviewItem[]")
);
Function<Map<String, Object>, String> invoker = params -> {
List<JSONObject> items = commandHandles.values().stream()
.sorted(Comparator.comparing(handle -> handle.startAt))
.map(handle -> {
CommandOverviewItem item = new CommandOverviewItem(
handle.executionId,
handle.desc,
handle.exitCode
);
JSONObject json = new JSONObject();
json.put("executionId", item.executionId);
json.put("desc", item.desc);
json.put("exitCode", item.exitCode);
return json;
})
.toList();
return JSONObject.of("result", items).toJSONString();
};
return new BuiltinActionRegistry.BuiltinActionDefinition(createActionKey("overview"), info, invoker);
} }
private String createActionKey(String actionName) { private String createActionKey(String actionName) {
return COMMAND_LOCATION + "::" + actionName; return COMMAND_LOCATION + "::" + actionName;
} }
private void monitorProcess(CommandHandle handle) {
Thread.startVirtualThread(() -> {
try {
handle.exitCode = handle.process.waitFor();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
handle.exitAt = Instant.now();
}
});
}
private CommandHandle requireHandle(String id) {
CommandHandle handle = commandHandles.get(id);
if (handle == null) {
throw new IllegalArgumentException("未找到对应命令会话: " + id);
}
return handle;
}
private List<String> requireCommandArguments(Map<String, Object> params) {
List<Map.Entry<String, Object>> entries = params.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(COMMAND_ARG_PREFIX))
.sorted(Comparator.comparingInt(entry -> commandArgIndex(entry.getKey())))
.toList();
if (entries.isEmpty()) {
throw new IllegalArgumentException("缺少命令参数");
}
List<String> commands = new ArrayList<>();
for (Map.Entry<String, Object> entry : entries) {
commands.add(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, entry.getKey()));
}
return commands;
}
private int commandArgIndex(String key) {
String suffix = key.substring(COMMAND_ARG_PREFIX.length()).trim();
if (suffix.isEmpty()) {
return 0;
}
try {
return Integer.parseInt(suffix);
} catch (NumberFormatException ignored) {
return Integer.MAX_VALUE;
}
}
private int bufferLength(StringBuilder buffer) {
synchronized (buffer) {
return buffer.length();
}
}
private String bufferSnapshot(StringBuilder buffer) {
synchronized (buffer) {
return buffer.toString();
}
}
private String summarizeBuffer(StringBuilder buffer) {
String snapshot = bufferSnapshot(buffer);
if (snapshot.isBlank()) {
return "";
}
List<String> lines = snapshot.lines().toList();
if (lines.size() <= SUMMARY_MAX_LINES * 2) {
return trimSummary(snapshot);
}
List<String> head = lines.subList(0, SUMMARY_MAX_LINES);
List<String> tail = lines.subList(Math.max(lines.size() - SUMMARY_MAX_LINES, SUMMARY_MAX_LINES), lines.size());
String summary = String.join("\n", head)
+ "\n...\n"
+ String.join("\n", tail);
return trimSummary(summary);
}
private String trimSummary(String content) {
if (content.length() <= SUMMARY_MAX_LENGTH) {
return content;
}
return content.substring(0, SUMMARY_MAX_LENGTH);
}
private void waitProcessExit(Process process, long millis) {
try {
process.waitFor(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
private String toJson(CommandInspectData data) {
JSONObject result = new JSONObject();
result.put("executionId", data.executionId);
result.put("desc", data.desc);
result.put("exitCode", data.exitCode);
result.put("stdoutSize", data.stdoutSize);
result.put("stderrSize", data.stderrSize);
result.put("stdoutSummary", data.stdoutSummary);
result.put("stderrSummary", data.stderrSummary);
result.put("startAt", data.startAt);
result.put("endAt", data.endAt);
return result.toJSONString();
}
private String toJson(CommandReadData data) {
JSONObject result = new JSONObject();
result.put("executionId", data.executionId);
result.put("desc", data.desc);
result.put("stream", data.stream);
result.put("content", data.content);
result.put("contentTruncated", data.contentTruncated);
result.put("offset", data.offset);
result.put("nextOffset", data.nextOffset);
result.put("eof", data.eof);
return result.toJSONString();
}
@AllArgsConstructor @AllArgsConstructor
private static class CommandHandle { private static class CommandHandle {
private String executionId; private String executionId;

View File

@@ -90,4 +90,31 @@ class CommandExecutionServiceTest {
Assertions.assertEquals(List.of("out"), result.getResultList()); Assertions.assertEquals(List.of("out"), result.getResultList());
Assertions.assertEquals("out", result.getTotal()); Assertions.assertEquals("out", result.getTotal());
} }
@Test
void testCreateSessionTaskCollectsStdoutAndStderr() throws Exception {
CommandExecutionService.CommandSession session = service.createSessionTask(
"sh", "-lc", "printf 'hello\\nworld\\n'; printf 'oops\\n' >&2"
);
session.getProcess().waitFor();
waitForBufferContains(session.getStdoutBuffer(), "world");
waitForBufferContains(session.getStderrBuffer(), "oops");
Assertions.assertEquals("hello\nworld", session.getStdoutBuffer().toString());
Assertions.assertEquals("oops", session.getStderrBuffer().toString());
}
private void waitForBufferContains(StringBuilder buffer, String expected) throws InterruptedException {
long deadline = System.currentTimeMillis() + 2000;
while (System.currentTimeMillis() < deadline) {
synchronized (buffer) {
if (buffer.toString().contains(expected)) {
return;
}
}
Thread.sleep(20);
}
Assertions.fail("buffer did not contain expected text: " + expected);
}
} }

View File

@@ -0,0 +1,84 @@
package work.slhaf.partner.module.modules.action.builtin;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Map;
class BuiltinCommandActionManagerTest {
@Test
void testStartInspectReadAndOverview() throws Exception {
BuiltinCommandActionManager manager = new BuiltinCommandActionManager();
String startResult = manager.buildCommandStartDefinition().invoker().apply(Map.of(
"desc", "demo-session",
"arg", "sh",
"arg1", "-lc",
"arg2", "printf 'hello\\nworld\\n'; printf 'oops\\n' >&2"
));
String executionId = JSONObject.parseObject(startResult).getString("executionId");
Assertions.assertNotNull(executionId);
JSONObject inspect = waitForInspectExit(manager, executionId);
Assertions.assertEquals("demo-session", inspect.getString("desc"));
Assertions.assertEquals(0, inspect.getInteger("exitCode"));
Assertions.assertTrue(inspect.getInteger("stdoutSize") > 0);
Assertions.assertTrue(inspect.getInteger("stderrSize") > 0);
Assertions.assertTrue(inspect.getString("stdoutSummary").contains("hello"));
Assertions.assertTrue(inspect.getString("stderrSummary").contains("oops"));
JSONObject read = JSONObject.parseObject(manager.buildCommandReadDefinition().invoker().apply(Map.of(
"id", executionId,
"limit", 5
)));
Assertions.assertEquals("stdout", read.getString("stream"));
Assertions.assertEquals(0, read.getIntValue("offset"));
Assertions.assertEquals(5, read.getIntValue("nextOffset"));
Assertions.assertTrue(read.getBooleanValue("contentTruncated"));
Assertions.assertEquals("hello", read.getString("content"));
JSONObject overview = JSONObject.parseObject(manager.buildCommandOverviewDefinition().invoker().apply(Map.of()));
JSONArray result = overview.getJSONArray("result");
Assertions.assertTrue(result.stream().map(item -> (JSONObject) item)
.anyMatch(item -> executionId.equals(item.getString("executionId"))));
}
@Test
void testCancelStopsBackgroundCommand() throws Exception {
BuiltinCommandActionManager manager = new BuiltinCommandActionManager();
String startResult = manager.buildCommandStartDefinition().invoker().apply(Map.of(
"desc", "sleep-session",
"arg", "sh",
"arg1", "-lc",
"arg2", "sleep 5"
));
String executionId = JSONObject.parseObject(startResult).getString("executionId");
JSONObject cancel = JSONObject.parseObject(manager.buildCommandCancelDefinition().invoker().apply(Map.of(
"id", executionId
)));
Assertions.assertEquals(executionId, cancel.getString("executionId"));
Assertions.assertTrue(cancel.getBooleanValue("ok"));
JSONObject inspect = waitForInspectExit(manager, executionId);
Assertions.assertNotNull(inspect.get("endAt"));
}
private JSONObject waitForInspectExit(BuiltinCommandActionManager manager, String executionId) throws Exception {
long deadline = System.currentTimeMillis() + 3000;
while (System.currentTimeMillis() < deadline) {
JSONObject inspect = JSONObject.parseObject(manager.buildCommandInspectDefinition().invoker().apply(Map.of(
"id", executionId
)));
if (inspect.get("exitCode") != null) {
return inspect;
}
Thread.sleep(20);
}
throw new AssertionError("command session did not exit in time");
}
}