From 73f6ff27453f80d7a8d8893420c7df1f93f23e84 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 27 Apr 2026 23:53:20 +0800 Subject: [PATCH] refactor(trace): decouple recorder from file persistence Turn TraceRecorder into a lightweight trace event entry point and move file persistence responsibilities into the default FileTraceSink. Trace events are now published through TraceSinkRegistry, allowing additional runtime observers to subscribe without parsing trace files. Add TraceSink and TraceSinkRegistry, keep FileTraceSink registered as the default sink, and preserve the existing active/historical/archived trace file rotation behavior inside the file sink. Also change TraceEvent to carry a logical key instead of a caller-provided path, so trace storage locations are resolved internally under the traceroot. Update existing trace producers to emit logical keys such ascontext-workspace, exception, and advice targets. --- .../core/cognition/ContextWorkspace.kt | 9 +- .../slhaf/partner/framework/agent/Agent.java | 6 +- .../framework/agent/exception/exception.kt | 8 +- .../framework/agent/log/LogAdviceProvider.kt | 9 +- .../framework/agent/log/TraceRecorder.kt | 101 ++++++++++++++++-- 5 files changed, 98 insertions(+), 35 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt index 18b23af0..2f94f078 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt @@ -4,10 +4,8 @@ import com.alibaba.fastjson2.JSONObject import org.w3c.dom.Document import org.w3c.dom.Element import work.slhaf.partner.common.base.Block -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.log.TraceEvent import work.slhaf.partner.framework.agent.log.TraceRecorder -import java.nio.file.Path import java.time.Duration import java.time.Instant import java.util.* @@ -18,11 +16,6 @@ import kotlin.math.min class ContextWorkspace { - private val tracePath: Path = ConfigCenter.paths.stateDir - .resolve("trace") - .resolve("context-workspace") - .normalize() - .toAbsolutePath() private val stateSet = mutableSetOf() private val lock = ReentrantReadWriteLock() @@ -157,7 +150,7 @@ class ContextWorkspace { .thenBy { it.sourceKey.source } ) .map(::blockSnapshot) - TraceRecorder.record(TraceEvent(tracePath, payload)) + TraceRecorder.record(TraceEvent("context-workspace", payload)) } private fun blockSnapshot(block: ContextBlock): JSONObject { diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java index 25017c27..b3100730 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java @@ -12,7 +12,7 @@ import work.slhaf.partner.framework.agent.factory.context.AgentContext; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistry; import work.slhaf.partner.framework.agent.log.LogAdviceProvider; -import work.slhaf.partner.framework.agent.log.TraceRecorder; +import work.slhaf.partner.framework.agent.log.TraceSinkRegistry; import work.slhaf.partner.framework.agent.model.ModelRuntimeRegistry; import work.slhaf.partner.framework.agent.state.StateCenter; @@ -135,9 +135,9 @@ public final class Agent { StateCenter.INSTANCE::save ); AgentContext.INSTANCE.addPostShutdownHook( - "trace-recorder-close", + "trace-sink-registry-close", 90, - TraceRecorder.INSTANCE::close + TraceSinkRegistry::close ); AgentContext.INSTANCE.addPostShutdownHook( "config-center-close", diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt index fc0a422d..68777486 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt @@ -2,7 +2,6 @@ package work.slhaf.partner.framework.agent.exception import com.alibaba.fastjson2.JSONObject import org.slf4j.LoggerFactory -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.log.TraceEvent import work.slhaf.partner.framework.agent.log.TraceRecorder @@ -96,17 +95,12 @@ interface ExceptionReporter { object LoggerExceptionReporter : ExceptionReporter { private val log = LoggerFactory.getLogger(this::class.java) - private val tracePath = ConfigCenter.paths.stateDir - .resolve("trace") - .resolve("log-exception-reporter") - .toAbsolutePath() - .normalize() override fun reporterName(): String = "logger-reporter" override fun report(exception: AgentException) { val exceptionReport = exception.toReport().toDetailedString() - TraceRecorder.record(TraceEvent(tracePath, JSONObject.of("exception", exceptionReport))) + TraceRecorder.record(TraceEvent("exception", JSONObject.of("exception", exceptionReport))) log.error("exception occurred: $exceptionReport") } diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt index 51a1c4c3..e6349fe1 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt @@ -4,25 +4,19 @@ import com.alibaba.fastjson2.JSONException import com.alibaba.fastjson2.JSONObject import org.slf4j.LoggerFactory import work.slhaf.partner.framework.agent.config.Config -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.config.ConfigRegistration import work.slhaf.partner.framework.agent.config.Configurable -import java.nio.file.Files import java.nio.file.Path import java.time.ZonedDateTime object LogAdviceProvider : Configurable, ConfigRegistration { - private val logPath = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath() private val _adviceRegistry = mutableSetOf>() val adviceRegistry: Set> get() = _adviceRegistry var logLevel = AdviceLoggingConfig.LogLevel.NONE - init { - Files.createDirectories(logPath) - } @JvmOverloads fun createAdvice( @@ -54,8 +48,7 @@ object LogAdviceProvider : Configurable, ConfigRegistration } internal fun record(result: AdviceResult) { - val path = logPath.resolve(result.adviceTarget).normalize().toAbsolutePath() - val traceEvent = TraceEvent(path, result.toJSON(), result.finishTime.toInstant().toEpochMilli()) + val traceEvent = TraceEvent(result.adviceTarget, result.toJSON(), result.finishTime.toInstant().toEpochMilli()) TraceRecorder.record(traceEvent) } diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt index 2185c20e..0ac7a83a 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSONObject import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import org.slf4j.LoggerFactory -import work.slhaf.partner.framework.agent.factory.context.AgentContext +import work.slhaf.partner.framework.agent.config.ConfigCenter import java.io.BufferedWriter import java.io.OutputStream import java.nio.charset.StandardCharsets @@ -16,11 +16,80 @@ import java.time.Instant import java.time.LocalDate import java.time.ZoneId import java.time.format.DateTimeFormatter +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicBoolean import java.util.zip.GZIPOutputStream +object TraceSinkRegistry { + + private val log = LoggerFactory.getLogger(TraceSinkRegistry::class.java) + private val sinks = CopyOnWriteArrayList() + private val closed = AtomicBoolean(false) + + init { + register(FileTraceSink) + } + + @JvmStatic + fun register(sink: TraceSink) { + if (closed.get()) { + log.warn("TraceSinkRegistry is closed, skip trace sink: {}", sink.javaClass.name) + return + } + if (!sinks.contains(sink)) { + sinks.add(sink) + } + } + + @JvmStatic + fun unregister(sink: TraceSink) { + sinks.remove(sink) + } + + @JvmStatic + fun publish(event: TraceEvent) { + for (sink in sinks) { + runCatching { + sink.consume(event) + }.onFailure { + log.error("Trace sink failed: {}", sink.javaClass.name, it) + } + } + } + + @JvmStatic + fun close() { + if (!closed.compareAndSet(false, true)) { + return + } + sinks.forEach { sink -> + runCatching { + sink.close() + }.onFailure { + log.error("Failed to close trace sink: {}", sink.javaClass.name, it) + } + } + sinks.clear() + } +} + +interface TraceSink : AutoCloseable { + fun consume(event: TraceEvent) + + override fun close() { + } +} + object TraceRecorder { + @JvmStatic + fun record(event: TraceEvent) { + TraceSinkRegistry.publish(event) + } +} + +object FileTraceSink : TraceSink { + private const val ACTIVE_FILE_NAME = "active.jsonl" private const val HISTORICAL_DIR_NAME = "historical" private const val ARCHIVED_DIR_NAME = "archived" @@ -40,7 +109,6 @@ object TraceRecorder { private val writerJob: Job init { - AgentContext.addPostShutdownHook("trace-recorder-close") { close() } writerJob = scope.launch { try { for (event in channel) { @@ -54,18 +122,18 @@ object TraceRecorder { } } - fun record(event: TraceEvent) { + override fun consume(event: TraceEvent) { if (closed.get()) { - log.warn("TraceRecorder is closed, skip event for path: {}", event.path) + log.warn("FileTraceSink is closed, skip event for key: {}", event.key) return } val result = channel.trySend(event) if (result.isFailure) { - log.error("Failed to enqueue trace event for path: {}", event.path, result.exceptionOrNull()) + log.error("Failed to enqueue trace event for key: {}", event.key, result.exceptionOrNull()) } } - fun close() { + override fun close() { if (!closed.compareAndSet(false, true)) { return } @@ -77,7 +145,7 @@ object TraceRecorder { } private fun handleEvent(event: TraceEvent) { - val basePath = event.path.normalize().toAbsolutePath() + val basePath = resolveBasePath(event.key) runCatching { val state = writerStates.getOrPut(basePath) { openWriterState(basePath) } writeEvent(state, event) @@ -85,12 +153,27 @@ object TraceRecorder { rotateActiveFile(state) } }.onFailure { - log.error("Failed to persist trace event for path: {}", basePath, it) + log.error("Failed to persist trace event for key: {}, path: {}", event.key, basePath, it) } } + private fun resolveBasePath(key: String): Path { + val traceRoot = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath() + val normalizedKey = key.trim().ifBlank { "default" } + val candidate = traceRoot.resolve(normalizedKey).normalize().toAbsolutePath() + if (candidate.startsWith(traceRoot)) { + return candidate + } + return traceRoot.resolve(sanitizeKey(normalizedKey)).normalize().toAbsolutePath() + } + + private fun sanitizeKey(key: String): String { + return key.replace(Regex("[^A-Za-z0-9._-]+"), "_").trim('_').ifBlank { "default" } + } + private fun writeEvent(state: WriterState, event: TraceEvent) { val json = JSONObject(event.payload) + json["traceKey"] = event.key json["timestamp"] = event.timestamp val line = json.toJSONString() state.writer.write(line) @@ -327,7 +410,7 @@ object TraceRecorder { } data class TraceEvent @JvmOverloads constructor( - val path: Path, + val key: String, val payload: JSONObject, val timestamp: Long = System.currentTimeMillis() )