diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt index 01b6a2ae..48df7e71 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentInteractionAdapter.kt @@ -1,85 +1,29 @@ package work.slhaf.partner.api.agent.runtime.interaction -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.runBlocking -import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule -import work.slhaf.partner.api.agent.factory.context.AgentContext -import work.slhaf.partner.api.agent.factory.context.ModuleContextData import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext -abstract class AgentInteractionAdapter { +abstract class AgentInteractionAdapter< + I : AgentInputData, + O : AgentOutputData, + C : RunningFlowContext + > { - // TODO whether to support message queue, to avoid concurrence problem in data and agent stats - - companion object { - @Volatile - private var runningModules: - Map>>> = emptyMap() - - fun refreshRunningModules() { - val newMap = buildRunningModules() - runningModules = newMap - } - - private fun buildRunningModules(): - Map>>> { - return AgentContext.modules - .values - .filterIsInstance>>() - .filter { it.enabled } - .groupBy { it.order } - .toSortedMap() - } - } + protected val runtime: AgentRuntime = AgentRuntime // 由 AgentContext 持有实例 fun submit(inputData: I): O { - val finalInputData: C = parseInputData(inputData) - val outputContext: C = call(finalInputData) - return parseOutputData(outputContext) - } + val ctx = parseInputData(inputData) - private fun call(runningFlowContext: C): C = runBlocking { - - if (runningModules.isEmpty()) { - buildRunningModules() + val result = runBlocking { + runtime.submit(ctx) } - try { - for (modules in runningModules.values) { - executeOrder(modules, runningFlowContext) - } - runningFlowContext.ok = 1 - } catch (e: Exception) { - runningFlowContext.ok = 0 - runningFlowContext.errMsg.add(e.localizedMessage) - } - - return@runBlocking runningFlowContext - } - - - private suspend fun executeOrder( - modules: List>>, - runningFlowContext: C - ) { - - coroutineScope { - - val jobs = modules.map { module -> - async { - module.instance.execute(runningFlowContext) - } - } - - jobs.awaitAll() // 任一异常会取消全部 - } + return parseOutputData(result) } protected abstract fun parseOutputData(outputContext: C): O protected abstract fun parseInputData(inputData: I): C -} +} \ No newline at end of file diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt new file mode 100644 index 00000000..c881be79 --- /dev/null +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/interaction/AgentRuntime.kt @@ -0,0 +1,92 @@ +package work.slhaf.partner.api.agent.runtime.interaction + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule +import work.slhaf.partner.api.agent.factory.context.AgentContext +import work.slhaf.partner.api.agent.factory.context.ModuleContextData +import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext + +object AgentRuntime { + + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + + private val channel = + Channel>(Channel.UNLIMITED) + + @Volatile + private var runningModules: + Map>>> = + emptyMap() + + init { + scope.launch { + for (req in channel) { + val result = executeTurn(req.context) + req.deferred.complete(result) + } + } + } + + fun refreshRunningModules() { + runningModules = buildRunningModules() + } + + suspend fun submit(context: C): C { + val deferred = CompletableDeferred() + channel.send(TurnRequest(context, deferred)) + @Suppress("UNCHECKED_CAST") + return deferred.await() as C + } + + private suspend fun executeTurn( + runningFlowContext: RunningFlowContext + ): RunningFlowContext { + + if (runningModules.isEmpty()) { + refreshRunningModules() + } + + try { + for (modules in runningModules.values) { + executeOrder(modules, runningFlowContext) + } + runningFlowContext.ok = 1 + } catch (e: Exception) { + runningFlowContext.ok = 0 + runningFlowContext.errMsg.add(e.localizedMessage) + } + + return runningFlowContext + } + + private suspend fun executeOrder( + modules: List>>, + runningFlowContext: RunningFlowContext + ) { + coroutineScope { + val jobs = modules.map { module -> + async { + module.instance.execute(runningFlowContext) + } + } + jobs.awaitAll() + } + } + + private fun buildRunningModules(): + Map>>> { + + return AgentContext.modules + .values + .filterIsInstance>>() + .filter { it.enabled } + .groupBy { it.order } + .toSortedMap() + } + + private data class TurnRequest( + val context: C, + val deferred: CompletableDeferred + ) +} \ No newline at end of file