refactor(communication): simplify reply consumer buffering

This commit is contained in:
2026-05-02 21:55:11 +08:00
parent 692c601f17
commit c84d88eab7
2 changed files with 156 additions and 128 deletions

View File

@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRunningFlowContext> implements ActivateModel { public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRunningFlowContext> implements ActivateModel {
private static final String INTERRUPTED_MARKER = " [response interrupted due to internal exception]";
private static final String NO_REPLY_MARKER = "NO_REPLY"; private static final String NO_REPLY_MARKER = "NO_REPLY";
private static final String AGENT_MARKER = "[[AGENT]: self]"; private static final String AGENT_MARKER = "[[AGENT]: self]";
private static final String NOT_REPLIED_PREFIX = "[NOT_REPLIED]"; private static final String NOT_REPLIED_PREFIX = "[NOT_REPLIED]";
@@ -95,8 +94,7 @@ public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRu
private void executeChat(PartnerRunningFlowContext runningFlowContext) { private void executeChat(PartnerRunningFlowContext runningFlowContext) {
StreamChatMessageConsumer consumer = ReplyDispatcher.INSTANCE.createConsumer(runningFlowContext.getTarget(), runningFlowContext.getResopnseChannel()); StreamChatMessageConsumer consumer = ReplyDispatcher.INSTANCE.createConsumer(runningFlowContext.getTarget(), runningFlowContext.getResopnseChannel());
this.streamChat(buildChatMessages(runningFlowContext), consumer) this.streamChat(buildChatMessages(runningFlowContext), consumer);
.onFailure(exception -> consumer.onDelta(INTERRUPTED_MARKER));
updateChatMessages(runningFlowContext, consumer.collectResponse()); updateChatMessages(runningFlowContext, consumer.collectResponse());
cognitionCapability.refreshRecentChatMessagesContext(); cognitionCapability.refreshRecentChatMessagesContext();
} }

View File

@@ -1,7 +1,8 @@
package work.slhaf.partner.module.communication package work.slhaf.partner.module.communication
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel import work.slhaf.partner.framework.agent.exception.AgentRuntimeException
import work.slhaf.partner.framework.agent.exception.ExceptionReporterHandler
import work.slhaf.partner.framework.agent.interaction.AgentRuntime import work.slhaf.partner.framework.agent.interaction.AgentRuntime
import work.slhaf.partner.framework.agent.interaction.data.InteractionEvent.EventStatus import work.slhaf.partner.framework.agent.interaction.data.InteractionEvent.EventStatus
import work.slhaf.partner.framework.agent.interaction.data.ReplyEvent import work.slhaf.partner.framework.agent.interaction.data.ReplyEvent
@@ -10,82 +11,17 @@ import kotlin.time.Duration.Companion.milliseconds
object ReplyDispatcher { object ReplyDispatcher {
private const val AGGREGATE_WINDOW_MILLIS = 100L
private const val NO_REPLY_MARKER = "NO_REPLY"
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val collectorChannel = Channel<ReplyChunk>(Channel.UNLIMITED)
init {
scope.launch {
var pendingChunk: ReplyChunk? = null
while (true) {
val firstChunk = pendingChunk ?: collectorChannel.receiveCatching().getOrNull() ?: break
pendingChunk = null
val builder = StringBuilder(firstChunk.delta)
while (true) {
val nextChunk = withTimeoutOrNull(AGGREGATE_WINDOW_MILLIS.milliseconds) {
collectorChannel.receiveCatching()
} ?: break
if (nextChunk.isClosed) {
flush(builder.toString(), firstChunk.target, firstChunk.responseChannel)
return@launch
}
val chunk = nextChunk.getOrNull() ?: break
if (chunk.target == firstChunk.target) {
builder.append(chunk.delta)
} else {
pendingChunk = chunk
break
}
}
flush(builder.toString(), firstChunk.target, firstChunk.responseChannel)
}
}
}
/**
* flush 将推送至 AgentRuntime 的默认通道。
*/
private fun flush(content: String, target: String, responseChannel: String?) {
if (content.isEmpty()) {
return
}
val event = ReplyEvent(
status = EventStatus.RUNNING,
target = target,
content = content,
mode = ReplyEvent.ContentMode.APPEND,
done = false
)
if (responseChannel == null) {
AgentRuntime.response(event)
}else{
AgentRuntime.response(event, responseChannel)
}
}
fun createConsumer(target: String, responseChannel: String?): StreamChatMessageConsumer = ReplyConsumer( fun createConsumer(target: String, responseChannel: String?): StreamChatMessageConsumer = ReplyConsumer(
collectorChannel = collectorChannel,
target = target, target = target,
responseChannel = responseChannel responseChannel = responseChannel
) )
private data class ReplyChunk( }
val delta: String,
val target: String,
val responseChannel: String?,
)
private class ReplyConsumer( private class ReplyConsumer(
private val collectorChannel: Channel<ReplyChunk>,
private val target: String, private val target: String,
private val responseChannel: String? private val responseChannel: String?
) : StreamChatMessageConsumer() { ) : StreamChatMessageConsumer {
private enum class VisibilityState { private enum class VisibilityState {
UNDECIDED, UNDECIDED,
@@ -95,22 +31,54 @@ object ReplyDispatcher {
private val rawResponse = StringBuilder() private val rawResponse = StringBuilder()
private val undecidedBuffer = StringBuilder() private val undecidedBuffer = StringBuilder()
private val visibleBuffer = StringBuilder()
private val lock = Any()
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var flushJob: Job? = null
private var visibilityState = VisibilityState.UNDECIDED private var visibilityState = VisibilityState.UNDECIDED
private var terminalEmitted = false
override fun onDelta(delta: String) { override fun onDelta(delta: String) {
synchronized(lock) {
rawResponse.append(delta) rawResponse.append(delta)
routeDelta(delta) routeDelta(delta)
} }
}
override fun onComplete() {
synchronized(lock) {
finalizeUndecidedBuffer()
flushVisibleBufferLocked()
dispatchTerminalLocked(EventStatus.DONE)
shutdownScopeLocked()
}
}
override fun onError(exception: AgentRuntimeException) {
synchronized(lock) {
ExceptionReporterHandler.report(exception)
rawResponse.append(INTERRUPTED_MARKER)
routeDelta(INTERRUPTED_MARKER)
finalizeUndecidedBuffer()
flushVisibleBufferLocked()
dispatchTerminalLocked(EventStatus.ERROR)
shutdownScopeLocked()
}
}
override fun collectResponse(): String { override fun collectResponse(): String {
synchronized(lock) {
finalizeUndecidedBuffer() finalizeUndecidedBuffer()
flushVisibleBufferLocked()
return rawResponse.toString() return rawResponse.toString()
} }
}
private fun routeDelta(delta: String) { private fun routeDelta(delta: String) {
when (visibilityState) { when (visibilityState) {
VisibilityState.NO_REPLY -> return VisibilityState.NO_REPLY -> return
VisibilityState.VISIBLE -> flushVisible(delta) VisibilityState.VISIBLE -> appendVisibleLocked(delta)
VisibilityState.UNDECIDED -> { VisibilityState.UNDECIDED -> {
undecidedBuffer.append(delta) undecidedBuffer.append(delta)
resolveVisibility() resolveVisibility()
@@ -157,17 +125,79 @@ object ReplyDispatcher {
private fun revealBufferedContent() { private fun revealBufferedContent() {
visibilityState = VisibilityState.VISIBLE visibilityState = VisibilityState.VISIBLE
if (undecidedBuffer.isNotEmpty()) { if (undecidedBuffer.isNotEmpty()) {
flushVisible(undecidedBuffer.toString()) appendVisibleLocked(undecidedBuffer.toString())
undecidedBuffer.setLength(0) undecidedBuffer.setLength(0)
} }
} }
private fun flushVisible(delta: String) { private fun appendVisibleLocked(delta: String) {
collectorChannel.trySend(ReplyChunk(delta, target, responseChannel)).isSuccess if (delta.isEmpty()) {
return
}
visibleBuffer.append(delta)
scheduleFlushLocked()
} }
override fun consumeDelta(delta: String?) { private fun scheduleFlushLocked() {
flushJob?.cancel()
flushJob = scope.launch {
delay(AGGREGATE_WINDOW_MILLIS.milliseconds)
synchronized(lock) {
flushVisibleBufferLocked()
}
}
} }
private fun flushVisibleBufferLocked() {
flushJob?.cancel()
flushJob = null
if (visibleBuffer.isEmpty()) {
return
}
val content = visibleBuffer.toString()
visibleBuffer.setLength(0)
dispatchLocked(
ReplyEvent(
status = EventStatus.RUNNING,
target = target,
content = content,
mode = ReplyEvent.ContentMode.APPEND
)
)
}
private fun dispatchTerminalLocked(status: EventStatus) {
if (terminalEmitted) {
return
}
terminalEmitted = true
dispatchLocked(
ReplyEvent(
status = status,
target = target,
content = "",
mode = ReplyEvent.ContentMode.APPEND
)
)
}
private fun dispatchLocked(event: ReplyEvent) {
if (responseChannel == null) {
AgentRuntime.response(event)
} else {
AgentRuntime.response(event, responseChannel)
}
}
private fun shutdownScopeLocked() {
flushJob?.cancel()
flushJob = null
scope.cancel()
}
private companion object {
private const val AGGREGATE_WINDOW_MILLIS = 100L
private const val INTERRUPTED_MARKER = "\n[response interrupted due to internal exception]"
private const val NO_REPLY_MARKER = "NO_REPLY"
} }
} }