From d70054cd9ba230124d17f738b198b6eb6904f5f5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 30 Mar 2026 22:46:02 +0800 Subject: [PATCH] refactor(interaction): decouple gateway IO from runtime response flow - replace interaction adapter/input-output DTO flow with InputData and InteractionEvent - introduce ResponseChannel and default LogChannel for runtime response dispatch - let AgentGateway parse running context directly and submit turns asynchronously - update WebSocketGateway to emit serialized interaction events - simplify cognition turn initiation to fire-and-forget semantics - streamline running flow context source construction and runtime module execution --- .../core/cognition/CognitionCapability.java | 2 +- .../partner/core/cognition/CognitionCore.java | 7 +- .../BuiltinCapabilityActionProvider.java | 5 +- .../PartnerInteractionAdapter.java | 40 --------- .../runtime/interaction/WebSocketGateway.java | 34 ++++---- .../interaction/data/PartnerInputData.java | 13 --- .../interaction/data/PartnerOutputData.java | 13 --- .../data/context/PartnerRunningFlowContext.kt | 24 +++--- .../runtime/interaction/AgentGateway.java | 17 ++-- .../interaction/AgentInteractionAdapter.kt | 26 ------ .../agent/runtime/interaction/AgentRuntime.kt | 83 +++++++++--------- .../runtime/interaction/ResponseChannel.kt | 33 ++++++++ .../interaction/data/AgentInputData.java | 9 -- .../interaction/data/AgentOutputData.java | 16 ---- .../runtime/interaction/data/InputData.kt | 14 ++++ .../interaction/data/InteractionData.java | 12 --- .../interaction/data/InteractionEvent.kt | 84 +++++++++++++++++++ .../interaction/flow/RunningFlowContext.kt | 1 - 18 files changed, 213 insertions(+), 220 deletions(-) delete mode 100644 Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/PartnerInteractionAdapter.java delete mode 100644 Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/data/PartnerInputData.java delete mode 100644 Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/data/PartnerOutputData.java delete mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt create mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/ResponseChannel.kt delete mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentInputData.java delete mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentOutputData.java create mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InputData.kt delete mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionData.java create mode 100644 Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionEvent.kt diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCapability.java b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCapability.java index 05da02f5..3c5d5c78 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCapability.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCapability.java @@ -9,7 +9,7 @@ import java.util.concurrent.locks.Lock; @Capability("cognition") public interface CognitionCapability { - String initiateTurn(String input, String target, String... skippedModules); + void initiateTurn(String input, String target, String... skippedModules); ContextWorkspace contextWorkspace(); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java index 9ead2289..95888db5 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java @@ -46,16 +46,15 @@ public class CognitionCore extends PartnerCore { } @CapabilityMethod - public String initiateTurn(String input, String target, String... skippedModules) { - PartnerRunningFlowContext primaryContext = PartnerRunningFlowContext.Companion.fromSelf(input); + public void initiateTurn(String input, String target, String... skippedModules) { + PartnerRunningFlowContext primaryContext = PartnerRunningFlowContext.fromSelf(input); primaryContext.setTarget(target); if (skippedModules != null) { for (String skippedModule : skippedModules) { primaryContext.addSkippedModule(skippedModule); } } - PartnerRunningFlowContext executedContext = AgentRuntime.INSTANCE.submit(primaryContext); - return executedContext.getCoreResponse().getString("text"); + AgentRuntime.INSTANCE.submit(primaryContext); } @CapabilityMethod diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCapabilityActionProvider.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCapabilityActionProvider.java index a460f2ab..e94dadbb 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCapabilityActionProvider.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinCapabilityActionProvider.java @@ -133,14 +133,15 @@ class BuiltinCapabilityActionProvider implements BuiltinActionProvider { Set.of(), false, JSONObject.of( - "answer", "The answer of the Agent Turn." + "result", "turn initiate result" ) ); Function, String> invoker = params -> { String input = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "input"); String target = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "target"); - return cognitionCapability.initiateTurn(input, target); + cognitionCapability.initiateTurn(input, target); + return "agent turn initiated"; }; return new BuiltinActionRegistry.BuiltinActionDefinition( diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/PartnerInteractionAdapter.java b/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/PartnerInteractionAdapter.java deleted file mode 100644 index 25045f60..00000000 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/PartnerInteractionAdapter.java +++ /dev/null @@ -1,40 +0,0 @@ -package work.slhaf.partner.runtime.interaction; - -import org.jetbrains.annotations.NotNull; -import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; -import work.slhaf.partner.runtime.interaction.data.PartnerInputData; -import work.slhaf.partner.runtime.interaction.data.PartnerOutputData; -import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; - -import java.time.ZonedDateTime; - -public class PartnerInteractionAdapter extends AgentInteractionAdapter { - @NotNull - @Override - protected PartnerOutputData parseOutputData(PartnerRunningFlowContext outputContext) { - PartnerOutputData outputData = new PartnerOutputData(); - outputData.setCode(outputContext.getStatus().getOk() ? 1 : 0); - outputData.setContent(getContent(outputContext)); - outputData.setUserInfo(outputContext.getTarget()); - outputData.setDateTime(ZonedDateTime.now().toLocalDateTime()); - return outputData; - } - - private String getContent(PartnerRunningFlowContext outputContext) { - StringBuilder str = new StringBuilder(); - str.append(outputContext.getCoreResponse().getString("text")); - if (!outputContext.getStatus().getOk()) { - str.append("\r\n").append("\r\n错误信息:\r\n").append(outputContext.getStatus().getErrors()); - } - return str.toString(); - } - - @NotNull - @Override - protected PartnerRunningFlowContext parseInputData(PartnerInputData inputData) { - return PartnerRunningFlowContext.Companion.fromUser(inputData.getUserInfo(), - inputData.getContent(), - inputData.getPlatform(), - inputData.getUserNickName()); - } -} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java b/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java index bf6f2a68..c37e9674 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java @@ -1,6 +1,5 @@ package work.slhaf.partner.runtime.interaction; -import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.JSONObject; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -8,12 +7,12 @@ import org.java_websocket.WebSocket; import org.java_websocket.framing.Framedata; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; +import org.jetbrains.annotations.NotNull; import work.slhaf.partner.api.agent.runtime.config.AgentConfigLoader; import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway; -import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; +import work.slhaf.partner.api.agent.runtime.interaction.data.InputData; +import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent; import work.slhaf.partner.common.config.PartnerAgentConfigLoader; -import work.slhaf.partner.runtime.interaction.data.PartnerInputData; -import work.slhaf.partner.runtime.interaction.data.PartnerOutputData; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import java.net.InetSocketAddress; @@ -22,7 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j -public class WebSocketGateway extends WebSocketServer implements AgentGateway { +public class WebSocketGateway extends WebSocketServer implements AgentGateway { private static final long HEARTBEAT_INTERVAL = 10_000; @@ -50,21 +49,23 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway { if (webSocket.isOpen()) { - webSocket.send(JSONUtil.toJsonStr(outputData)); + webSocket.send(JSONObject.toJSONString(event)); } else { log.warn("用户不在线: {}", userInfo); } }); } - @Override - public AgentInteractionAdapter adapter() { - return new PartnerInteractionAdapter(); - } - private void startHeartbeatThread() { executor.execute(() -> { while (!Thread.interrupted()) { @@ -142,8 +143,8 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway { +public interface AgentGateway extends ResponseChannel { void launch(); default void receive(I inputData) { - O outputData = adapter().submit(inputData); - send(outputData); + C parsedContext = parseRunningFlowContext(inputData); + AgentRuntime.INSTANCE.submit(parsedContext); } - void send(O outputData); + C parseRunningFlowContext(I inputData); - /** - * 通过adapter提供的receive、send方法进行与客户端的交互行为 - * - * @return adapter实例 - */ - AgentInteractionAdapter adapter(); } diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt deleted file mode 100644 index be015bc3..00000000 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt +++ /dev/null @@ -1,26 +0,0 @@ -package work.slhaf.partner.api.agent.runtime.interaction - -import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData -import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData -import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext - -abstract class AgentInteractionAdapter< - I : AgentInputData, - O : AgentOutputData, - C : RunningFlowContext - > { - - protected val runtime: AgentRuntime = AgentRuntime // 由 AgentContext 持有实例 - - fun submit(inputData: I): O { - val ctx = parseInputData(inputData) - - val result = runtime.submit(ctx) - - return parseOutputData(result) - } - - protected abstract fun parseOutputData(outputContext: C): O - - protected abstract fun parseInputData(inputData: I): C -} \ No newline at end of file diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt index 568d0349..2101ec3f 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt @@ -5,89 +5,86 @@ import kotlinx.coroutines.channels.Channel import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule import work.slhaf.partner.api.agent.factory.context.AgentContext import work.slhaf.partner.api.agent.factory.context.ModuleContextData +import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext object AgentRuntime { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - private val channel = - Channel>(Channel.UNLIMITED) + private val channel = Channel(Channel.UNLIMITED) + private val responseChannels = mutableMapOf( + LogChannel.channelName to LogChannel + ) + + // TODO 暂时取 log_channel 为默认回复通道,若为空则只打印信息。后续将配合配置中心替换通过配置文件进行指定 + private val defaultChannel: String = "log_channel" @Volatile - private var runningModules: - Map>>> = - emptyMap() + private var runningModules: Map>> = emptyMap() init { scope.launch { - for (req in channel) { - val result = executeTurn(req.context) - req.deferred.complete(result) + for (ctx in channel) { + executeTurn(ctx) } } } - fun refreshRunningModules() { - runningModules = buildRunningModules() + fun registerResponseChannel(channelName: String, responseChannel: ResponseChannel) { + responseChannels[channelName] = responseChannel } - fun submit(context: C): C = runBlocking { - val deferred = CompletableDeferred() - channel.send(TurnRequest(context, deferred)) - @Suppress("UNCHECKED_CAST") - (return@runBlocking deferred.await() as C) + @JvmOverloads + fun response(event: InteractionEvent, channelName: String = defaultChannel) { + val channel = responseChannels[channelName] + if (channel == null) { + responseChannels[defaultChannel]!!.response(event) + } else { + channel.response(event) + } } - private suspend fun executeTurn( - runningFlowContext: RunningFlowContext - ): RunningFlowContext { + fun submit(context: C) = runBlocking { + channel.send(context) + } + + private suspend fun executeTurn(runningFlowContext: RunningFlowContext) { if (runningModules.isEmpty()) { refreshRunningModules() } - try { - for (modules in runningModules.values) { - executeOrder(modules, runningFlowContext) - } - } catch (e: Exception) { - runningFlowContext.status.errors.add(e.localizedMessage) + for (modules in runningModules.values) { + executeOrder(modules, runningFlowContext) } - return runningFlowContext + } + + private fun refreshRunningModules() { + runningModules = AgentContext.modules.values + .filterIsInstance>>() + .filter { it.enabled } + .groupBy { it.order } + .mapValues { it.value.map { contextData -> contextData.instance } } + .toSortedMap() } private suspend fun executeOrder( - modules: List>>, + modules: List>, runningFlowContext: RunningFlowContext ) { coroutineScope { val jobs = modules.map { module -> async { - if (runningFlowContext.skippedModules.contains(module.instance.moduleName)) { + if (runningFlowContext.skippedModules.contains(module.moduleName)) { return@async } - module.instance.execute(runningFlowContext) + module.execute(runningFlowContext) } } jobs.awaitAll() } } - private fun buildRunningModules(): - Map>>> { - - return AgentContext.modules - .values - .filterIsInstance>>() - .filter { it.enabled } - .groupBy { it.order } - .toSortedMap() - } - - private data class TurnRequest( - val context: C, - val deferred: CompletableDeferred - ) } \ No newline at end of file diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/ResponseChannel.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/ResponseChannel.kt new file mode 100644 index 00000000..c0aa40e5 --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/ResponseChannel.kt @@ -0,0 +1,33 @@ +package work.slhaf.partner.api.agent.runtime.interaction + +import com.alibaba.fastjson2.JSONObject +import org.slf4j.LoggerFactory +import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent + +interface ResponseChannel { + + val channelName: String + + fun response(event: InteractionEvent) + + fun register() { + AgentRuntime.registerResponseChannel(channelName, this) + } +} + +object LogChannel : ResponseChannel { + + private val log = LoggerFactory.getLogger(LogChannel::class.java) + + override val channelName: String + get() = "log_channel" + + init { + register() + } + + override fun response(event: InteractionEvent) { + log.info(JSONObject.toJSONString(event)) + } + +} \ No newline at end of file diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentInputData.java b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentInputData.java deleted file mode 100644 index b1a6f1aa..00000000 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentInputData.java +++ /dev/null @@ -1,9 +0,0 @@ -package work.slhaf.partner.api.agent.runtime.interaction.data; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -@EqualsAndHashCode(callSuper = true) -@Data -public abstract class AgentInputData extends InteractionData { -} diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentOutputData.java b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentOutputData.java deleted file mode 100644 index f7de3af3..00000000 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/AgentOutputData.java +++ /dev/null @@ -1,16 +0,0 @@ -package work.slhaf.partner.api.agent.runtime.interaction.data; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -@EqualsAndHashCode(callSuper = true) -@Data -public abstract class AgentOutputData extends InteractionData { - - protected int code; - - public static class StatusCode { - public static final int SUCCESS = 1; - public static final int FAILED = 0; - } -} diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InputData.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InputData.kt new file mode 100644 index 00000000..636ee51d --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InputData.kt @@ -0,0 +1,14 @@ +package work.slhaf.partner.api.agent.runtime.interaction.data + +open class InputData( + val source: String, + val content: String +) { + private val _meta = mutableMapOf() + val meta: Map + get() = _meta + + fun addMeta(key: String, value: String) { + _meta[key] = value + } +} \ No newline at end of file diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionData.java b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionData.java deleted file mode 100644 index edfeb303..00000000 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionData.java +++ /dev/null @@ -1,12 +0,0 @@ -package work.slhaf.partner.api.agent.runtime.interaction.data; - -import lombok.Data; - -import java.time.LocalDateTime; - -@Data -public abstract class InteractionData { - protected String userInfo; - protected String content; - protected LocalDateTime dateTime; -} diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionEvent.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionEvent.kt new file mode 100644 index 00000000..302310ca --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/data/InteractionEvent.kt @@ -0,0 +1,84 @@ +package work.slhaf.partner.api.agent.runtime.interaction.data + +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter + +sealed class InteractionEvent { + + /** + * event type + */ + abstract val event: Event + + /** + * event sending status + */ + abstract val status: EventStatus + + /** + * the target send to + */ + abstract val target: String + + private val _meta = mutableMapOf( + "datetime" to ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + ) + val meta: Map + get() = _meta + + fun addMeta(key: String, value: String) { + _meta[key] = value + } + + enum class Event { + REPLY, + MODULE, + SYSTEM + } + + enum class EventStatus { + START, + RUNNING, + ERROR, + DONE + } + +} + +data class Reply( + override val status: EventStatus, + override val target: String, + val content: String, +) : InteractionEvent() { + override val event = Event.REPLY +} + +data class Module( + override val status: EventStatus, + override val target: String, + val data: Data +) : InteractionEvent() { + override val event = Event.MODULE + + data class Data( + val module: String, + val content: String + ) +} + +data class System @JvmOverloads constructor( + override val status: EventStatus, + override val target: String, + val title: String, + val content: String, + val urgency: Urgency = Urgency.NORMAL +) : InteractionEvent() { + override val event = Event.SYSTEM + + enum class Urgency { + LOW, + NORMAL, + HIGH, + CRITICAL + } +} diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/flow/RunningFlowContext.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/flow/RunningFlowContext.kt index 6fdb7fe0..5597489c 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/flow/RunningFlowContext.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/flow/RunningFlowContext.kt @@ -33,7 +33,6 @@ abstract class RunningFlowContext { get() = _skippedModules val status = Status() - val info = Info() fun addSkippedModule(moduleName: String) { _skippedModules.add(moduleName)