From afb896e6db9efc1459988c44ac1e1aa67cd22c7c Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sat, 2 May 2026 16:32:25 +0800 Subject: [PATCH] refactor(context): support assign response channel while running flow context creating --- .../communication/CommunicationProducer.java | 2 +- .../module/communication/ReplyDispatcher.kt | 19 ++++++++++----- .../runtime/PartnerRunningFlowContext.kt | 23 +++++++++++++------ .../runtime/gateway/WebSocketGateway.java | 2 +- .../onebot/gateway/OnebotGateway.java | 2 +- .../interaction/flow/RunningFlowContext.kt | 3 ++- 6 files changed, 34 insertions(+), 17 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java index 46336043..0c9d2878 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java @@ -94,7 +94,7 @@ public class CommunicationProducer extends AbstractAgentModule.Running consumer.onDelta(INTERRUPTED_MARKER)); updateChatMessages(runningFlowContext, consumer.collectResponse()); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt index 5e2b1d9d..dbdeb2b4 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt @@ -30,7 +30,7 @@ object ReplyDispatcher { } ?: break if (nextChunk.isClosed) { - flush(builder.toString(), firstChunk.target) + flush(builder.toString(), firstChunk.target, firstChunk.responseChannel) return@launch } @@ -43,7 +43,7 @@ object ReplyDispatcher { } } - flush(builder.toString(), firstChunk.target) + flush(builder.toString(), firstChunk.target, firstChunk.responseChannel) } } } @@ -51,7 +51,7 @@ object ReplyDispatcher { /** * flush 将推送至 AgentRuntime 的默认通道。 */ - private fun flush(content: String, target: String) { + private fun flush(content: String, target: String, responseChannel: String?) { if (content.isEmpty()) { return } @@ -62,22 +62,29 @@ object ReplyDispatcher { mode = ReplyEvent.ContentMode.APPEND, done = false ) - AgentRuntime.response(event) + if (responseChannel == null) { + AgentRuntime.response(event) + }else{ + AgentRuntime.response(event, responseChannel) + } } - fun createConsumer(target: String): StreamChatMessageConsumer = ReplyConsumer( + fun createConsumer(target: String, responseChannel: String?): StreamChatMessageConsumer = ReplyConsumer( collectorChannel = collectorChannel, target = target, + responseChannel = responseChannel ) private data class ReplyChunk( val delta: String, val target: String, + val responseChannel: String?, ) private class ReplyConsumer( private val collectorChannel: Channel, private val target: String, + private val responseChannel: String? ) : StreamChatMessageConsumer() { private enum class VisibilityState { @@ -156,7 +163,7 @@ object ReplyDispatcher { } private fun flushVisible(delta: String) { - collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess + collectorChannel.trySend(ReplyChunk(delta, target, responseChannel)).isSuccess } override fun consumeDelta(delta: String?) { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/PartnerRunningFlowContext.kt b/Partner-Core/src/main/java/work/slhaf/partner/runtime/PartnerRunningFlowContext.kt index 4f1f1c8f..b81c74fe 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/PartnerRunningFlowContext.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/runtime/PartnerRunningFlowContext.kt @@ -8,8 +8,9 @@ import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext class PartnerRunningFlowContext private constructor( override val source: String, inputs: List, - firstInputEpochMillis: Long -) : RunningFlowContext(inputs, firstInputEpochMillis) { + firstInputEpochMillis: Long, + responseChannel: String? = null +) : RunningFlowContext(inputs, firstInputEpochMillis, responseChannel) { companion object { @@ -32,12 +33,18 @@ class PartnerRunningFlowContext private constructor( @JvmStatic @JvmOverloads - fun fromUser(userId: String, input: String, receivedAtMillis: Long = System.currentTimeMillis()) = + fun fromUser( + userId: String, + input: String, + receivedAtMillis: Long = System.currentTimeMillis(), + responseChannel: String? = null + ) = PartnerRunningFlowContext( SourceTag.buildUserSource(userId), listOf(InputEntry(0L, input)), - receivedAtMillis - ) + receivedAtMillis, + responseChannel + ).apply { this.target = userId } @JvmStatic @JvmOverloads @@ -45,7 +52,8 @@ class PartnerRunningFlowContext private constructor( PartnerRunningFlowContext( SourceTag.buildAgentSource(), listOf(InputEntry(0L, input)), - receivedAtMillis + receivedAtMillis, + null ).apply { putUserInfo(InfoKeys.PLATFORM, SOURCE_SELF_PLATFORM) putUserInfo(InfoKeys.NICKNAME, SOURCE_SELF_NICKNAME) @@ -56,7 +64,8 @@ class PartnerRunningFlowContext private constructor( return PartnerRunningFlowContext( source = source, inputs = inputs, - firstInputEpochMillis = System.currentTimeMillis() + firstInputEpochMillis = System.currentTimeMillis(), + resopnseChannel ) } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java b/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java index dd87d928..237fd88d 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java @@ -52,7 +52,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway, - private var firstInputEpochMillis: Long + private var firstInputEpochMillis: Long, + val resopnseChannel: String? = null ) { /** * 消息来源: 由谁发出