fix(BuiltinCommand): spill session streams to log files

This commit is contained in:
2026-04-20 14:37:40 +08:00
parent 2ec2d8e096
commit e9eaaa24db
2 changed files with 176 additions and 24 deletions

View File

@@ -9,6 +9,12 @@ import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
import java.time.Instant; import java.time.Instant;
import java.time.Duration; import java.time.Duration;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;
@@ -23,6 +29,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
private static final int SUMMARY_MAX_LINES = 5; private static final int SUMMARY_MAX_LINES = 5;
private static final int SUMMARY_MAX_LENGTH = 2048; private static final int SUMMARY_MAX_LENGTH = 2048;
private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10); private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10);
private static final int COMMAND_SESSION_TAIL_LIMIT = 64 * 1024;
private static final Duration OUTPUT_FLUSH_INTERVAL = Duration.ofMillis(100);
private static final Path COMMAND_SESSION_LOG_DIR = Path.of(System.getProperty("java.io.tmpdir"), "partner-command-sessions");
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool"); private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
@@ -100,6 +109,8 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
List<String> commands = requireCommandArguments(params); List<String> commands = requireCommandArguments(params);
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands)); CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands));
String executionId = UUID.randomUUID().toString(); String executionId = UUID.randomUUID().toString();
Path stdoutLogPath = createSessionLogPath(executionId, "stdout.log");
Path stderrLogPath = createSessionLogPath(executionId, "stderr.log");
CommandHandle handle = new CommandHandle( CommandHandle handle = new CommandHandle(
executionId, executionId,
desc, desc,
@@ -108,10 +119,15 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
session.getProcess(), session.getProcess(),
session.getStdoutBuffer(), session.getStdoutBuffer(),
session.getStderrBuffer(), session.getStderrBuffer(),
new StringBuilder(),
new StringBuilder(),
stdoutLogPath,
stderrLogPath,
null, null,
null null
); );
commandHandles.put(executionId, handle); commandHandles.put(executionId, handle);
startOutputFlusher(handle);
monitorProcess(handle); monitorProcess(handle);
return JSONObject.of("executionId", executionId).toJSONString(); return JSONObject.of("executionId", executionId).toJSONString();
}; };
@@ -150,12 +166,13 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> { Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles(); cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
return JSONObject.of( return JSONObject.of(
"executionId", handle.executionId, "executionId", handle.executionId,
"desc", handle.desc, "desc", handle.desc,
"exitCode", handle.exitCode, "exitCode", handle.exitCode,
"stdoutSize", bufferLength(handle.stdoutBuffer), "stdoutSize", streamLength(handle.stdoutLogPath, handle.stdoutSourceBuffer),
"stderrSize", bufferLength(handle.stderrBuffer), "stderrSize", streamLength(handle.stderrLogPath, handle.stderrSourceBuffer),
"stdoutSummary", summarizeBuffer(handle.stdoutBuffer), "stdoutSummary", summarizeBuffer(handle.stdoutBuffer),
"stderrSummary", summarizeBuffer(handle.stderrBuffer), "stderrSummary", summarizeBuffer(handle.stderrBuffer),
"startAt", handle.startAt, "startAt", handle.startAt,
@@ -202,6 +219,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> { Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles(); cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout"); String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
if (!"stdout".equals(stream) && !"stderr".equals(stream)) { if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr"); throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr");
@@ -215,22 +233,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
throw new IllegalArgumentException("参数 limit 必须大于 0"); throw new IllegalArgumentException("参数 limit 必须大于 0");
} }
StringBuilder buffer = "stderr".equals(stream) ? handle.stderrBuffer : handle.stdoutBuffer; Path logPath = "stderr".equals(stream) ? handle.stderrLogPath : handle.stdoutLogPath;
String snapshot = bufferSnapshot(buffer); StreamChunk chunk = readChunk(logPath, offset, limit);
int safeOffset = Math.min(offset, snapshot.length()); boolean eof = !handle.isRunning() && chunk.nextOffset >= chunk.totalLength;
int nextOffset = Math.min(safeOffset + limit, snapshot.length());
String content = snapshot.substring(safeOffset, nextOffset);
boolean truncated = nextOffset < snapshot.length();
boolean eof = !handle.isRunning() && nextOffset >= snapshot.length();
return JSONObject.of( return JSONObject.of(
"executionId", handle.executionId, "executionId", handle.executionId,
"desc", handle.desc, "desc", handle.desc,
"stream", stream, "stream", stream,
"content", content, "content", chunk.content,
"contentTruncated", truncated, "contentTruncated", chunk.truncated,
"offset", safeOffset, "offset", chunk.offset,
"nextOffset", nextOffset, "nextOffset", chunk.nextOffset,
"eof", eof "eof", eof
).toJSONString(); ).toJSONString();
}; };
@@ -263,6 +277,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
Function<Map<String, Object>, String> invoker = params -> { Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles(); cleanupExpiredHandles();
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
flushHandleBuffers(handle);
if (handle.process.isAlive()) { if (handle.process.isAlive()) {
handle.process.destroy(); handle.process.destroy();
waitProcessExit(handle.process, 200); waitProcessExit(handle.process, 200);
@@ -315,6 +330,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
); );
Function<Map<String, Object>, String> invoker = params -> { Function<Map<String, Object>, String> invoker = params -> {
cleanupExpiredHandles(); cleanupExpiredHandles();
commandHandles.values().forEach(this::flushHandleBuffers);
List<JSONObject> items = commandHandles.values().stream() List<JSONObject> items = commandHandles.values().stream()
.sorted(Comparator.comparing(handle -> handle.startAt)) .sorted(Comparator.comparing(handle -> handle.startAt))
.map(handle -> JSONObject.of( .map(handle -> JSONObject.of(
@@ -340,14 +356,72 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} finally { } finally {
flushHandleBuffers(handle);
handle.exitAt = Instant.now(); handle.exitAt = Instant.now();
} }
}); });
} }
private void startOutputFlusher(CommandHandle handle) {
Thread.startVirtualThread(() -> {
try {
while (handle.process.isAlive()) {
flushHandleBuffers(handle);
Thread.sleep(OUTPUT_FLUSH_INTERVAL.toMillis());
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
flushHandleBuffers(handle);
}
});
}
private void flushHandleBuffers(CommandHandle handle) {
flushStreamBuffer(handle.stdoutSourceBuffer, handle.stdoutBuffer, handle.stdoutLogPath);
flushStreamBuffer(handle.stderrSourceBuffer, handle.stderrBuffer, handle.stderrLogPath);
}
private void flushStreamBuffer(StringBuilder sourceBuffer, StringBuilder tailBuffer, Path logPath) {
String chunk;
synchronized (sourceBuffer) {
if (sourceBuffer.isEmpty()) {
return;
}
chunk = sourceBuffer.toString();
sourceBuffer.setLength(0);
}
try {
Files.writeString(logPath, chunk, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
throw new IllegalStateException("写入命令输出日志失败: " + logPath, e);
}
synchronized (tailBuffer) {
tailBuffer.append(chunk);
trimTailBuffer(tailBuffer);
}
}
private void trimTailBuffer(StringBuilder buffer) {
if (buffer.length() <= COMMAND_SESSION_TAIL_LIMIT) {
return;
}
buffer.delete(0, buffer.length() - COMMAND_SESSION_TAIL_LIMIT);
}
private void cleanupExpiredHandles() { private void cleanupExpiredHandles() {
Instant now = Instant.now(); Instant now = Instant.now();
commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now)); Iterator<Map.Entry<String, CommandHandle>> iterator = commandHandles.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, CommandHandle> entry = iterator.next();
CommandHandle handle = entry.getValue();
if (!isExpired(handle, now)) {
continue;
}
flushHandleBuffers(handle);
cleanupLogFiles(handle);
iterator.remove();
}
} }
private boolean isExpired(CommandHandle handle, Instant now) { private boolean isExpired(CommandHandle handle, Instant now) {
@@ -355,6 +429,56 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now); return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now);
} }
private Path createSessionLogPath(String executionId, String fileName) {
try {
Files.createDirectories(COMMAND_SESSION_LOG_DIR);
Path file = COMMAND_SESSION_LOG_DIR.resolve(executionId + "-" + fileName);
Files.deleteIfExists(file);
Files.createFile(file);
return file;
} catch (IOException e) {
throw new IllegalStateException("创建命令会话日志文件失败", e);
}
}
private long streamLength(Path logPath, StringBuilder sourceBuffer) {
long fileLength;
try {
fileLength = Files.exists(logPath) ? Files.size(logPath) : 0L;
} catch (IOException e) {
throw new IllegalStateException("读取命令会话日志长度失败", e);
}
synchronized (sourceBuffer) {
return fileLength + sourceBuffer.length();
}
}
private StreamChunk readChunk(Path logPath, int offset, int limit) {
if (!Files.exists(logPath)) {
return new StreamChunk("", 0, 0, false, 0);
}
try (RandomAccessFile raf = new RandomAccessFile(logPath.toFile(), "r")) {
int totalLength = (int) raf.length();
int safeOffset = Math.min(offset, totalLength);
int nextOffset = Math.min(safeOffset + limit, totalLength);
byte[] bytes = new byte[nextOffset - safeOffset];
raf.seek(safeOffset);
raf.readFully(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
return new StreamChunk(content, safeOffset, nextOffset, nextOffset < totalLength, totalLength);
} catch (IOException e) {
throw new IllegalStateException("读取命令会话日志失败", e);
}
}
private void cleanupLogFiles(CommandHandle handle) {
try {
Files.deleteIfExists(handle.stdoutLogPath);
Files.deleteIfExists(handle.stderrLogPath);
} catch (IOException ignored) {
}
}
private WrappedLaunchSpec wrapCommands(List<String> commands) { private WrappedLaunchSpec wrapCommands(List<String> commands) {
return ExecutionPolicyRegistry.INSTANCE.prepare(commands); return ExecutionPolicyRegistry.INSTANCE.prepare(commands);
} }
@@ -394,12 +518,6 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
} }
} }
private int bufferLength(StringBuilder buffer) {
synchronized (buffer) {
return buffer.length();
}
}
private String bufferSnapshot(StringBuilder buffer) { private String bufferSnapshot(StringBuilder buffer) {
synchronized (buffer) { synchronized (buffer) {
return buffer.toString(); return buffer.toString();
@@ -439,6 +557,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
} }
} }
private record StreamChunk(String content, int offset, int nextOffset, boolean truncated, int totalLength) {
}
@AllArgsConstructor @AllArgsConstructor
private static class CommandHandle { private static class CommandHandle {
private String executionId; private String executionId;
@@ -448,15 +569,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
private Process process; private Process process;
private StringBuilder stdoutSourceBuffer;
private StringBuilder stderrSourceBuffer;
/** /**
* stdout 输出内容 * stdout/stderr 摘要 tail
*/ */
private StringBuilder stdoutBuffer; private StringBuilder stdoutBuffer;
/**
* stderr 输出内容
*/
private StringBuilder stderrBuffer; private StringBuilder stderrBuffer;
private Path stdoutLogPath;
private Path stderrLogPath;
/** /**
* 退出码:进程未结束时可为 null * 退出码:进程未结束时可为 null
*/ */

View File

@@ -96,6 +96,34 @@ class BuiltinCommandActionProviderTtlTest {
))); )));
} }
@Test
void testReadCanAccessSpilledLogBeyondTailBuffer() throws Exception {
BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider();
List<BuiltinActionRegistry.BuiltinActionDefinition> definitions = provider.provideBuiltinActions();
BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start");
BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect");
BuiltinActionRegistry.BuiltinActionDefinition read = requireDefinition(definitions, "builtin::command::read");
String startResult = start.invoker().apply(Map.of(
"desc", "spill-session",
"arg", "python3",
"arg1", "-c",
"arg2", "print('A'*120000, end='')"
));
String executionId = JSONObject.parseObject(startResult).getString("executionId");
waitForInspectExit(inspect, executionId);
JSONObject readResult = JSONObject.parseObject(read.invoker().apply(Map.of(
"id", executionId,
"stream", "stdout",
"offset", 70000,
"limit", 20
)));
Assertions.assertEquals("AAAAAAAAAAAAAAAAAAAA", readResult.getString("content"));
Assertions.assertEquals(70020, readResult.getIntValue("nextOffset"));
}
private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception { private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception {
Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles"); Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles");
handlesField.setAccessible(true); handlesField.setAccessible(true);