refactor(communication): remove channel assignment in ReplyDispatcher

This commit is contained in:
2026-04-08 20:49:37 +08:00
parent f233c5ce32
commit 0528890d60

View File

@@ -12,9 +12,6 @@ object ReplyDispatcher {
private const val AGGREGATE_WINDOW_MILLIS = 100L private const val AGGREGATE_WINDOW_MILLIS = 100L
// TODO 通过配置中心动态指定响应通道
private var channelName: String? = null
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val collectorChannel = Channel<ReplyChunk>(Channel.UNLIMITED) private val collectorChannel = Channel<ReplyChunk>(Channel.UNLIMITED)
@@ -32,12 +29,12 @@ object ReplyDispatcher {
} ?: break } ?: break
if (nextChunk.isClosed) { if (nextChunk.isClosed) {
flush(builder.toString(), firstChunk.target, firstChunk.channelName) flush(builder.toString(), firstChunk.target)
return@launch return@launch
} }
val chunk = nextChunk.getOrNull() ?: break val chunk = nextChunk.getOrNull() ?: break
if (chunk.target == firstChunk.target && chunk.channelName == firstChunk.channelName) { if (chunk.target == firstChunk.target) {
builder.append(chunk.delta) builder.append(chunk.delta)
} else { } else {
pendingChunk = chunk pendingChunk = chunk
@@ -45,12 +42,15 @@ object ReplyDispatcher {
} }
} }
flush(builder.toString(), firstChunk.target, firstChunk.channelName) flush(builder.toString(), firstChunk.target)
} }
} }
} }
private fun flush(content: String, target: String, channelName: String?) { /**
* flush 将推送至 AgentRuntime 的默认通道。
*/
private fun flush(content: String, target: String) {
if (content.isEmpty()) { if (content.isEmpty()) {
return return
} }
@@ -61,34 +61,27 @@ object ReplyDispatcher {
mode = Reply.ContentMode.APPEND, mode = Reply.ContentMode.APPEND,
done = false done = false
) )
if (channelName.isNullOrBlank()) {
AgentRuntime.response(event) AgentRuntime.response(event)
} else {
AgentRuntime.response(event, channelName)
}
} }
fun createConsumer(target: String): StreamChatMessageConsumer = ReplyConsumer( fun createConsumer(target: String): StreamChatMessageConsumer = ReplyConsumer(
collectorChannel = collectorChannel, collectorChannel = collectorChannel,
target = target, target = target,
channelName = channelName
) )
private data class ReplyChunk( private data class ReplyChunk(
val delta: String, val delta: String,
val target: String, val target: String,
val channelName: String?
) )
private class ReplyConsumer( private class ReplyConsumer(
private val collectorChannel: Channel<ReplyChunk>, private val collectorChannel: Channel<ReplyChunk>,
private val target: String, private val target: String,
private val channelName: String?
) : StreamChatMessageConsumer() { ) : StreamChatMessageConsumer() {
override fun consumeDelta(delta: String?) { override fun consumeDelta(delta: String?) {
if (delta != null) { if (delta != null) {
collectorChannel.trySend(ReplyChunk(delta, target, channelName)).isSuccess collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess
} }
} }