diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java index 8385a8b9..0193e009 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProvider.java @@ -9,6 +9,12 @@ import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec; import java.time.Instant; import java.time.Duration; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -23,6 +29,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { private static final int SUMMARY_MAX_LINES = 5; private static final int SUMMARY_MAX_LENGTH = 2048; private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10); + private static final int COMMAND_SESSION_TAIL_LIMIT = 64 * 1024; + private static final Duration OUTPUT_FLUSH_INTERVAL = Duration.ofMillis(100); + private static final Path COMMAND_SESSION_LOG_DIR = Path.of(System.getProperty("java.io.tmpdir"), "partner-command-sessions"); private final Set basicTags = Set.of("Builtin MetaAction", "System Command Tool"); @@ -100,6 +109,8 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { List commands = requireCommandArguments(params); CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands)); String executionId = UUID.randomUUID().toString(); + Path stdoutLogPath = createSessionLogPath(executionId, "stdout.log"); + Path stderrLogPath = createSessionLogPath(executionId, "stderr.log"); CommandHandle handle = new CommandHandle( executionId, desc, @@ -108,10 +119,15 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { session.getProcess(), session.getStdoutBuffer(), session.getStderrBuffer(), + new StringBuilder(), + new StringBuilder(), + stdoutLogPath, + stderrLogPath, null, null ); commandHandles.put(executionId, handle); + startOutputFlusher(handle); monitorProcess(handle); return JSONObject.of("executionId", executionId).toJSONString(); }; @@ -150,12 +166,13 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { Function, String> invoker = params -> { cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + flushHandleBuffers(handle); return JSONObject.of( "executionId", handle.executionId, "desc", handle.desc, "exitCode", handle.exitCode, - "stdoutSize", bufferLength(handle.stdoutBuffer), - "stderrSize", bufferLength(handle.stderrBuffer), + "stdoutSize", streamLength(handle.stdoutLogPath, handle.stdoutSourceBuffer), + "stderrSize", streamLength(handle.stderrLogPath, handle.stderrSourceBuffer), "stdoutSummary", summarizeBuffer(handle.stdoutBuffer), "stderrSummary", summarizeBuffer(handle.stderrBuffer), "startAt", handle.startAt, @@ -202,6 +219,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { Function, String> invoker = params -> { cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + flushHandleBuffers(handle); String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout"); if (!"stdout".equals(stream) && !"stderr".equals(stream)) { throw new IllegalArgumentException("参数 stream 只能为 stdout 或 stderr"); @@ -215,22 +233,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { throw new IllegalArgumentException("参数 limit 必须大于 0"); } - StringBuilder buffer = "stderr".equals(stream) ? handle.stderrBuffer : handle.stdoutBuffer; - String snapshot = bufferSnapshot(buffer); - int safeOffset = Math.min(offset, snapshot.length()); - int nextOffset = Math.min(safeOffset + limit, snapshot.length()); - String content = snapshot.substring(safeOffset, nextOffset); - boolean truncated = nextOffset < snapshot.length(); - boolean eof = !handle.isRunning() && nextOffset >= snapshot.length(); + Path logPath = "stderr".equals(stream) ? handle.stderrLogPath : handle.stdoutLogPath; + StreamChunk chunk = readChunk(logPath, offset, limit); + boolean eof = !handle.isRunning() && chunk.nextOffset >= chunk.totalLength; return JSONObject.of( "executionId", handle.executionId, "desc", handle.desc, "stream", stream, - "content", content, - "contentTruncated", truncated, - "offset", safeOffset, - "nextOffset", nextOffset, + "content", chunk.content, + "contentTruncated", chunk.truncated, + "offset", chunk.offset, + "nextOffset", chunk.nextOffset, "eof", eof ).toJSONString(); }; @@ -263,6 +277,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { Function, String> invoker = params -> { cleanupExpiredHandles(); CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id")); + flushHandleBuffers(handle); if (handle.process.isAlive()) { handle.process.destroy(); waitProcessExit(handle.process, 200); @@ -315,6 +330,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { ); Function, String> invoker = params -> { cleanupExpiredHandles(); + commandHandles.values().forEach(this::flushHandleBuffers); List items = commandHandles.values().stream() .sorted(Comparator.comparing(handle -> handle.startAt)) .map(handle -> JSONObject.of( @@ -340,14 +356,72 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } finally { + flushHandleBuffers(handle); handle.exitAt = Instant.now(); } }); } + private void startOutputFlusher(CommandHandle handle) { + Thread.startVirtualThread(() -> { + try { + while (handle.process.isAlive()) { + flushHandleBuffers(handle); + Thread.sleep(OUTPUT_FLUSH_INTERVAL.toMillis()); + } + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } finally { + flushHandleBuffers(handle); + } + }); + } + + private void flushHandleBuffers(CommandHandle handle) { + flushStreamBuffer(handle.stdoutSourceBuffer, handle.stdoutBuffer, handle.stdoutLogPath); + flushStreamBuffer(handle.stderrSourceBuffer, handle.stderrBuffer, handle.stderrLogPath); + } + + private void flushStreamBuffer(StringBuilder sourceBuffer, StringBuilder tailBuffer, Path logPath) { + String chunk; + synchronized (sourceBuffer) { + if (sourceBuffer.isEmpty()) { + return; + } + chunk = sourceBuffer.toString(); + sourceBuffer.setLength(0); + } + try { + Files.writeString(logPath, chunk, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + } catch (IOException e) { + throw new IllegalStateException("写入命令输出日志失败: " + logPath, e); + } + synchronized (tailBuffer) { + tailBuffer.append(chunk); + trimTailBuffer(tailBuffer); + } + } + + private void trimTailBuffer(StringBuilder buffer) { + if (buffer.length() <= COMMAND_SESSION_TAIL_LIMIT) { + return; + } + buffer.delete(0, buffer.length() - COMMAND_SESSION_TAIL_LIMIT); + } + private void cleanupExpiredHandles() { Instant now = Instant.now(); - commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now)); + Iterator> iterator = commandHandles.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + CommandHandle handle = entry.getValue(); + if (!isExpired(handle, now)) { + continue; + } + flushHandleBuffers(handle); + cleanupLogFiles(handle); + iterator.remove(); + } } private boolean isExpired(CommandHandle handle, Instant now) { @@ -355,6 +429,56 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now); } + private Path createSessionLogPath(String executionId, String fileName) { + try { + Files.createDirectories(COMMAND_SESSION_LOG_DIR); + Path file = COMMAND_SESSION_LOG_DIR.resolve(executionId + "-" + fileName); + Files.deleteIfExists(file); + Files.createFile(file); + return file; + } catch (IOException e) { + throw new IllegalStateException("创建命令会话日志文件失败", e); + } + } + + private long streamLength(Path logPath, StringBuilder sourceBuffer) { + long fileLength; + try { + fileLength = Files.exists(logPath) ? Files.size(logPath) : 0L; + } catch (IOException e) { + throw new IllegalStateException("读取命令会话日志长度失败", e); + } + synchronized (sourceBuffer) { + return fileLength + sourceBuffer.length(); + } + } + + private StreamChunk readChunk(Path logPath, int offset, int limit) { + if (!Files.exists(logPath)) { + return new StreamChunk("", 0, 0, false, 0); + } + try (RandomAccessFile raf = new RandomAccessFile(logPath.toFile(), "r")) { + int totalLength = (int) raf.length(); + int safeOffset = Math.min(offset, totalLength); + int nextOffset = Math.min(safeOffset + limit, totalLength); + byte[] bytes = new byte[nextOffset - safeOffset]; + raf.seek(safeOffset); + raf.readFully(bytes); + String content = new String(bytes, StandardCharsets.UTF_8); + return new StreamChunk(content, safeOffset, nextOffset, nextOffset < totalLength, totalLength); + } catch (IOException e) { + throw new IllegalStateException("读取命令会话日志失败", e); + } + } + + private void cleanupLogFiles(CommandHandle handle) { + try { + Files.deleteIfExists(handle.stdoutLogPath); + Files.deleteIfExists(handle.stderrLogPath); + } catch (IOException ignored) { + } + } + private WrappedLaunchSpec wrapCommands(List commands) { return ExecutionPolicyRegistry.INSTANCE.prepare(commands); } @@ -394,12 +518,6 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { } } - private int bufferLength(StringBuilder buffer) { - synchronized (buffer) { - return buffer.length(); - } - } - private String bufferSnapshot(StringBuilder buffer) { synchronized (buffer) { return buffer.toString(); @@ -439,6 +557,9 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { } } + private record StreamChunk(String content, int offset, int nextOffset, boolean truncated, int totalLength) { + } + @AllArgsConstructor private static class CommandHandle { private String executionId; @@ -448,15 +569,18 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider { private Process process; + private StringBuilder stdoutSourceBuffer; + private StringBuilder stderrSourceBuffer; + /** - * stdout 输出内容 + * stdout/stderr 摘要 tail */ private StringBuilder stdoutBuffer; - /** - * stderr 输出内容 - */ private StringBuilder stderrBuffer; + private Path stdoutLogPath; + private Path stderrLogPath; + /** * 退出码:进程未结束时可为 null */ diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java index 266df02a..3aab39d1 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/action/builtin/BuiltinCommandActionProviderTtlTest.java @@ -96,6 +96,34 @@ class BuiltinCommandActionProviderTtlTest { ))); } + @Test + void testReadCanAccessSpilledLogBeyondTailBuffer() throws Exception { + BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider(); + List definitions = provider.provideBuiltinActions(); + BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start"); + BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect"); + BuiltinActionRegistry.BuiltinActionDefinition read = requireDefinition(definitions, "builtin::command::read"); + + String startResult = start.invoker().apply(Map.of( + "desc", "spill-session", + "arg", "python3", + "arg1", "-c", + "arg2", "print('A'*120000, end='')" + )); + String executionId = JSONObject.parseObject(startResult).getString("executionId"); + waitForInspectExit(inspect, executionId); + + JSONObject readResult = JSONObject.parseObject(read.invoker().apply(Map.of( + "id", executionId, + "stream", "stdout", + "offset", 70000, + "limit", 20 + ))); + + Assertions.assertEquals("AAAAAAAAAAAAAAAAAAAA", readResult.getString("content")); + Assertions.assertEquals(70020, readResult.getIntValue("nextOffset")); + } + private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception { Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles"); handlesField.setAccessible(true);