diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt index 18b23af0..2f94f078 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/ContextWorkspace.kt @@ -4,10 +4,8 @@ import com.alibaba.fastjson2.JSONObject import org.w3c.dom.Document import org.w3c.dom.Element import work.slhaf.partner.common.base.Block -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.log.TraceEvent import work.slhaf.partner.framework.agent.log.TraceRecorder -import java.nio.file.Path import java.time.Duration import java.time.Instant import java.util.* @@ -18,11 +16,6 @@ import kotlin.math.min class ContextWorkspace { - private val tracePath: Path = ConfigCenter.paths.stateDir - .resolve("trace") - .resolve("context-workspace") - .normalize() - .toAbsolutePath() private val stateSet = mutableSetOf() private val lock = ReentrantReadWriteLock() @@ -157,7 +150,7 @@ class ContextWorkspace { .thenBy { it.sourceKey.source } ) .map(::blockSnapshot) - TraceRecorder.record(TraceEvent(tracePath, payload)) + TraceRecorder.record(TraceEvent("context-workspace", payload)) } private fun blockSnapshot(block: ContextBlock): JSONObject { diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java index 25017c27..b3100730 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/Agent.java @@ -12,7 +12,7 @@ import work.slhaf.partner.framework.agent.factory.context.AgentContext; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration; import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistry; import work.slhaf.partner.framework.agent.log.LogAdviceProvider; -import work.slhaf.partner.framework.agent.log.TraceRecorder; +import work.slhaf.partner.framework.agent.log.TraceSinkRegistry; import work.slhaf.partner.framework.agent.model.ModelRuntimeRegistry; import work.slhaf.partner.framework.agent.state.StateCenter; @@ -135,9 +135,9 @@ public final class Agent { StateCenter.INSTANCE::save ); AgentContext.INSTANCE.addPostShutdownHook( - "trace-recorder-close", + "trace-sink-registry-close", 90, - TraceRecorder.INSTANCE::close + TraceSinkRegistry::close ); AgentContext.INSTANCE.addPostShutdownHook( "config-center-close", diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt index fc0a422d..68777486 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/exception/exception.kt @@ -2,7 +2,6 @@ package work.slhaf.partner.framework.agent.exception import com.alibaba.fastjson2.JSONObject import org.slf4j.LoggerFactory -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.log.TraceEvent import work.slhaf.partner.framework.agent.log.TraceRecorder @@ -96,17 +95,12 @@ interface ExceptionReporter { object LoggerExceptionReporter : ExceptionReporter { private val log = LoggerFactory.getLogger(this::class.java) - private val tracePath = ConfigCenter.paths.stateDir - .resolve("trace") - .resolve("log-exception-reporter") - .toAbsolutePath() - .normalize() override fun reporterName(): String = "logger-reporter" override fun report(exception: AgentException) { val exceptionReport = exception.toReport().toDetailedString() - TraceRecorder.record(TraceEvent(tracePath, JSONObject.of("exception", exceptionReport))) + TraceRecorder.record(TraceEvent("exception", JSONObject.of("exception", exceptionReport))) log.error("exception occurred: $exceptionReport") } diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt index 51a1c4c3..e6349fe1 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/LogAdviceProvider.kt @@ -4,25 +4,19 @@ import com.alibaba.fastjson2.JSONException import com.alibaba.fastjson2.JSONObject import org.slf4j.LoggerFactory import work.slhaf.partner.framework.agent.config.Config -import work.slhaf.partner.framework.agent.config.ConfigCenter import work.slhaf.partner.framework.agent.config.ConfigRegistration import work.slhaf.partner.framework.agent.config.Configurable -import java.nio.file.Files import java.nio.file.Path import java.time.ZonedDateTime object LogAdviceProvider : Configurable, ConfigRegistration { - private val logPath = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath() private val _adviceRegistry = mutableSetOf>() val adviceRegistry: Set> get() = _adviceRegistry var logLevel = AdviceLoggingConfig.LogLevel.NONE - init { - Files.createDirectories(logPath) - } @JvmOverloads fun createAdvice( @@ -54,8 +48,7 @@ object LogAdviceProvider : Configurable, ConfigRegistration } internal fun record(result: AdviceResult) { - val path = logPath.resolve(result.adviceTarget).normalize().toAbsolutePath() - val traceEvent = TraceEvent(path, result.toJSON(), result.finishTime.toInstant().toEpochMilli()) + val traceEvent = TraceEvent(result.adviceTarget, result.toJSON(), result.finishTime.toInstant().toEpochMilli()) TraceRecorder.record(traceEvent) } diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt index 2185c20e..0ac7a83a 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/log/TraceRecorder.kt @@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSONObject import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import org.slf4j.LoggerFactory -import work.slhaf.partner.framework.agent.factory.context.AgentContext +import work.slhaf.partner.framework.agent.config.ConfigCenter import java.io.BufferedWriter import java.io.OutputStream import java.nio.charset.StandardCharsets @@ -16,11 +16,80 @@ import java.time.Instant import java.time.LocalDate import java.time.ZoneId import java.time.format.DateTimeFormatter +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicBoolean import java.util.zip.GZIPOutputStream +object TraceSinkRegistry { + + private val log = LoggerFactory.getLogger(TraceSinkRegistry::class.java) + private val sinks = CopyOnWriteArrayList() + private val closed = AtomicBoolean(false) + + init { + register(FileTraceSink) + } + + @JvmStatic + fun register(sink: TraceSink) { + if (closed.get()) { + log.warn("TraceSinkRegistry is closed, skip trace sink: {}", sink.javaClass.name) + return + } + if (!sinks.contains(sink)) { + sinks.add(sink) + } + } + + @JvmStatic + fun unregister(sink: TraceSink) { + sinks.remove(sink) + } + + @JvmStatic + fun publish(event: TraceEvent) { + for (sink in sinks) { + runCatching { + sink.consume(event) + }.onFailure { + log.error("Trace sink failed: {}", sink.javaClass.name, it) + } + } + } + + @JvmStatic + fun close() { + if (!closed.compareAndSet(false, true)) { + return + } + sinks.forEach { sink -> + runCatching { + sink.close() + }.onFailure { + log.error("Failed to close trace sink: {}", sink.javaClass.name, it) + } + } + sinks.clear() + } +} + +interface TraceSink : AutoCloseable { + fun consume(event: TraceEvent) + + override fun close() { + } +} + object TraceRecorder { + @JvmStatic + fun record(event: TraceEvent) { + TraceSinkRegistry.publish(event) + } +} + +object FileTraceSink : TraceSink { + private const val ACTIVE_FILE_NAME = "active.jsonl" private const val HISTORICAL_DIR_NAME = "historical" private const val ARCHIVED_DIR_NAME = "archived" @@ -40,7 +109,6 @@ object TraceRecorder { private val writerJob: Job init { - AgentContext.addPostShutdownHook("trace-recorder-close") { close() } writerJob = scope.launch { try { for (event in channel) { @@ -54,18 +122,18 @@ object TraceRecorder { } } - fun record(event: TraceEvent) { + override fun consume(event: TraceEvent) { if (closed.get()) { - log.warn("TraceRecorder is closed, skip event for path: {}", event.path) + log.warn("FileTraceSink is closed, skip event for key: {}", event.key) return } val result = channel.trySend(event) if (result.isFailure) { - log.error("Failed to enqueue trace event for path: {}", event.path, result.exceptionOrNull()) + log.error("Failed to enqueue trace event for key: {}", event.key, result.exceptionOrNull()) } } - fun close() { + override fun close() { if (!closed.compareAndSet(false, true)) { return } @@ -77,7 +145,7 @@ object TraceRecorder { } private fun handleEvent(event: TraceEvent) { - val basePath = event.path.normalize().toAbsolutePath() + val basePath = resolveBasePath(event.key) runCatching { val state = writerStates.getOrPut(basePath) { openWriterState(basePath) } writeEvent(state, event) @@ -85,12 +153,27 @@ object TraceRecorder { rotateActiveFile(state) } }.onFailure { - log.error("Failed to persist trace event for path: {}", basePath, it) + log.error("Failed to persist trace event for key: {}, path: {}", event.key, basePath, it) } } + private fun resolveBasePath(key: String): Path { + val traceRoot = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath() + val normalizedKey = key.trim().ifBlank { "default" } + val candidate = traceRoot.resolve(normalizedKey).normalize().toAbsolutePath() + if (candidate.startsWith(traceRoot)) { + return candidate + } + return traceRoot.resolve(sanitizeKey(normalizedKey)).normalize().toAbsolutePath() + } + + private fun sanitizeKey(key: String): String { + return key.replace(Regex("[^A-Za-z0-9._-]+"), "_").trim('_').ifBlank { "default" } + } + private fun writeEvent(state: WriterState, event: TraceEvent) { val json = JSONObject(event.payload) + json["traceKey"] = event.key json["timestamp"] = event.timestamp val line = json.toJSONString() state.writer.write(line) @@ -327,7 +410,7 @@ object TraceRecorder { } data class TraceEvent @JvmOverloads constructor( - val path: Path, + val key: String, val payload: JSONObject, val timestamp: Long = System.currentTimeMillis() )