mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(trace): decouple recorder from file persistence
Turn TraceRecorder into a lightweight trace event entry point and move file persistence responsibilities into the default FileTraceSink. Trace events are now published through TraceSinkRegistry, allowing additional runtime observers to subscribe without parsing trace files. Add TraceSink and TraceSinkRegistry, keep FileTraceSink registered as the default sink, and preserve the existing active/historical/archived trace file rotation behavior inside the file sink. Also change TraceEvent to carry a logical key instead of a caller-provided path, so trace storage locations are resolved internally under the traceroot. Update existing trace producers to emit logical keys such ascontext-workspace, exception, and advice targets.
This commit is contained in:
@@ -12,7 +12,7 @@ import work.slhaf.partner.framework.agent.factory.context.AgentContext;
|
||||
import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration;
|
||||
import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistry;
|
||||
import work.slhaf.partner.framework.agent.log.LogAdviceProvider;
|
||||
import work.slhaf.partner.framework.agent.log.TraceRecorder;
|
||||
import work.slhaf.partner.framework.agent.log.TraceSinkRegistry;
|
||||
import work.slhaf.partner.framework.agent.model.ModelRuntimeRegistry;
|
||||
import work.slhaf.partner.framework.agent.state.StateCenter;
|
||||
|
||||
@@ -135,9 +135,9 @@ public final class Agent {
|
||||
StateCenter.INSTANCE::save
|
||||
);
|
||||
AgentContext.INSTANCE.addPostShutdownHook(
|
||||
"trace-recorder-close",
|
||||
"trace-sink-registry-close",
|
||||
90,
|
||||
TraceRecorder.INSTANCE::close
|
||||
TraceSinkRegistry::close
|
||||
);
|
||||
AgentContext.INSTANCE.addPostShutdownHook(
|
||||
"config-center-close",
|
||||
|
||||
@@ -2,7 +2,6 @@ package work.slhaf.partner.framework.agent.exception
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject
|
||||
import org.slf4j.LoggerFactory
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter
|
||||
import work.slhaf.partner.framework.agent.log.TraceEvent
|
||||
import work.slhaf.partner.framework.agent.log.TraceRecorder
|
||||
|
||||
@@ -96,17 +95,12 @@ interface ExceptionReporter {
|
||||
object LoggerExceptionReporter : ExceptionReporter {
|
||||
|
||||
private val log = LoggerFactory.getLogger(this::class.java)
|
||||
private val tracePath = ConfigCenter.paths.stateDir
|
||||
.resolve("trace")
|
||||
.resolve("log-exception-reporter")
|
||||
.toAbsolutePath()
|
||||
.normalize()
|
||||
|
||||
override fun reporterName(): String = "logger-reporter"
|
||||
|
||||
override fun report(exception: AgentException) {
|
||||
val exceptionReport = exception.toReport().toDetailedString()
|
||||
TraceRecorder.record(TraceEvent(tracePath, JSONObject.of("exception", exceptionReport)))
|
||||
TraceRecorder.record(TraceEvent("exception", JSONObject.of("exception", exceptionReport)))
|
||||
log.error("exception occurred: $exceptionReport")
|
||||
}
|
||||
|
||||
|
||||
@@ -4,25 +4,19 @@ import com.alibaba.fastjson2.JSONException
|
||||
import com.alibaba.fastjson2.JSONObject
|
||||
import org.slf4j.LoggerFactory
|
||||
import work.slhaf.partner.framework.agent.config.Config
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter
|
||||
import work.slhaf.partner.framework.agent.config.ConfigRegistration
|
||||
import work.slhaf.partner.framework.agent.config.Configurable
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.time.ZonedDateTime
|
||||
|
||||
object LogAdviceProvider : Configurable, ConfigRegistration<AdviceLoggingConfig> {
|
||||
|
||||
private val logPath = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath()
|
||||
private val _adviceRegistry = mutableSetOf<LogAdvice<*, *>>()
|
||||
val adviceRegistry: Set<LogAdvice<*, *>>
|
||||
get() = _adviceRegistry
|
||||
|
||||
var logLevel = AdviceLoggingConfig.LogLevel.NONE
|
||||
|
||||
init {
|
||||
Files.createDirectories(logPath)
|
||||
}
|
||||
|
||||
@JvmOverloads
|
||||
fun <I, O> createAdvice(
|
||||
@@ -54,8 +48,7 @@ object LogAdviceProvider : Configurable, ConfigRegistration<AdviceLoggingConfig>
|
||||
}
|
||||
|
||||
internal fun record(result: AdviceResult) {
|
||||
val path = logPath.resolve(result.adviceTarget).normalize().toAbsolutePath()
|
||||
val traceEvent = TraceEvent(path, result.toJSON(), result.finishTime.toInstant().toEpochMilli())
|
||||
val traceEvent = TraceEvent(result.adviceTarget, result.toJSON(), result.finishTime.toInstant().toEpochMilli())
|
||||
TraceRecorder.record(traceEvent)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSONObject
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import org.slf4j.LoggerFactory
|
||||
import work.slhaf.partner.framework.agent.factory.context.AgentContext
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter
|
||||
import java.io.BufferedWriter
|
||||
import java.io.OutputStream
|
||||
import java.nio.charset.StandardCharsets
|
||||
@@ -16,11 +16,80 @@ import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.ZoneId
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.zip.GZIPOutputStream
|
||||
|
||||
object TraceSinkRegistry {
|
||||
|
||||
private val log = LoggerFactory.getLogger(TraceSinkRegistry::class.java)
|
||||
private val sinks = CopyOnWriteArrayList<TraceSink>()
|
||||
private val closed = AtomicBoolean(false)
|
||||
|
||||
init {
|
||||
register(FileTraceSink)
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun register(sink: TraceSink) {
|
||||
if (closed.get()) {
|
||||
log.warn("TraceSinkRegistry is closed, skip trace sink: {}", sink.javaClass.name)
|
||||
return
|
||||
}
|
||||
if (!sinks.contains(sink)) {
|
||||
sinks.add(sink)
|
||||
}
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun unregister(sink: TraceSink) {
|
||||
sinks.remove(sink)
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun publish(event: TraceEvent) {
|
||||
for (sink in sinks) {
|
||||
runCatching {
|
||||
sink.consume(event)
|
||||
}.onFailure {
|
||||
log.error("Trace sink failed: {}", sink.javaClass.name, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun close() {
|
||||
if (!closed.compareAndSet(false, true)) {
|
||||
return
|
||||
}
|
||||
sinks.forEach { sink ->
|
||||
runCatching {
|
||||
sink.close()
|
||||
}.onFailure {
|
||||
log.error("Failed to close trace sink: {}", sink.javaClass.name, it)
|
||||
}
|
||||
}
|
||||
sinks.clear()
|
||||
}
|
||||
}
|
||||
|
||||
interface TraceSink : AutoCloseable {
|
||||
fun consume(event: TraceEvent)
|
||||
|
||||
override fun close() {
|
||||
}
|
||||
}
|
||||
|
||||
object TraceRecorder {
|
||||
|
||||
@JvmStatic
|
||||
fun record(event: TraceEvent) {
|
||||
TraceSinkRegistry.publish(event)
|
||||
}
|
||||
}
|
||||
|
||||
object FileTraceSink : TraceSink {
|
||||
|
||||
private const val ACTIVE_FILE_NAME = "active.jsonl"
|
||||
private const val HISTORICAL_DIR_NAME = "historical"
|
||||
private const val ARCHIVED_DIR_NAME = "archived"
|
||||
@@ -40,7 +109,6 @@ object TraceRecorder {
|
||||
private val writerJob: Job
|
||||
|
||||
init {
|
||||
AgentContext.addPostShutdownHook("trace-recorder-close") { close() }
|
||||
writerJob = scope.launch {
|
||||
try {
|
||||
for (event in channel) {
|
||||
@@ -54,18 +122,18 @@ object TraceRecorder {
|
||||
}
|
||||
}
|
||||
|
||||
fun record(event: TraceEvent) {
|
||||
override fun consume(event: TraceEvent) {
|
||||
if (closed.get()) {
|
||||
log.warn("TraceRecorder is closed, skip event for path: {}", event.path)
|
||||
log.warn("FileTraceSink is closed, skip event for key: {}", event.key)
|
||||
return
|
||||
}
|
||||
val result = channel.trySend(event)
|
||||
if (result.isFailure) {
|
||||
log.error("Failed to enqueue trace event for path: {}", event.path, result.exceptionOrNull())
|
||||
log.error("Failed to enqueue trace event for key: {}", event.key, result.exceptionOrNull())
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
override fun close() {
|
||||
if (!closed.compareAndSet(false, true)) {
|
||||
return
|
||||
}
|
||||
@@ -77,7 +145,7 @@ object TraceRecorder {
|
||||
}
|
||||
|
||||
private fun handleEvent(event: TraceEvent) {
|
||||
val basePath = event.path.normalize().toAbsolutePath()
|
||||
val basePath = resolveBasePath(event.key)
|
||||
runCatching {
|
||||
val state = writerStates.getOrPut(basePath) { openWriterState(basePath) }
|
||||
writeEvent(state, event)
|
||||
@@ -85,12 +153,27 @@ object TraceRecorder {
|
||||
rotateActiveFile(state)
|
||||
}
|
||||
}.onFailure {
|
||||
log.error("Failed to persist trace event for path: {}", basePath, it)
|
||||
log.error("Failed to persist trace event for key: {}, path: {}", event.key, basePath, it)
|
||||
}
|
||||
}
|
||||
|
||||
private fun resolveBasePath(key: String): Path {
|
||||
val traceRoot = ConfigCenter.paths.stateDir.resolve("trace").normalize().toAbsolutePath()
|
||||
val normalizedKey = key.trim().ifBlank { "default" }
|
||||
val candidate = traceRoot.resolve(normalizedKey).normalize().toAbsolutePath()
|
||||
if (candidate.startsWith(traceRoot)) {
|
||||
return candidate
|
||||
}
|
||||
return traceRoot.resolve(sanitizeKey(normalizedKey)).normalize().toAbsolutePath()
|
||||
}
|
||||
|
||||
private fun sanitizeKey(key: String): String {
|
||||
return key.replace(Regex("[^A-Za-z0-9._-]+"), "_").trim('_').ifBlank { "default" }
|
||||
}
|
||||
|
||||
private fun writeEvent(state: WriterState, event: TraceEvent) {
|
||||
val json = JSONObject(event.payload)
|
||||
json["traceKey"] = event.key
|
||||
json["timestamp"] = event.timestamp
|
||||
val line = json.toJSONString()
|
||||
state.writer.write(line)
|
||||
@@ -327,7 +410,7 @@ object TraceRecorder {
|
||||
}
|
||||
|
||||
data class TraceEvent @JvmOverloads constructor(
|
||||
val path: Path,
|
||||
val key: String,
|
||||
val payload: JSONObject,
|
||||
val timestamp: Long = System.currentTimeMillis()
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user