refactor(context): support assign response channel while running flow context creating

This commit is contained in:
2026-05-02 16:32:25 +08:00
parent 2b575df3f9
commit afb896e6db
6 changed files with 34 additions and 17 deletions

View File

@@ -94,7 +94,7 @@ public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRu
} }
private void executeChat(PartnerRunningFlowContext runningFlowContext) { private void executeChat(PartnerRunningFlowContext runningFlowContext) {
StreamChatMessageConsumer consumer = ReplyDispatcher.INSTANCE.createConsumer(runningFlowContext.getTarget()); StreamChatMessageConsumer consumer = ReplyDispatcher.INSTANCE.createConsumer(runningFlowContext.getTarget(), runningFlowContext.getResopnseChannel());
this.streamChat(buildChatMessages(runningFlowContext), consumer) this.streamChat(buildChatMessages(runningFlowContext), consumer)
.onFailure(exception -> consumer.onDelta(INTERRUPTED_MARKER)); .onFailure(exception -> consumer.onDelta(INTERRUPTED_MARKER));
updateChatMessages(runningFlowContext, consumer.collectResponse()); updateChatMessages(runningFlowContext, consumer.collectResponse());

View File

@@ -30,7 +30,7 @@ object ReplyDispatcher {
} ?: break } ?: break
if (nextChunk.isClosed) { if (nextChunk.isClosed) {
flush(builder.toString(), firstChunk.target) flush(builder.toString(), firstChunk.target, firstChunk.responseChannel)
return@launch return@launch
} }
@@ -43,7 +43,7 @@ object ReplyDispatcher {
} }
} }
flush(builder.toString(), firstChunk.target) flush(builder.toString(), firstChunk.target, firstChunk.responseChannel)
} }
} }
} }
@@ -51,7 +51,7 @@ object ReplyDispatcher {
/** /**
* flush 将推送至 AgentRuntime 的默认通道。 * flush 将推送至 AgentRuntime 的默认通道。
*/ */
private fun flush(content: String, target: String) { private fun flush(content: String, target: String, responseChannel: String?) {
if (content.isEmpty()) { if (content.isEmpty()) {
return return
} }
@@ -62,22 +62,29 @@ object ReplyDispatcher {
mode = ReplyEvent.ContentMode.APPEND, mode = ReplyEvent.ContentMode.APPEND,
done = false done = false
) )
AgentRuntime.response(event) if (responseChannel == null) {
AgentRuntime.response(event)
}else{
AgentRuntime.response(event, responseChannel)
}
} }
fun createConsumer(target: String): StreamChatMessageConsumer = ReplyConsumer( fun createConsumer(target: String, responseChannel: String?): StreamChatMessageConsumer = ReplyConsumer(
collectorChannel = collectorChannel, collectorChannel = collectorChannel,
target = target, target = target,
responseChannel = responseChannel
) )
private data class ReplyChunk( private data class ReplyChunk(
val delta: String, val delta: String,
val target: String, val target: String,
val responseChannel: String?,
) )
private class ReplyConsumer( private class ReplyConsumer(
private val collectorChannel: Channel<ReplyChunk>, private val collectorChannel: Channel<ReplyChunk>,
private val target: String, private val target: String,
private val responseChannel: String?
) : StreamChatMessageConsumer() { ) : StreamChatMessageConsumer() {
private enum class VisibilityState { private enum class VisibilityState {
@@ -156,7 +163,7 @@ object ReplyDispatcher {
} }
private fun flushVisible(delta: String) { private fun flushVisible(delta: String) {
collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess collectorChannel.trySend(ReplyChunk(delta, target, responseChannel)).isSuccess
} }
override fun consumeDelta(delta: String?) { override fun consumeDelta(delta: String?) {

View File

@@ -8,8 +8,9 @@ import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext
class PartnerRunningFlowContext private constructor( class PartnerRunningFlowContext private constructor(
override val source: String, override val source: String,
inputs: List<InputEntry>, inputs: List<InputEntry>,
firstInputEpochMillis: Long firstInputEpochMillis: Long,
) : RunningFlowContext(inputs, firstInputEpochMillis) { responseChannel: String? = null
) : RunningFlowContext(inputs, firstInputEpochMillis, responseChannel) {
companion object { companion object {
@@ -32,12 +33,18 @@ class PartnerRunningFlowContext private constructor(
@JvmStatic @JvmStatic
@JvmOverloads @JvmOverloads
fun fromUser(userId: String, input: String, receivedAtMillis: Long = System.currentTimeMillis()) = fun fromUser(
userId: String,
input: String,
receivedAtMillis: Long = System.currentTimeMillis(),
responseChannel: String? = null
) =
PartnerRunningFlowContext( PartnerRunningFlowContext(
SourceTag.buildUserSource(userId), SourceTag.buildUserSource(userId),
listOf(InputEntry(0L, input)), listOf(InputEntry(0L, input)),
receivedAtMillis receivedAtMillis,
) responseChannel
).apply { this.target = userId }
@JvmStatic @JvmStatic
@JvmOverloads @JvmOverloads
@@ -45,7 +52,8 @@ class PartnerRunningFlowContext private constructor(
PartnerRunningFlowContext( PartnerRunningFlowContext(
SourceTag.buildAgentSource(), SourceTag.buildAgentSource(),
listOf(InputEntry(0L, input)), listOf(InputEntry(0L, input)),
receivedAtMillis receivedAtMillis,
null
).apply { ).apply {
putUserInfo(InfoKeys.PLATFORM, SOURCE_SELF_PLATFORM) putUserInfo(InfoKeys.PLATFORM, SOURCE_SELF_PLATFORM)
putUserInfo(InfoKeys.NICKNAME, SOURCE_SELF_NICKNAME) putUserInfo(InfoKeys.NICKNAME, SOURCE_SELF_NICKNAME)
@@ -56,7 +64,8 @@ class PartnerRunningFlowContext private constructor(
return PartnerRunningFlowContext( return PartnerRunningFlowContext(
source = source, source = source,
inputs = inputs, inputs = inputs,
firstInputEpochMillis = System.currentTimeMillis() firstInputEpochMillis = System.currentTimeMillis(),
resopnseChannel
) )
} }

View File

@@ -52,7 +52,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
@Override @Override
public PartnerRunningFlowContext parseRunningFlowContext(InputData inputData) { public PartnerRunningFlowContext parseRunningFlowContext(InputData inputData) {
PartnerRunningFlowContext context = PartnerRunningFlowContext.fromUser(inputData.getSource(), inputData.getContent()); PartnerRunningFlowContext context = PartnerRunningFlowContext.fromUser(inputData.getSource(), inputData.getContent(),System.currentTimeMillis(), getChannelName());
inputData.getMeta().forEach(context::putUserInfo); inputData.getMeta().forEach(context::putUserInfo);
return context; return context;
} }

View File

@@ -46,7 +46,7 @@ public class OnebotGateway extends WebSocketServer implements AgentGateway<Input
@Override @Override
public PartnerRunningFlowContext parseRunningFlowContext(InputData inputData) { public PartnerRunningFlowContext parseRunningFlowContext(InputData inputData) {
PartnerRunningFlowContext context = PartnerRunningFlowContext.fromUser(inputData.getSource(), inputData.getContent()); PartnerRunningFlowContext context = PartnerRunningFlowContext.fromUser(inputData.getSource(), inputData.getContent(),System.currentTimeMillis(), getChannelName());
inputData.getMeta().forEach(context::putUserInfo); inputData.getMeta().forEach(context::putUserInfo);
return context; return context;
} }

View File

@@ -12,7 +12,8 @@ import kotlin.math.min
*/ */
abstract class RunningFlowContext protected constructor( abstract class RunningFlowContext protected constructor(
inputs: List<InputEntry>, inputs: List<InputEntry>,
private var firstInputEpochMillis: Long private var firstInputEpochMillis: Long,
val resopnseChannel: String? = null
) { ) {
/** /**
* 消息来源: 由谁发出 * 消息来源: 由谁发出