From c84d88eab74c1a5b4202b5d94f11e91468eb1dec Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sat, 2 May 2026 21:55:11 +0800 Subject: [PATCH] refactor(communication): simplify reply consumer buffering --- .../communication/CommunicationProducer.java | 4 +- .../module/communication/ReplyDispatcher.kt | 280 ++++++++++-------- 2 files changed, 156 insertions(+), 128 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java index 0c9d2878..0edd9734 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java @@ -30,7 +30,6 @@ import java.util.stream.Collectors; @Slf4j public class CommunicationProducer extends AbstractAgentModule.Running implements ActivateModel { - private static final String INTERRUPTED_MARKER = " [response interrupted due to internal exception]"; private static final String NO_REPLY_MARKER = "NO_REPLY"; private static final String AGENT_MARKER = "[[AGENT]: self]"; private static final String NOT_REPLIED_PREFIX = "[NOT_REPLIED]"; @@ -95,8 +94,7 @@ public class CommunicationProducer extends AbstractAgentModule.Running consumer.onDelta(INTERRUPTED_MARKER)); + this.streamChat(buildChatMessages(runningFlowContext), consumer); updateChatMessages(runningFlowContext, consumer.collectResponse()); cognitionCapability.refreshRecentChatMessagesContext(); } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt index dbdeb2b4..4af72069 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt @@ -1,7 +1,8 @@ package work.slhaf.partner.module.communication import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel +import work.slhaf.partner.framework.agent.exception.AgentRuntimeException +import work.slhaf.partner.framework.agent.exception.ExceptionReporterHandler import work.slhaf.partner.framework.agent.interaction.AgentRuntime import work.slhaf.partner.framework.agent.interaction.data.InteractionEvent.EventStatus import work.slhaf.partner.framework.agent.interaction.data.ReplyEvent @@ -10,164 +11,193 @@ import kotlin.time.Duration.Companion.milliseconds object ReplyDispatcher { - private const val AGGREGATE_WINDOW_MILLIS = 100L - private const val NO_REPLY_MARKER = "NO_REPLY" - - private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - private val collectorChannel = Channel(Channel.UNLIMITED) - - init { - scope.launch { - var pendingChunk: ReplyChunk? = null - while (true) { - val firstChunk = pendingChunk ?: collectorChannel.receiveCatching().getOrNull() ?: break - pendingChunk = null - val builder = StringBuilder(firstChunk.delta) - - while (true) { - val nextChunk = withTimeoutOrNull(AGGREGATE_WINDOW_MILLIS.milliseconds) { - collectorChannel.receiveCatching() - } ?: break - - if (nextChunk.isClosed) { - flush(builder.toString(), firstChunk.target, firstChunk.responseChannel) - return@launch - } - - val chunk = nextChunk.getOrNull() ?: break - if (chunk.target == firstChunk.target) { - builder.append(chunk.delta) - } else { - pendingChunk = chunk - break - } - } - - flush(builder.toString(), firstChunk.target, firstChunk.responseChannel) - } - } - } - - /** - * flush 将推送至 AgentRuntime 的默认通道。 - */ - private fun flush(content: String, target: String, responseChannel: String?) { - if (content.isEmpty()) { - return - } - val event = ReplyEvent( - status = EventStatus.RUNNING, - target = target, - content = content, - mode = ReplyEvent.ContentMode.APPEND, - done = false - ) - if (responseChannel == null) { - AgentRuntime.response(event) - }else{ - AgentRuntime.response(event, responseChannel) - } - } - fun createConsumer(target: String, responseChannel: String?): StreamChatMessageConsumer = ReplyConsumer( - collectorChannel = collectorChannel, target = target, responseChannel = responseChannel ) - private data class ReplyChunk( - val delta: String, - val target: String, - val responseChannel: String?, - ) +} - private class ReplyConsumer( - private val collectorChannel: Channel, - private val target: String, - private val responseChannel: String? - ) : StreamChatMessageConsumer() { +private class ReplyConsumer( + private val target: String, + private val responseChannel: String? +) : StreamChatMessageConsumer { - private enum class VisibilityState { - UNDECIDED, - NO_REPLY, - VISIBLE - } + private enum class VisibilityState { + UNDECIDED, + NO_REPLY, + VISIBLE + } - private val rawResponse = StringBuilder() - private val undecidedBuffer = StringBuilder() - private var visibilityState = VisibilityState.UNDECIDED + private val rawResponse = StringBuilder() + private val undecidedBuffer = StringBuilder() + private val visibleBuffer = StringBuilder() + private val lock = Any() + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - override fun onDelta(delta: String) { + private var flushJob: Job? = null + private var visibilityState = VisibilityState.UNDECIDED + private var terminalEmitted = false + + override fun onDelta(delta: String) { + synchronized(lock) { rawResponse.append(delta) routeDelta(delta) } + } - override fun collectResponse(): String { + override fun onComplete() { + synchronized(lock) { finalizeUndecidedBuffer() + flushVisibleBufferLocked() + dispatchTerminalLocked(EventStatus.DONE) + shutdownScopeLocked() + } + } + + override fun onError(exception: AgentRuntimeException) { + synchronized(lock) { + ExceptionReporterHandler.report(exception) + rawResponse.append(INTERRUPTED_MARKER) + routeDelta(INTERRUPTED_MARKER) + finalizeUndecidedBuffer() + flushVisibleBufferLocked() + dispatchTerminalLocked(EventStatus.ERROR) + shutdownScopeLocked() + } + } + + override fun collectResponse(): String { + synchronized(lock) { + finalizeUndecidedBuffer() + flushVisibleBufferLocked() return rawResponse.toString() } + } - private fun routeDelta(delta: String) { - when (visibilityState) { - VisibilityState.NO_REPLY -> return - VisibilityState.VISIBLE -> flushVisible(delta) - VisibilityState.UNDECIDED -> { - undecidedBuffer.append(delta) - resolveVisibility() - } + private fun routeDelta(delta: String) { + when (visibilityState) { + VisibilityState.NO_REPLY -> return + VisibilityState.VISIBLE -> appendVisibleLocked(delta) + VisibilityState.UNDECIDED -> { + undecidedBuffer.append(delta) + resolveVisibility() } } + } - private fun resolveVisibility() { - val content = undecidedBuffer.toString() - if (content.length <= NO_REPLY_MARKER.length) { - if (NO_REPLY_MARKER.startsWith(content)) { - return - } - revealBufferedContent() - return - } - - if (!content.startsWith(NO_REPLY_MARKER)) { - revealBufferedContent() - return - } - - val suffixFirstChar = content[NO_REPLY_MARKER.length] - if (suffixFirstChar == '\n' || suffixFirstChar == '\r') { - visibilityState = VisibilityState.NO_REPLY - undecidedBuffer.setLength(0) + private fun resolveVisibility() { + val content = undecidedBuffer.toString() + if (content.length <= NO_REPLY_MARKER.length) { + if (NO_REPLY_MARKER.startsWith(content)) { return } revealBufferedContent() + return } - private fun finalizeUndecidedBuffer() { - if (visibilityState != VisibilityState.UNDECIDED) { - return - } - if (undecidedBuffer.toString() == NO_REPLY_MARKER) { - visibilityState = VisibilityState.NO_REPLY - undecidedBuffer.setLength(0) - return - } + if (!content.startsWith(NO_REPLY_MARKER)) { revealBufferedContent() + return } - private fun revealBufferedContent() { - visibilityState = VisibilityState.VISIBLE - if (undecidedBuffer.isNotEmpty()) { - flushVisible(undecidedBuffer.toString()) - undecidedBuffer.setLength(0) + val suffixFirstChar = content[NO_REPLY_MARKER.length] + if (suffixFirstChar == '\n' || suffixFirstChar == '\r') { + visibilityState = VisibilityState.NO_REPLY + undecidedBuffer.setLength(0) + return + } + revealBufferedContent() + } + + private fun finalizeUndecidedBuffer() { + if (visibilityState != VisibilityState.UNDECIDED) { + return + } + if (undecidedBuffer.toString() == NO_REPLY_MARKER) { + visibilityState = VisibilityState.NO_REPLY + undecidedBuffer.setLength(0) + return + } + revealBufferedContent() + } + + private fun revealBufferedContent() { + visibilityState = VisibilityState.VISIBLE + if (undecidedBuffer.isNotEmpty()) { + appendVisibleLocked(undecidedBuffer.toString()) + undecidedBuffer.setLength(0) + } + } + + private fun appendVisibleLocked(delta: String) { + if (delta.isEmpty()) { + return + } + visibleBuffer.append(delta) + scheduleFlushLocked() + } + + private fun scheduleFlushLocked() { + flushJob?.cancel() + flushJob = scope.launch { + delay(AGGREGATE_WINDOW_MILLIS.milliseconds) + synchronized(lock) { + flushVisibleBufferLocked() } } + } - private fun flushVisible(delta: String) { - collectorChannel.trySend(ReplyChunk(delta, target, responseChannel)).isSuccess + private fun flushVisibleBufferLocked() { + flushJob?.cancel() + flushJob = null + if (visibleBuffer.isEmpty()) { + return } + val content = visibleBuffer.toString() + visibleBuffer.setLength(0) + dispatchLocked( + ReplyEvent( + status = EventStatus.RUNNING, + target = target, + content = content, + mode = ReplyEvent.ContentMode.APPEND + ) + ) + } - override fun consumeDelta(delta: String?) { + private fun dispatchTerminalLocked(status: EventStatus) { + if (terminalEmitted) { + return } + terminalEmitted = true + dispatchLocked( + ReplyEvent( + status = status, + target = target, + content = "", + mode = ReplyEvent.ContentMode.APPEND + ) + ) + } + private fun dispatchLocked(event: ReplyEvent) { + if (responseChannel == null) { + AgentRuntime.response(event) + } else { + AgentRuntime.response(event, responseChannel) + } + } + + private fun shutdownScopeLocked() { + flushJob?.cancel() + flushJob = null + scope.cancel() + } + + private companion object { + private const val AGGREGATE_WINDOW_MILLIS = 100L + private const val INTERRUPTED_MARKER = "\n[response interrupted due to internal exception]" + private const val NO_REPLY_MARKER = "NO_REPLY" } }