mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(framework): extract interaction execution into AgentRuntime and delegate from AgentInteractionAdapter
This commit is contained in:
@@ -1,82 +1,26 @@
|
|||||||
package work.slhaf.partner.api.agent.runtime.interaction
|
package work.slhaf.partner.api.agent.runtime.interaction
|
||||||
|
|
||||||
import kotlinx.coroutines.async
|
|
||||||
import kotlinx.coroutines.awaitAll
|
|
||||||
import kotlinx.coroutines.coroutineScope
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule
|
|
||||||
import work.slhaf.partner.api.agent.factory.context.AgentContext
|
|
||||||
import work.slhaf.partner.api.agent.factory.context.ModuleContextData
|
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData
|
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData
|
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
|
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
|
||||||
|
|
||||||
abstract class AgentInteractionAdapter<I : AgentInputData, O : AgentOutputData, C : RunningFlowContext> {
|
abstract class AgentInteractionAdapter<
|
||||||
|
I : AgentInputData,
|
||||||
|
O : AgentOutputData,
|
||||||
|
C : RunningFlowContext
|
||||||
|
> {
|
||||||
|
|
||||||
// TODO whether to support message queue, to avoid concurrence problem in data and agent stats
|
protected val runtime: AgentRuntime = AgentRuntime // 由 AgentContext 持有实例
|
||||||
|
|
||||||
companion object {
|
|
||||||
@Volatile
|
|
||||||
private var runningModules:
|
|
||||||
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> = emptyMap()
|
|
||||||
|
|
||||||
fun refreshRunningModules() {
|
|
||||||
val newMap = buildRunningModules()
|
|
||||||
runningModules = newMap
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun buildRunningModules():
|
|
||||||
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> {
|
|
||||||
return AgentContext.modules
|
|
||||||
.values
|
|
||||||
.filterIsInstance<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>()
|
|
||||||
.filter { it.enabled }
|
|
||||||
.groupBy { it.order }
|
|
||||||
.toSortedMap()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun submit(inputData: I): O {
|
fun submit(inputData: I): O {
|
||||||
val finalInputData: C = parseInputData(inputData)
|
val ctx = parseInputData(inputData)
|
||||||
val outputContext: C = call(finalInputData)
|
|
||||||
return parseOutputData(outputContext)
|
val result = runBlocking {
|
||||||
|
runtime.submit(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun call(runningFlowContext: C): C = runBlocking {
|
return parseOutputData(result)
|
||||||
|
|
||||||
if (runningModules.isEmpty()) {
|
|
||||||
buildRunningModules()
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
for (modules in runningModules.values) {
|
|
||||||
executeOrder(modules, runningFlowContext)
|
|
||||||
}
|
|
||||||
runningFlowContext.ok = 1
|
|
||||||
} catch (e: Exception) {
|
|
||||||
runningFlowContext.ok = 0
|
|
||||||
runningFlowContext.errMsg.add(e.localizedMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
return@runBlocking runningFlowContext
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private suspend fun executeOrder(
|
|
||||||
modules: List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>,
|
|
||||||
runningFlowContext: C
|
|
||||||
) {
|
|
||||||
|
|
||||||
coroutineScope {
|
|
||||||
|
|
||||||
val jobs = modules.map { module ->
|
|
||||||
async {
|
|
||||||
module.instance.execute(runningFlowContext)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jobs.awaitAll() // 任一异常会取消全部
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract fun parseOutputData(outputContext: C): O
|
protected abstract fun parseOutputData(outputContext: C): O
|
||||||
|
|||||||
@@ -0,0 +1,92 @@
|
|||||||
|
package work.slhaf.partner.api.agent.runtime.interaction
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule
|
||||||
|
import work.slhaf.partner.api.agent.factory.context.AgentContext
|
||||||
|
import work.slhaf.partner.api.agent.factory.context.ModuleContextData
|
||||||
|
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
|
||||||
|
|
||||||
|
object AgentRuntime {
|
||||||
|
|
||||||
|
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||||
|
|
||||||
|
private val channel =
|
||||||
|
Channel<TurnRequest<RunningFlowContext>>(Channel.UNLIMITED)
|
||||||
|
|
||||||
|
@Volatile
|
||||||
|
private var runningModules:
|
||||||
|
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> =
|
||||||
|
emptyMap()
|
||||||
|
|
||||||
|
init {
|
||||||
|
scope.launch {
|
||||||
|
for (req in channel) {
|
||||||
|
val result = executeTurn(req.context)
|
||||||
|
req.deferred.complete(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun refreshRunningModules() {
|
||||||
|
runningModules = buildRunningModules()
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun <C : RunningFlowContext> submit(context: C): C {
|
||||||
|
val deferred = CompletableDeferred<RunningFlowContext>()
|
||||||
|
channel.send(TurnRequest(context, deferred))
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
return deferred.await() as C
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun executeTurn(
|
||||||
|
runningFlowContext: RunningFlowContext
|
||||||
|
): RunningFlowContext {
|
||||||
|
|
||||||
|
if (runningModules.isEmpty()) {
|
||||||
|
refreshRunningModules()
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (modules in runningModules.values) {
|
||||||
|
executeOrder(modules, runningFlowContext)
|
||||||
|
}
|
||||||
|
runningFlowContext.ok = 1
|
||||||
|
} catch (e: Exception) {
|
||||||
|
runningFlowContext.ok = 0
|
||||||
|
runningFlowContext.errMsg.add(e.localizedMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
return runningFlowContext
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun executeOrder(
|
||||||
|
modules: List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>,
|
||||||
|
runningFlowContext: RunningFlowContext
|
||||||
|
) {
|
||||||
|
coroutineScope {
|
||||||
|
val jobs = modules.map { module ->
|
||||||
|
async {
|
||||||
|
module.instance.execute(runningFlowContext)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobs.awaitAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun buildRunningModules():
|
||||||
|
Map<Int, List<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>> {
|
||||||
|
|
||||||
|
return AgentContext.modules
|
||||||
|
.values
|
||||||
|
.filterIsInstance<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>()
|
||||||
|
.filter { it.enabled }
|
||||||
|
.groupBy { it.order }
|
||||||
|
.toSortedMap()
|
||||||
|
}
|
||||||
|
|
||||||
|
private data class TurnRequest<C : RunningFlowContext>(
|
||||||
|
val context: C,
|
||||||
|
val deferred: CompletableDeferred<RunningFlowContext>
|
||||||
|
)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user