mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(framework): migrate AgentInteractionAdapter flow execution to Kotlin coroutines and remove AgentRunningFlow
This commit is contained in:
@@ -1,26 +1,61 @@
|
|||||||
package work.slhaf.partner.api.agent.runtime.interaction;
|
package work.slhaf.partner.api.agent.runtime.interaction
|
||||||
|
|
||||||
import work.slhaf.partner.api.agent.factory.component.pojo.MetaModule;
|
import kotlinx.coroutines.async
|
||||||
import work.slhaf.partner.api.agent.runtime.config.AgentConfigLoader;
|
import kotlinx.coroutines.awaitAll
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData;
|
import kotlinx.coroutines.coroutineScope
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData;
|
import kotlinx.coroutines.runBlocking
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.flow.AgentRunningFlow;
|
import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext;
|
import work.slhaf.partner.api.agent.factory.context.AgentContext
|
||||||
|
import work.slhaf.partner.api.agent.factory.context.ModuleContextData
|
||||||
|
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentInputData
|
||||||
|
import work.slhaf.partner.api.agent.runtime.interaction.data.AgentOutputData
|
||||||
|
import work.slhaf.partner.api.agent.runtime.interaction.flow.RunningFlowContext
|
||||||
|
|
||||||
import java.util.List;
|
abstract class AgentInteractionAdapter<I : AgentInputData, O : AgentOutputData, C : RunningFlowContext> {
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public abstract class AgentInteractionAdapter<I extends AgentInputData, O extends AgentOutputData, C extends RunningFlowContext> {
|
private val runningModules =
|
||||||
|
mutableMapOf<Int, MutableList<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>>()
|
||||||
|
|
||||||
protected AgentRunningFlow<C> agentRunningFlow = new AgentRunningFlow<>();
|
fun call(runningFlowContext: C): C = runBlocking {
|
||||||
protected Map<Integer, List<MetaModule>> moduleOrderedMap = AgentConfigLoader.INSTANCE.getModuleOrderedMap();
|
if (runningModules.isEmpty()) {
|
||||||
|
AgentContext.modules
|
||||||
|
.filter { ModuleContextData.Running::class.java.isAssignableFrom(it.value.javaClass) }
|
||||||
|
.map { it.value as ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>> }
|
||||||
|
.sortedBy { it.order }
|
||||||
|
.forEach { runningModules.computeIfAbsent(it.order) { mutableListOf() }.add(it) }
|
||||||
|
}
|
||||||
|
|
||||||
public C call(C finalInputData) {
|
try {
|
||||||
return agentRunningFlow.launch(moduleOrderedMap, finalInputData);
|
for (modules in runningModules.values) {
|
||||||
|
executeOrder(modules, runningFlowContext)
|
||||||
|
}
|
||||||
|
runningFlowContext.ok = 1
|
||||||
|
} catch (e: Exception) {
|
||||||
|
runningFlowContext.ok = 0
|
||||||
|
runningFlowContext.errMsg.add(e.localizedMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
return@runBlocking runningFlowContext
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract O parseOutputData(C outputContext);
|
private suspend fun executeOrder(
|
||||||
|
modules: MutableList<ModuleContextData.Running<AbstractAgentModule.Running<RunningFlowContext>>>,
|
||||||
|
runningFlowContext: C
|
||||||
|
) {
|
||||||
|
|
||||||
protected abstract C parseInputData(I inputData);
|
coroutineScope {
|
||||||
|
|
||||||
|
val jobs = modules.map { module ->
|
||||||
|
async {
|
||||||
|
module.instance.execute(runningFlowContext)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs.awaitAll() // 任一异常会取消全部
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract fun parseOutputData(outputContext: C): O
|
||||||
|
|
||||||
|
protected abstract fun parseInputData(inputData: I): C
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,49 +0,0 @@
|
|||||||
package work.slhaf.partner.api.agent.runtime.interaction.flow;
|
|
||||||
|
|
||||||
import work.slhaf.partner.api.agent.factory.component.pojo.MetaModule;
|
|
||||||
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
|
|
||||||
import work.slhaf.partner.api.agent.runtime.exception.GlobalExceptionHandler;
|
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Agent执行流程
|
|
||||||
*/
|
|
||||||
public class AgentRunningFlow<C extends RunningFlowContext> {
|
|
||||||
|
|
||||||
public C launch(Map<Integer, List<MetaModule>> modules, C interactionContext) {
|
|
||||||
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
|
|
||||||
//流程执行启动
|
|
||||||
for (Map.Entry<Integer, List<MetaModule>> entry : modules.entrySet()) {
|
|
||||||
List<Future<?>> futures = new ArrayList<>();
|
|
||||||
List<MetaModule> moduleList = entry.getValue();
|
|
||||||
for (MetaModule module : moduleList) {
|
|
||||||
Future<?> future = executor.submit(() -> {
|
|
||||||
module.getInstance().execute(interactionContext);
|
|
||||||
});
|
|
||||||
futures.add(future);
|
|
||||||
}
|
|
||||||
for (Future<?> future : futures) {
|
|
||||||
try {
|
|
||||||
future.get();
|
|
||||||
} catch (Exception e) {
|
|
||||||
boolean exit = GlobalExceptionHandler.INSTANCE.handle(e);
|
|
||||||
if (exit) throw new AgentRuntimeException("Agent执行出错!", e);
|
|
||||||
interactionContext.getErrMsg().add(e.getLocalizedMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
interactionContext.setOk(1);
|
|
||||||
} catch (Exception e) {
|
|
||||||
interactionContext.setOk(0);
|
|
||||||
interactionContext.getErrMsg().add(e.getLocalizedMessage());
|
|
||||||
}
|
|
||||||
return interactionContext;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user