From fece67135f52116d476bf3132c1741a69c5599f5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 13 Apr 2026 21:39:42 +0800 Subject: [PATCH] feat(trace): support create loggable advice and implement common trace recorder --- .../slhaf/partner/framework/agent/Agent.java | 14 +- .../framework/agent/log/LogAdviceProvider.kt | 224 ++++++++++++ .../framework/agent/log/TraceRecorder.kt | 333 ++++++++++++++++++ 3 files changed, 569 insertions(+), 2 deletions(-) create mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt create mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java index 533b8360..25017c27 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java @@ -11,6 +11,8 @@ import work.slhaf.partner.framework.agent.factory.AgentRegisterFactory; import work.slhaf.partner.framework.agent.factory.context.AgentContext; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistry; +import work.slhaf.partner.framework.agent.log.LogAdviceProvider; +import work.slhaf.partner.framework.agent.log.TraceRecorder; import work.slhaf.partner.framework.agent.model.ModelRuntimeRegistry; import work.slhaf.partner.framework.agent.state.StateCenter; @@ -77,14 +79,17 @@ public final class Agent { public boolean launch() { try { + // load class + ConfigCenter.INSTANCE.toString(); + StateCenter.INSTANCE.toString(); + // Keep startup order explicit so registries are ready before component scanning. for (ExceptionReporter exceptionReporter : exceptionReporters) { exceptionReporter.register(); } - // Load class - StateCenter.INSTANCE.toString(); // Register into config center + LogAdviceProvider.INSTANCE.register(); ModelRuntimeRegistry.INSTANCE.register(); AgentGatewayRegistry.INSTANCE.register(); for (Configurable configurable : configurables) { @@ -129,6 +134,11 @@ public final class Agent { 0, StateCenter.INSTANCE::save ); + AgentContext.INSTANCE.addPostShutdownHook( + "trace-recorder-close", + 90, + TraceRecorder.INSTANCE::close + ); AgentContext.INSTANCE.addPostShutdownHook( "config-center-close", 100, diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt new file mode 100644 index 00000000..6f75e98b --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt @@ -0,0 +1,224 @@ +package work.slhaf.partner.framework.agent.log + +import com.alibaba.fastjson2.JSONException +import com.alibaba.fastjson2.JSONObject +import org.slf4j.LoggerFactory +import work.slhaf.partner.framework.agent.config.Config +import work.slhaf.partner.framework.agent.config.ConfigCenter +import work.slhaf.partner.framework.agent.config.ConfigRegistration +import work.slhaf.partner.framework.agent.config.Configurable +import java.nio.file.Files +import java.nio.file.Path +import java.time.ZonedDateTime + +object LogAdviceProvider : Configurable, ConfigRegistration { + + private val logPath = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath() + private val _adviceRegistry = mutableSetOf>() + val adviceRegistry: Set> + get() = _adviceRegistry + + var logLevel = AdviceLoggingConfig.LogLevel.NONE + + init { + Files.createDirectories(logPath) + } + + @JvmOverloads + fun createAdvice( + adviceTarget: String, + inputType: Class, + outputType: Class, + meta: Map = emptyMap(), + invoker: (I) -> O + ): LogAdvice { + return LogAdvice( + adviceTarget = adviceTarget, + invoker = invoker, + AdviceMeta(adviceTarget, inputType, outputType, meta) + ).apply { _adviceRegistry.add(this) } + } + + internal fun record(result: AdviceResult) { + val path = logPath.resolve(result.adviceTarget).normalize().toAbsolutePath() + val traceEvent = TraceEvent(path, result.toJSON(), result.finishTime.toInstant().toEpochMilli()) + TraceRecorder.record(traceEvent) + } + + override fun declare(): Map> = mapOf(Path.of("advice_logging.json") to this) + + override fun type(): Class = AdviceLoggingConfig::class.java + + override fun init( + config: AdviceLoggingConfig, + json: JSONObject? + ) { + logLevel = config.logLevel + } + + override fun defaultConfig(): AdviceLoggingConfig = AdviceLoggingConfig(AdviceLoggingConfig.LogLevel.NONE) +} + +class LogAdvice internal constructor( + val adviceTarget: String, + private val invoker: (I) -> O, + private val adviceMeta: AdviceMeta +) { + + companion object { + private val log = LoggerFactory.getLogger(LogAdvice::class.java) + } + + fun invoke(input: I): Result { + val startAt = ZonedDateTime.now() + return try { + logEnter(input) + val output = invoker(input) + logOutput(output) + createResult(input, output, startAt) + Result.success(output) + } catch (e: Exception) { + logException(e) + createUnexpectedResult(input, e, startAt) + throw e + } + } + + private fun logException(e: Exception) { + when (LogAdviceProvider.logLevel) { + AdviceLoggingConfig.LogLevel.NONE -> return + AdviceLoggingConfig.LogLevel.ABSTRACT -> log.error("${adviceMeta.adviceTarget} occurred exception: $e..") + AdviceLoggingConfig.LogLevel.DETAIL -> log.error("${adviceMeta.adviceTarget} occurred exception: ", e) + } + } + + private fun logOutput(output: O) { + when (LogAdviceProvider.logLevel) { + AdviceLoggingConfig.LogLevel.NONE -> return + AdviceLoggingConfig.LogLevel.ABSTRACT -> log.info("${adviceMeta.adviceTarget} ended.") + AdviceLoggingConfig.LogLevel.DETAIL -> { + try { + log.info("${adviceMeta.adviceTarget} ended with output: ${JSONObject.toJSONString(output)}") + } catch (_: Exception) { + log.info("${adviceMeta.adviceTarget} ended with output: ${output.toString()}, which cannot be printed as json string.") + } + } + } + } + + private fun logEnter(input: I) { + when (LogAdviceProvider.logLevel) { + AdviceLoggingConfig.LogLevel.NONE -> return + AdviceLoggingConfig.LogLevel.ABSTRACT -> log.info("${adviceMeta.adviceTarget} entered.") + AdviceLoggingConfig.LogLevel.DETAIL -> { + try { + log.info("${adviceMeta.adviceTarget} entered with input : ${JSONObject.toJSONString(input)}") + } catch (_: Exception) { + log.info("${adviceMeta.adviceTarget} entered with input : ${input.toString()}, which cannot be printed as json string.") + } + } + } + } + + private fun createResult(input: I, output: O, startAt: ZonedDateTime) { + val inputSerialized = try { + JSONObject.toJSONString(input) + } catch (_: JSONException) { + input.toString() + } + val outputSerialized = try { + JSONObject.toJSONString(output) + } catch (_: JSONException) { + output.toString() + } + LogAdviceProvider.record( + AdviceResult.Normal( + adviceTarget, + inputSerialized, + startAt, + adviceMeta, + outputSerialized + ) + ) + } + + private fun createUnexpectedResult(input: I, throwable: Throwable, startAt: ZonedDateTime) { /* 落盘 */ + val inputSerialized = try { + JSONObject.toJSONString(input) + } catch (_: JSONException) { + input.toString() + } + LogAdviceProvider.record( + AdviceResult.Unexpected( + adviceTarget, + inputSerialized, + startAt, + adviceMeta, + throwable.localizedMessage ?: "", + throwable.stackTraceToString() + ) + ) + } +} + +data class AdviceMeta( + val adviceTarget: String, + val inputType: Class<*>, + val outputType: Class<*>, + val meta: Map +) + +sealed class AdviceResult { + + abstract val adviceTarget: String + abstract val input: String + abstract val type: Type + abstract val startAt: ZonedDateTime + abstract val adviceMeta: AdviceMeta + + val finishTime: ZonedDateTime = ZonedDateTime.now() + val elapsed: Long + get() = finishTime.toInstant().toEpochMilli() - startAt.toInstant().toEpochMilli() + + enum class Type { + NORMAL, + UNEXPECTED + } + + + data class Normal( + override val adviceTarget: String, + override val input: String, + override val startAt: ZonedDateTime, + override val adviceMeta: AdviceMeta, + val output: String, + ) : AdviceResult() { + override val type: Type = Type.NORMAL + } + + data class Unexpected( + override val adviceTarget: String, + override val input: String, + override val startAt: ZonedDateTime, + override val adviceMeta: AdviceMeta, + val message: String, + val stackTrace: String + ) : AdviceResult() { + override val type: Type = Type.UNEXPECTED + } + + fun toJSON(): JSONObject = JSONObject.from(this) + + override fun toString(): String = toJSON().toJSONString() + +} + +data class AdviceLoggingConfig( + val logLevel: LogLevel +) : Config() { + enum class LogLevel { + NONE, + ABSTRACT, + DETAIL + } +} diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt new file mode 100644 index 00000000..2185c20e --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt @@ -0,0 +1,333 @@ +package work.slhaf.partner.framework.agent.log + +import com.alibaba.fastjson2.JSONObject +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import org.slf4j.LoggerFactory +import work.slhaf.partner.framework.agent.factory.context.AgentContext +import java.io.BufferedWriter +import java.io.OutputStream +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardCopyOption +import java.nio.file.StandardOpenOption +import java.time.Instant +import java.time.LocalDate +import java.time.ZoneId +import java.time.format.DateTimeFormatter +import java.util.concurrent.atomic.AtomicBoolean +import java.util.zip.GZIPOutputStream + +object TraceRecorder { + + private const val ACTIVE_FILE_NAME = "active.jsonl" + private const val HISTORICAL_DIR_NAME = "historical" + private const val ARCHIVED_DIR_NAME = "archived" + private const val MAX_ACTIVE_SIZE_BYTES = 16L * 1024 * 1024 + private const val MAX_ACTIVE_RECORDS = 2000 + private const val MAX_HISTORICAL_SIZE_BYTES = 256L * 1024 * 1024 + private const val MAX_ARCHIVED_SIZE_BYTES = 1024L * 1024 * 1024 + private const val ARCHIVED_RETENTION_DAYS = 14L + + private val log = LoggerFactory.getLogger(TraceRecorder::class.java) + private val zoneId: ZoneId = ZoneId.systemDefault() + private val historyNameFormatter = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSS") + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private val channel = Channel(Channel.UNLIMITED) + private val writerStates = linkedMapOf() + private val closed = AtomicBoolean(false) + private val writerJob: Job + + init { + AgentContext.addPostShutdownHook("trace-recorder-close") { close() } + writerJob = scope.launch { + try { + for (event in channel) { + handleEvent(event) + } + } catch (e: Exception) { + log.error("TraceRecorder writer loop failed", e) + } finally { + closeAllWriters() + } + } + } + + fun record(event: TraceEvent) { + if (closed.get()) { + log.warn("TraceRecorder is closed, skip event for path: {}", event.path) + return + } + val result = channel.trySend(event) + if (result.isFailure) { + log.error("Failed to enqueue trace event for path: {}", event.path, result.exceptionOrNull()) + } + } + + fun close() { + if (!closed.compareAndSet(false, true)) { + return + } + channel.close() + runBlocking { + writerJob.join() + } + scope.cancel() + } + + private fun handleEvent(event: TraceEvent) { + val basePath = event.path.normalize().toAbsolutePath() + runCatching { + val state = writerStates.getOrPut(basePath) { openWriterState(basePath) } + writeEvent(state, event) + if (shouldRotate(state)) { + rotateActiveFile(state) + } + }.onFailure { + log.error("Failed to persist trace event for path: {}", basePath, it) + } + } + + private fun writeEvent(state: WriterState, event: TraceEvent) { + val json = JSONObject(event.payload) + json["timestamp"] = event.timestamp + val line = json.toJSONString() + state.writer.write(line) + state.writer.newLine() + state.writer.flush() + state.recordCount += 1 + state.byteCount += line.toByteArray(StandardCharsets.UTF_8).size + 1L + } + + private fun shouldRotate(state: WriterState): Boolean { + return state.byteCount >= MAX_ACTIVE_SIZE_BYTES || state.recordCount >= MAX_ACTIVE_RECORDS + } + + private fun openWriterState(basePath: Path): WriterState { + ensureDirectories(basePath) + val activeFile = activeFile(basePath) + if (Files.exists(activeFile)) { + val existingSize = Files.size(activeFile) + val existingCount = countLines(activeFile) + if (existingSize >= MAX_ACTIVE_SIZE_BYTES || existingCount >= MAX_ACTIVE_RECORDS) { + rotateExistingActiveFile(basePath, activeFile) + } + } + if (Files.notExists(activeFile)) { + Files.createFile(activeFile) + } + val writer = Files.newBufferedWriter( + activeFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.APPEND + ) + return WriterState( + basePath = basePath, + activeFile = activeFile, + writer = writer, + recordCount = countLines(activeFile), + byteCount = Files.size(activeFile) + ) + } + + private fun rotateExistingActiveFile(basePath: Path, activeFile: Path) { + if (Files.notExists(activeFile) || Files.size(activeFile) == 0L) { + return + } + val historicalFile = nextHistoricalFile(basePath) + Files.move(activeFile, historicalFile, StandardCopyOption.REPLACE_EXISTING) + archiveHistoricalFiles(basePath) + cleanupArchivedFiles(basePath) + } + + private fun rotateActiveFile(state: WriterState) { + state.writer.flush() + state.writer.close() + val historicalFile = nextHistoricalFile(state.basePath) + Files.move(state.activeFile, historicalFile, StandardCopyOption.REPLACE_EXISTING) + Files.createFile(state.activeFile) + state.writer = Files.newBufferedWriter( + state.activeFile, + StandardCharsets.UTF_8, + StandardOpenOption.CREATE, + StandardOpenOption.APPEND + ) + state.recordCount = 0 + state.byteCount = 0L + archiveHistoricalFiles(state.basePath) + cleanupArchivedFiles(state.basePath) + } + + private fun archiveHistoricalFiles(basePath: Path) { + val historicalDir = historicalDir(basePath) + if (Files.notExists(historicalDir)) { + return + } + val files = listRegularFiles(historicalDir) + if (files.isEmpty()) { + return + } + + val today = LocalDate.now(zoneId) + val totalSize = files.sumOf { Files.size(it) } + val hasNonToday = files.any { !fileDate(it).isEqual(today) } + if (totalSize < MAX_HISTORICAL_SIZE_BYTES && !hasNonToday) { + return + } + + val candidates = if (totalSize >= MAX_HISTORICAL_SIZE_BYTES) { + files + } else { + files.filter { !fileDate(it).isEqual(today) } + } + + candidates.forEach { file -> + runCatching { + val archiveFile = archivedDir(basePath).resolve("${file.fileName}.gz") + Files.newInputStream(file).use { input -> + Files.newOutputStream( + archiveFile, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + StandardOpenOption.WRITE + ).use { output -> + gzip(input, output) + } + } + Files.setLastModifiedTime(archiveFile, Files.getLastModifiedTime(file)) + Files.deleteIfExists(file) + }.onFailure { + log.error("Failed to archive historical trace file: {}", file, it) + } + } + } + + private fun cleanupArchivedFiles(basePath: Path) { + val archivedDir = archivedDir(basePath) + if (Files.notExists(archivedDir)) { + return + } + val files = listRegularFiles(archivedDir).toMutableList() + if (files.isEmpty()) { + return + } + + val today = LocalDate.now(zoneId) + val retainedFiles = mutableListOf() + files.forEach { file -> + val expired = fileDate(file).plusDays(ARCHIVED_RETENTION_DAYS).isBefore(today) + if (expired) { + deleteArchivedFile(file) + } else { + retainedFiles.add(file) + } + } + + var totalSize = retainedFiles.sumOf { Files.size(it) } + val iterator = retainedFiles.iterator() + while (totalSize > MAX_ARCHIVED_SIZE_BYTES && iterator.hasNext()) { + val file = iterator.next() + val fileSize = Files.size(file) + if (deleteArchivedFile(file)) { + totalSize -= fileSize + iterator.remove() + } + } + } + + private fun deleteArchivedFile(file: Path): Boolean { + return runCatching { + Files.deleteIfExists(file) + }.onFailure { + log.error("Failed to delete archived trace file: {}", file, it) + }.getOrDefault(false) + } + + private fun ensureDirectories(basePath: Path) { + Files.createDirectories(basePath) + Files.createDirectories(historicalDir(basePath)) + Files.createDirectories(archivedDir(basePath)) + } + + private fun closeAllWriters() { + writerStates.values.forEach { state -> + runCatching { + state.writer.flush() + state.writer.close() + }.onFailure { + log.error("Failed to close trace writer for path: {}", state.basePath, it) + } + } + writerStates.clear() + } + + private fun activeFile(basePath: Path): Path = basePath.resolve(ACTIVE_FILE_NAME) + + private fun historicalDir(basePath: Path): Path = basePath.resolve(HISTORICAL_DIR_NAME) + + private fun archivedDir(basePath: Path): Path = basePath.resolve(ARCHIVED_DIR_NAME) + + private fun nextHistoricalFile(basePath: Path): Path { + val baseName = historyNameFormatter.format(Instant.now().atZone(zoneId)) + var candidate = historicalDir(basePath).resolve("$baseName.jsonl") + var index = 1 + while (Files.exists(candidate)) { + candidate = historicalDir(basePath).resolve("$baseName-$index.jsonl") + index += 1 + } + return candidate + } + + private fun fileDate(path: Path): LocalDate { + return Files.getLastModifiedTime(path).toInstant().atZone(zoneId).toLocalDate() + } + + private fun countLines(path: Path): Int { + if (Files.notExists(path)) { + return 0 + } + Files.newBufferedReader(path, StandardCharsets.UTF_8).use { reader -> + var count = 0 + while (reader.readLine() != null) { + count += 1 + } + return count + } + } + + private fun gzip(input: java.io.InputStream, output: OutputStream) { + GZIPOutputStream(output).use { gzipOutput -> + input.copyTo(gzipOutput) + } + } + + private fun listRegularFiles(dir: Path): List { + val files = mutableListOf() + Files.list(dir).use { stream -> + stream.forEach { path -> + if (Files.isRegularFile(path)) { + files.add(path) + } + } + } + return files.sortedWith(compareBy { + Files.getLastModifiedTime(it).toMillis() + }.thenBy { it.fileName.toString() }) + } + + private data class WriterState( + val basePath: Path, + val activeFile: Path, + var writer: BufferedWriter, + var recordCount: Int, + var byteCount: Long + ) +} + +data class TraceEvent @JvmOverloads constructor( + val path: Path, + val payload: JSONObject, + val timestamp: Long = System.currentTimeMillis() +)