Merge branch 'codex-review-localrunnerclient-v2'

This commit is contained in:
2026-04-20 14:48:51 +08:00
4 changed files with 209 additions and 29 deletions

View File

@@ -8,6 +8,7 @@ import java.io.InputStreamReader;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,8 +38,8 @@ public class CommandExecutionService {
public Result exec(WrappedLaunchSpec launchSpec) {
Result result = new Result();
List<String> output = new ArrayList<>();
List<String> error = new ArrayList<>();
List<String> output = Collections.synchronizedList(new ArrayList<>());
List<String> error = Collections.synchronizedList(new ArrayList<>());
try {
Process process = startProcess(launchSpec);
@@ -68,11 +69,18 @@ public class CommandExecutionService {
stderrThread.join();
result.setOk(exitCode == 0);
result.setResultList(output.isEmpty() ? error : output);
result.setTotal(String.join("\n", output.isEmpty() ? error : output));
List<String> stdoutLines = List.copyOf(output);
List<String> stderrLines = List.copyOf(error);
result.setStdoutLines(stdoutLines);
result.setStderrLines(stderrLines);
result.setResultList(stdoutLines.isEmpty() ? stderrLines : stdoutLines);
result.setTotal(buildDisplayText(stdoutLines, stderrLines));
} catch (Exception e) {
result.setOk(false);
result.setTotal(e.getMessage());
result.setStdoutLines(List.of());
result.setStderrLines(List.of(e.getMessage()));
result.setResultList(result.getStderrLines());
}
return result;
@@ -149,11 +157,23 @@ public class CommandExecutionService {
);
}
private String buildDisplayText(List<String> stdoutLines, List<String> stderrLines) {
if (stdoutLines.isEmpty()) {
return String.join("\n", stderrLines);
}
if (stderrLines.isEmpty()) {
return String.join("\n", stdoutLines);
}
return String.join("\n", stdoutLines) + "\n" + String.join("\n", stderrLines);
}
@Data
public static class Result {
private boolean ok;
private String total;
private List<String> resultList;
private List<String> stdoutLines;
private List<String> stderrLines;
}
@Data

View File

@@ -9,6 +9,12 @@ import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
import java.time.Instant;
import java.time.Duration;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@@ -23,6 +29,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
private static final int SUMMARY_MAX_LINES = 5;
private static final int SUMMARY_MAX_LENGTH = 2048;
private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10);
private static final int COMMAND_SESSION_TAIL_LIMIT = 64 * 1024;
private static final Duration OUTPUT_FLUSH_INTERVAL = Duration.ofMillis(100);
private static final Path COMMAND_SESSION_LOG_DIR = Path.of(System.getProperty("java.io.tmpdir"), "partner-command-sessions");
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
@@ -100,6 +109,8 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
List<String> commands = requireCommandArguments(params);
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands));
String executionId = UUID.randomUUID().toString();
Path stdoutLogPath = createSessionLogPath(executionId, "stdout.log");
Path stderrLogPath = createSessionLogPath(executionId, "stderr.log");
CommandHandle handle = new CommandHandle(
executionId,
desc,
@@ -108,10 +119,15 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
session.getProcess(),
session.getStdoutBuffer(),
session.getStderrBuffer(),
new StringBuilder(),
new StringBuilder(),
stdoutLogPath,
stderrLogPath,
null,
null
);
commandHandles.put(executionId, handle);
startOutputFlusher(handle);
monitorProcess(handle);
return JSONObject.of("executionId", executionId).toJSONString();
};
@@ -150,12 +166,13 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
return JSONObject.of(
"executionId", handle.executionId,
"desc", handle.desc,
"exitCode", handle.exitCode,
"stdoutSize", bufferLength(handle.stdoutBuffer),
"stderrSize", bufferLength(handle.stderrBuffer),
"stdoutSize", streamLength(handle.stdoutLogPath, handle.stdoutSourceBuffer),
"stderrSize", streamLength(handle.stderrLogPath, handle.stderrSourceBuffer),
"stdoutSummary", summarizeBuffer(handle.stdoutBuffer),
"stderrSummary", summarizeBuffer(handle.stderrBuffer),
"startAt", handle.startAt,
@@ -202,6 +219,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr");
@@ -215,22 +233,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
throw new IllegalArgumentException("参数 limit 必须大于 0");
}
StringBuilder buffer = "stderr".equals(stream) ? handle.stderrBuffer : handle.stdoutBuffer;
String snapshot = bufferSnapshot(buffer);
int safeOffset = Math.min(offset, snapshot.length());
int nextOffset = Math.min(safeOffset + limit, snapshot.length());
String content = snapshot.substring(safeOffset, nextOffset);
boolean truncated = nextOffset < snapshot.length();
boolean eof = !handle.isRunning() && nextOffset >= snapshot.length();
Path logPath = "stderr".equals(stream) ? handle.stderrLogPath : handle.stdoutLogPath;
StreamChunk chunk = readChunk(logPath, offset, limit);
boolean eof = !handle.isRunning() && chunk.nextOffset >= chunk.totalLength;
return JSONObject.of(
"executionId", handle.executionId,
"desc", handle.desc,
"stream", stream,
"content", content,
"contentTruncated", truncated,
"offset", safeOffset,
"nextOffset", nextOffset,
"content", chunk.content,
"contentTruncated", chunk.truncated,
"offset", chunk.offset,
"nextOffset", chunk.nextOffset,
"eof", eof
).toJSONString();
};
@@ -263,6 +277,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
if (handle.process.isAlive()) {
handle.process.destroy();
waitProcessExit(handle.process, 200);
@@ -315,6 +330,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
);
Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles();
commandHandles.values().forEach(this::flushHandleBuffers);
List<JSONObject> items = commandHandles.values().stream()
.sorted(Comparator.comparing(handle -> handle.startAt))
.map(handle -> JSONObject.of(
@@ -340,14 +356,72 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
flushHandleBuffers(handle);
handle.exitAt = Instant.now();
}
});
}
private void startOutputFlusher(CommandHandle handle) {
Thread.startVirtualThread(() -> {
try {
while (handle.process.isAlive()) {
flushHandleBuffers(handle);
Thread.sleep(OUTPUT_FLUSH_INTERVAL.toMillis());
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
flushHandleBuffers(handle);
}
});
}
private void flushHandleBuffers(CommandHandle handle) {
flushStreamBuffer(handle.stdoutSourceBuffer, handle.stdoutBuffer, handle.stdoutLogPath);
flushStreamBuffer(handle.stderrSourceBuffer, handle.stderrBuffer, handle.stderrLogPath);
}
private void flushStreamBuffer(StringBuilder sourceBuffer, StringBuilder tailBuffer, Path logPath) {
String chunk;
synchronized (sourceBuffer) {
if (sourceBuffer.isEmpty()) {
return;
}
chunk = sourceBuffer.toString();
sourceBuffer.setLength(0);
}
try {
Files.writeString(logPath, chunk, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
throw new IllegalStateException("写入命令输出日志失败: " + logPath, e);
}
synchronized (tailBuffer) {
tailBuffer.append(chunk);
trimTailBuffer(tailBuffer);
}
}
private void trimTailBuffer(StringBuilder buffer) {
if (buffer.length() <= COMMAND_SESSION_TAIL_LIMIT) {
return;
}
buffer.delete(0, buffer.length() - COMMAND_SESSION_TAIL_LIMIT);
}
private void cleanupExpiredHandles() {
Instant now = Instant.now();
commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now));
Iterator<Map.Entry<String, CommandHandle>> iterator = commandHandles.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, CommandHandle> entry = iterator.next();
CommandHandle handle = entry.getValue();
if (!isExpired(handle, now)) {
continue;
}
flushHandleBuffers(handle);
cleanupLogFiles(handle);
iterator.remove();
}
}
private boolean isExpired(CommandHandle handle, Instant now) {
@@ -355,6 +429,56 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now);
}
private Path createSessionLogPath(String executionId, String fileName) {
try {
Files.createDirectories(COMMAND_SESSION_LOG_DIR);
Path file = COMMAND_SESSION_LOG_DIR.resolve(executionId + "-" + fileName);
Files.deleteIfExists(file);
Files.createFile(file);
return file;
} catch (IOException e) {
throw new IllegalStateException("创建命令会话日志文件失败", e);
}
}
private long streamLength(Path logPath, StringBuilder sourceBuffer) {
long fileLength;
try {
fileLength = Files.exists(logPath) ? Files.size(logPath) : 0L;
} catch (IOException e) {
throw new IllegalStateException("读取命令会话日志长度失败", e);
}
synchronized (sourceBuffer) {
return fileLength + sourceBuffer.length();
}
}
private StreamChunk readChunk(Path logPath, int offset, int limit) {
if (!Files.exists(logPath)) {
return new StreamChunk("", 0, 0, false, 0);
}
try (RandomAccessFile raf = new RandomAccessFile(logPath.toFile(), "r")) {
int totalLength = (int) raf.length();
int safeOffset = Math.min(offset, totalLength);
int nextOffset = Math.min(safeOffset + limit, totalLength);
byte[] bytes = new byte[nextOffset - safeOffset];
raf.seek(safeOffset);
raf.readFully(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
return new StreamChunk(content, safeOffset, nextOffset, nextOffset < totalLength, totalLength);
} catch (IOException e) {
throw new IllegalStateException("读取命令会话日志失败", e);
}
}
private void cleanupLogFiles(CommandHandle handle) {
try {
Files.deleteIfExists(handle.stdoutLogPath);
Files.deleteIfExists(handle.stderrLogPath);
} catch (IOException ignored) {
}
}
private WrappedLaunchSpec wrapCommands(List<String> commands) {
return ExecutionPolicyRegistry.INSTANCE.prepare(commands);
}
@@ -394,12 +518,6 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
}
}
private int bufferLength(StringBuilder buffer) {
synchronized (buffer) {
return buffer.length();
}
}
private String bufferSnapshot(StringBuilder buffer) {
synchronized (buffer) {
return buffer.toString();
@@ -439,6 +557,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
}
}
private record StreamChunk(String content, int offset, int nextOffset, boolean truncated, int totalLength) {
}
@AllArgsConstructor
private static class CommandHandle {
private String executionId;
@@ -448,15 +569,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
private Process process;
private StringBuilder stdoutSourceBuffer;
private StringBuilder stderrSourceBuffer;
/**
* stdout 输出内容
* stdout/stderr 摘要 tail
*/
private StringBuilder stdoutBuffer;
/**
* stderr 输出内容
*/
private StringBuilder stderrBuffer;
private Path stdoutLogPath;
private Path stderrLogPath;
/**
* 退出码:进程未结束时可为 null
*/

View File

@@ -56,6 +56,8 @@ class CommandExecutionServiceTest {
Assertions.assertTrue(result.isOk());
Assertions.assertEquals(List.of("hello", "world"), result.getResultList());
Assertions.assertEquals(List.of("hello", "world"), result.getStdoutLines());
Assertions.assertEquals(List.of(), result.getStderrLines());
Assertions.assertEquals("hello\nworld", result.getTotal());
}
@@ -67,6 +69,8 @@ class CommandExecutionServiceTest {
Assertions.assertTrue(result.isOk());
Assertions.assertEquals(List.of("ok"), result.getResultList());
Assertions.assertEquals(List.of("ok"), result.getStdoutLines());
Assertions.assertEquals(List.of(), result.getStderrLines());
Assertions.assertEquals("ok", result.getTotal());
}
@@ -78,6 +82,8 @@ class CommandExecutionServiceTest {
Assertions.assertFalse(result.isOk());
Assertions.assertEquals(List.of("fail"), result.getResultList());
Assertions.assertEquals(List.of(), result.getStdoutLines());
Assertions.assertEquals(List.of("fail"), result.getStderrLines());
Assertions.assertEquals("fail", result.getTotal());
}
@@ -89,7 +95,9 @@ class CommandExecutionServiceTest {
Assertions.assertTrue(result.isOk());
Assertions.assertEquals(List.of("out"), result.getResultList());
Assertions.assertEquals("out", result.getTotal());
Assertions.assertEquals(List.of("out"), result.getStdoutLines());
Assertions.assertEquals(List.of("err"), result.getStderrLines());
Assertions.assertEquals("out\nerr", result.getTotal());
}
@Test

View File

@@ -96,6 +96,34 @@ class BuiltinCommandActionProviderTtlTest {
)));
}
@Test
void testReadCanAccessSpilledLogBeyondTailBuffer() throws Exception {
BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider();
List<BuiltinActionRegistry.BuiltinActionDefinition> definitions = provider.provideBuiltinActions();
BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start");
BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect");
BuiltinActionRegistry.BuiltinActionDefinition read = requireDefinition(definitions, "builtin::command::read");
String startResult = start.invoker().apply(Map.of(
"desc", "spill-session",
"arg", "python3",
"arg1", "-c",
"arg2", "print('A'*120000, end='')"
));
String executionId = JSONObject.parseObject(startResult).getString("executionId");
waitForInspectExit(inspect, executionId);
JSONObject readResult = JSONObject.parseObject(read.invoker().apply(Map.of(
"id", executionId,
"stream", "stdout",
"offset", 70000,
"limit", 20
)));
Assertions.assertEquals("AAAAAAAAAAAAAAAAAAAA", readResult.getString("content"));
Assertions.assertEquals(70020, readResult.getIntValue("nextOffset"));
}
private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception {
Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles");
handlesField.setAccessible(true);