refactor(interaction): decouple gateway IO from runtime response flow

- replace interaction adapter/input-output DTO flow with InputData and InteractionEvent
  - introduce ResponseChannel and default LogChannel for runtime response dispatch
  - let AgentGateway parse running context directly and submit turns asynchronously
  - update WebSocketGateway to emit serialized interaction events
  - simplify cognition turn initiation to fire-and-forget semantics
  - streamline running flow context source construction and runtime module execution
This commit is contained in:
2026-03-30 22:46:02 +08:00
parent def48fd0ce
commit d70054cd9b
18 changed files with 213 additions and 220 deletions

View File

@@ -1,24 +1,17 @@
package work.slhaf.partner.api.agent.runtime.interaction;
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData;
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData;
import work.slhaf.partner.api.agent.runtime.interaction.data.InputData;
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext;
public interface AgentGateway<I extends AgentInputData, O extends AgentOutputData, C extends RunningFlowContext> {
public interface AgentGateway<I extends InputData, C extends RunningFlowContext> extends ResponseChannel {
void launch();
default void receive(I inputData) {
O outputData = adapter().submit(inputData);
send(outputData);
C parsedContext = parseRunningFlowContext(inputData);
AgentRuntime.INSTANCE.submit(parsedContext);
}
void send(O outputData);
C parseRunningFlowContext(I inputData);
/**
* 通过adapter提供的receive、send方法进行与客户端的交互行为
*
* @return adapter实例
*/
AgentInteractionAdapter<I, O, C> adapter();
}

View File

@@ -1,26 +0,0 @@
package work.slhaf.partner.api.agent.runtime.interaction
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
abstract class AgentInteractionAdapter<
I : AgentInputData,
O : AgentOutputData,
C : RunningFlowContext
> {
protected val runtime: AgentRuntime = AgentRuntime // 由 AgentContext 持有实例
fun submit(inputData: I): O {
val ctx = parseInputData(inputData)
val result = runtime.submit(ctx)
return parseOutputData(result)
}
protected abstract fun parseOutputData(outputContext: C): O
protected abstract fun parseInputData(inputData: I): C
}

View File

@@ -5,89 +5,86 @@ import kotlinx.coroutines.channels.Channel
import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule
import work.slhaf.partner.api.agent.factory.context.AgentContext
import work.slhaf.partner.api.agent.factory.context.ModuleContextData
import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
object AgentRuntime {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val channel =
Channel<TurnRequest<RunningFlowContext>>(Channel.UNLIMITED)
private val channel = Channel<RunningFlowContext>(Channel.UNLIMITED)
private val responseChannels = mutableMapOf<String, ResponseChannel>(
LogChannel.channelName to LogChannel
)
// TODO 暂时取 log_channel 为默认回复通道,若为空则只打印信息。后续将配合配置中心替换通过配置文件进行指定
private val defaultChannel: String = "log_channel"
@Volatile
private var runningModules:
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> =
emptyMap()
private var runningModules: Map<Int, List<AbstractAgentModule.Running<RunningFlowContext>>> = emptyMap()
init {
scope.launch {
for (req in channel) {
val result = executeTurn(req.context)
req.deferred.complete(result)
for (ctx in channel) {
executeTurn(ctx)
}
}
}
fun refreshRunningModules() {
runningModules = buildRunningModules()
fun registerResponseChannel(channelName: String, responseChannel: ResponseChannel) {
responseChannels[channelName] = responseChannel
}
fun <C : RunningFlowContext> submit(context: C): C = runBlocking {
val deferred = CompletableDeferred<RunningFlowContext>()
channel.send(TurnRequest(context, deferred))
@Suppress("UNCHECKED_CAST")
(return@runBlocking deferred.await() as C)
@JvmOverloads
fun response(event: InteractionEvent, channelName: String = defaultChannel) {
val channel = responseChannels[channelName]
if (channel == null) {
responseChannels[defaultChannel]!!.response(event)
} else {
channel.response(event)
}
}
private suspend fun executeTurn(
runningFlowContext: RunningFlowContext
): RunningFlowContext {
fun <C : RunningFlowContext> submit(context: C) = runBlocking {
channel.send(context)
}
private suspend fun executeTurn(runningFlowContext: RunningFlowContext) {
if (runningModules.isEmpty()) {
refreshRunningModules()
}
try {
for (modules in runningModules.values) {
executeOrder(modules, runningFlowContext)
}
} catch (e: Exception) {
runningFlowContext.status.errors.add(e.localizedMessage)
for (modules in runningModules.values) {
executeOrder(modules, runningFlowContext)
}
return runningFlowContext
}
private fun refreshRunningModules() {
runningModules = AgentContext.modules.values
.filterIsInstance<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>()
.filter { it.enabled }
.groupBy { it.order }
.mapValues { it.value.map { contextData -> contextData.instance } }
.toSortedMap()
}
private suspend fun executeOrder(
modules: List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>,
modules: List<AbstractAgentModule.Running<RunningFlowContext>>,
runningFlowContext: RunningFlowContext
) {
coroutineScope {
val jobs = modules.map { module ->
async {
if (runningFlowContext.skippedModules.contains(module.instance.moduleName)) {
if (runningFlowContext.skippedModules.contains(module.moduleName)) {
return@async
}
module.instance.execute(runningFlowContext)
module.execute(runningFlowContext)
}
}
jobs.awaitAll()
}
}
private fun buildRunningModules():
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> {
return AgentContext.modules
.values
.filterIsInstance<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>()
.filter { it.enabled }
.groupBy { it.order }
.toSortedMap()
}
private data class TurnRequest<C : RunningFlowContext>(
val context: C,
val deferred: CompletableDeferred<RunningFlowContext>
)
}

View File

@@ -0,0 +1,33 @@
package work.slhaf.partner.api.agent.runtime.interaction
import com.alibaba.fastjson2.JSONObject
import org.slf4j.LoggerFactory
import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent
interface ResponseChannel {
val channelName: String
fun response(event: InteractionEvent)
fun register() {
AgentRuntime.registerResponseChannel(channelName, this)
}
}
object LogChannel : ResponseChannel {
private val log = LoggerFactory.getLogger(LogChannel::class.java)
override val channelName: String
get() = "log_channel"
init {
register()
}
override fun response(event: InteractionEvent) {
log.info(JSONObject.toJSONString(event))
}
}

View File

@@ -1,9 +0,0 @@
package work.slhaf.partner.api.agent.runtime.interaction.data;
import lombok.Data;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
public abstract class AgentInputData extends InteractionData {
}

View File

@@ -1,16 +0,0 @@
package work.slhaf.partner.api.agent.runtime.interaction.data;
import lombok.Data;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
public abstract class AgentOutputData extends InteractionData {
protected int code;
public static class StatusCode {
public static final int SUCCESS = 1;
public static final int FAILED = 0;
}
}

View File

@@ -0,0 +1,14 @@
package work.slhaf.partner.api.agent.runtime.interaction.data
open class InputData(
val source: String,
val content: String
) {
private val _meta = mutableMapOf<String, String>()
val meta: Map<String, String>
get() = _meta
fun addMeta(key: String, value: String) {
_meta[key] = value
}
}

View File

@@ -1,12 +0,0 @@
package work.slhaf.partner.api.agent.runtime.interaction.data;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public abstract class InteractionData {
protected String userInfo;
protected String content;
protected LocalDateTime dateTime;
}

View File

@@ -0,0 +1,84 @@
package work.slhaf.partner.api.agent.runtime.interaction.data
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
sealed class InteractionEvent {
/**
* event type
*/
abstract val event: Event
/**
* event sending status
*/
abstract val status: EventStatus
/**
* the target send to
*/
abstract val target: String
private val _meta = mutableMapOf<String, String>(
"datetime" to ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
)
val meta: Map<String, String>
get() = _meta
fun addMeta(key: String, value: String) {
_meta[key] = value
}
enum class Event {
REPLY,
MODULE,
SYSTEM
}
enum class EventStatus {
START,
RUNNING,
ERROR,
DONE
}
}
data class Reply(
override val status: EventStatus,
override val target: String,
val content: String,
) : InteractionEvent() {
override val event = Event.REPLY
}
data class Module(
override val status: EventStatus,
override val target: String,
val data: Data
) : InteractionEvent() {
override val event = Event.MODULE
data class Data(
val module: String,
val content: String
)
}
data class System @JvmOverloads constructor(
override val status: EventStatus,
override val target: String,
val title: String,
val content: String,
val urgency: Urgency = Urgency.NORMAL
) : InteractionEvent() {
override val event = Event.SYSTEM
enum class Urgency {
LOW,
NORMAL,
HIGH,
CRITICAL
}
}

View File

@@ -33,7 +33,6 @@ abstract class RunningFlowContext {
get() = _skippedModules
val status = Status()
val info = Info()
fun addSkippedModule(moduleName: String) {
_skippedModules.add(moduleName)