diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java index fa1fe9e8..8c39699e 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java @@ -50,104 +50,121 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { runnerClient = actionCapability.runnerClient(); } - /** - * 执行行动 - * - * @param input ActionExecutor 输入内容 - */ - public void execute(ActionExecutorInput input) { - val actions = input.getActions(); - // 异步执行所有行动 - for (ExecutableAction executableAction : actions) { - platformExecutor.execute(() -> { - val source = executableAction.getSource(); - if (executableAction.getStatus() != Action.Status.PREPARE) { - return; + public void execute(Action action) { + virtualExecutor.execute(actionExecutionRouter(action)); + } + + private Runnable actionExecutionRouter(Action action) { + return () -> { + try { + switch (action) { + case ExecutableAction executableAction -> handleExecutableAction(executableAction); + case StateAction stateAction -> handleStateAction(stateAction); + default -> handleUnknownAction(action); } - val actionChain = executableAction.getActionChain(); - if (actionChain.isEmpty()) { - executableAction.setStatus(Action.Status.FAILED); - executableAction.setResult("行动链为空"); - return; + } catch (Exception e) { + log.warn("Action execute failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage()); + action.setStatus(Action.Status.FAILED); + } + }; + } + + private void handleUnknownAction(Action action) { + log.warn("unknown Action type: {}", action.getClass().getSimpleName()); + action.setStatus(Action.Status.FAILED); + } + + private void handleStateAction(StateAction stateAction) { + stateAction.getTrigger().onTrigger(); + } + + private void handleExecutableAction(ExecutableAction executableAction) { + val source = executableAction.getSource(); + if (executableAction.getStatus() != Action.Status.PREPARE) { + return; + } + val actionChain = executableAction.getActionChain(); + if (actionChain.isEmpty()) { + executableAction.setStatus(Action.Status.FAILED); + executableAction.setResult("行动链为空"); + return; + } + // 注册执行中行动 + val phaser = new Phaser(); + val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction); + executableAction.setStatus(Action.Status.EXECUTING); + // 开始执行 + val stageCursor = new Object() { + int stageCount; + boolean executingStageUpdated; + boolean stageCountUpdated; + + void init() { + stageCount = 0; + executingStageUpdated = false; + stageCountUpdated = false; + update(); + } + + void requestAdvance() { + if (!stageCountUpdated) { + stageCount++; + stageCountUpdated = true; } - // 注册执行中行动 - val phaser = new Phaser(); - val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction); - executableAction.setStatus(Action.Status.EXECUTING); - // 开始执行 - val stageCursor = new Object() { - int stageCount; - boolean executingStageUpdated; - boolean stageCountUpdated; - - void init() { - stageCount = 0; - executingStageUpdated = false; - stageCountUpdated = false; - update(); - } - - void requestAdvance() { - if (!stageCountUpdated) { - stageCount++; - stageCountUpdated = true; - } - if (stageCount < actionChain.size() && !executingStageUpdated) { - update(); - executingStageUpdated = true; - } - } - - boolean next() { - executingStageUpdated = false; - stageCountUpdated = false; - return stageCount < actionChain.size(); - } - - void update() { - val orderList = new ArrayList<>(actionChain.keySet()); - orderList.sort(Integer::compareTo); - executableAction.setExecutingStage(orderList.get(stageCount)); - } - }; - stageCursor.init(); - do { - val metaActions = actionChain.get(executableAction.getExecutingStage()); - val listeningRecord = executeAndListening(metaActions, phaserRecord, source); - phaser.awaitAdvance(listeningRecord.phase()); - // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 - // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 - // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 - synchronized (listeningRecord.accepting()) { - listeningRecord.accepting().set(false); - // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 - stageCursor.requestAdvance(); - } - try { - // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 - // 如果后续运行 corrector 触发频率较高,可考虑增加重试机制 - val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source); - val correctorResult = actionCorrector.execute(correctorInput); - actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); - } catch (Exception ignored) { - } - // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 - // 如果第一次已经推进完毕,本次将会跳过 - stageCursor.requestAdvance(); - } while (stageCursor.next()); - // 结束 - actionCapability.removePhaserRecord(phaser); - if (executableAction.getStatus() != Action.Status.FAILED) { - // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 - if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { - scheduledActionData.recordAndReset(); - actionScheduler.schedule(Set.of(scheduledActionData)); - } else { - executableAction.setStatus(Action.Status.SUCCESS); - } - // TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等) + if (stageCount < actionChain.size() && !executingStageUpdated) { + update(); + executingStageUpdated = true; } - }); + } + + boolean next() { + executingStageUpdated = false; + stageCountUpdated = false; + return stageCount < actionChain.size(); + } + + void update() { + val orderList = new ArrayList<>(actionChain.keySet()); + orderList.sort(Integer::compareTo); + executableAction.setExecutingStage(orderList.get(stageCount)); + } + }; + stageCursor.init(); + do { + val metaActions = actionChain.get(executableAction.getExecutingStage()); + val listeningRecord = executeAndListening(metaActions, phaserRecord, source); + phaser.awaitAdvance(listeningRecord.phase()); + // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 + // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 + // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 + synchronized (listeningRecord.accepting()) { + listeningRecord.accepting().set(false); + // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 + stageCursor.requestAdvance(); + } + try { + // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 + // 如果后续运行 corrector 触发频率较高,可考虑增加重试机制 + val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source); + val correctorResult = actionCorrector.execute(correctorInput); + actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); + } catch (Exception ignored) { + } + // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 + // 如果第一次已经推进完毕,本次将会跳过 + stageCursor.requestAdvance(); + } while (stageCursor.next()); + // 结束 + actionCapability.removePhaserRecord(phaser); + if (executableAction.getStatus() != Action.Status.FAILED) { + // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 + if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { + scheduledActionData.recordAndReset(); + actionScheduler.schedule(Set.of(scheduledActionData)); + } else { + executableAction.setStatus(Action.Status.SUCCESS); + } + // TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等) } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java index 96245e0d..58494687 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java @@ -17,7 +17,6 @@ import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.perceive.PerceiveCapability; import work.slhaf.partner.module.common.module.PreRunningAbstractAgentModuleAbstract; import work.slhaf.partner.module.modules.action.executor.ActionExecutor; -import work.slhaf.partner.module.modules.action.executor.entity.ActionExecutorInput; import work.slhaf.partner.module.modules.action.planner.confirmer.ActionConfirmer; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult; @@ -154,8 +153,7 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { // execute or schedule it immediately switch (executableAction) { case SchedulableExecutableAction action -> actionScheduler.schedule(Set.of(action)); - case ImmediateExecutableAction action -> - actionExecutor.execute(new ActionExecutorInput(Set.of(action))); + case ImmediateExecutableAction action -> actionExecutor.execute(action); default -> log.error("unknown executable action type: {}", executableAction.getClass().getSimpleName()); } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt index 6be46681..979edb16 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt @@ -16,12 +16,10 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod import work.slhaf.partner.api.agent.factory.component.annotation.Init import work.slhaf.partner.api.agent.factory.component.annotation.InjectModule import work.slhaf.partner.core.action.ActionCapability -import work.slhaf.partner.core.action.ActionCore +import work.slhaf.partner.core.action.entity.Action import work.slhaf.partner.core.action.entity.Schedulable import work.slhaf.partner.core.action.entity.SchedulableExecutableAction -import work.slhaf.partner.core.action.entity.StateAction import work.slhaf.partner.module.modules.action.executor.ActionExecutor -import work.slhaf.partner.module.modules.action.executor.entity.ActionExecutorInput import java.io.Closeable import java.time.Duration import java.time.ZonedDateTime @@ -54,17 +52,8 @@ class ActionScheduler : AbstractAgentModule.Standalone() { .collect(Collectors.toSet()) } val onTrigger: (Set) -> Unit = { schedulableSet -> - val executableActions = mutableSetOf() - val stateActions = mutableSetOf() - for (schedulable in schedulableSet) { - when (schedulable) { - is SchedulableExecutableAction -> executableActions.add(schedulable) - is StateAction -> stateActions.add(schedulable) - } - } - actionExecutor.execute(ActionExecutorInput(executableActions)) - actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL) - .execute { stateActions.forEach { it.trigger.onTrigger() } } + schedulableSet.filterIsInstance() + .forEach { actionExecutor.execute(it) } } timeWheel = TimeWheel(listScheduledActions, onTrigger) } diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java index 89cd45b9..f212e601 100644 --- a/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java @@ -17,7 +17,6 @@ import work.slhaf.partner.module.modules.action.executor.ActionCorrector; import work.slhaf.partner.module.modules.action.executor.ActionExecutor; import work.slhaf.partner.module.modules.action.executor.ActionRepairer; import work.slhaf.partner.module.modules.action.executor.ParamsExtractor; -import work.slhaf.partner.module.modules.action.executor.entity.ActionExecutorInput; import work.slhaf.partner.module.modules.action.executor.entity.CorrectorResult; import work.slhaf.partner.module.modules.action.executor.entity.ExtractorResult; import work.slhaf.partner.module.modules.action.executor.entity.RepairerResult; @@ -97,7 +96,6 @@ class ActionExecutorTest { stubExecutors(directExecutor, directExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult extractorResult = new ExtractorResult(); extractorResult.setOk(true); @@ -109,7 +107,7 @@ class ActionExecutorTest { }).when(runnerClient).submit(any(MetaAction.class)); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); verify(runnerClient, times(1)).submit(any(MetaAction.class)); verify(actionCapability, times(1)).removePhaserRecord(any(Phaser.class)); @@ -125,10 +123,9 @@ class ActionExecutorTest { ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); actionData.setStatus(ExecutableAction.Status.EXECUTING); - ActionExecutorInput input = buildInput("u1", actionData); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); verify(actionCapability, never()).putPhaserRecord(any(Phaser.class), any(ExecutableAction.class)); verify(runnerClient, never()).submit(any(MetaAction.class)); @@ -145,7 +142,6 @@ class ActionExecutorTest { chain.put(0, List.of(buildMetaAction("a1", false))); chain.put(1, List.of(buildMetaAction("a2", false))); ImmediateExecutableAction actionData = buildActionData(chain); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult extractorResult = new ExtractorResult(); extractorResult.setOk(true); @@ -158,7 +154,7 @@ class ActionExecutorTest { }).when(runnerClient).submit(any(MetaAction.class)); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); verify(runnerClient, timeout(5000).times(2)).submit(any(MetaAction.class)); verify(actionCorrector, timeout(5000).times(2)).execute(any()); @@ -174,7 +170,6 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(true)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult extractorResult = new ExtractorResult(); extractorResult.setOk(true); @@ -186,7 +181,7 @@ class ActionExecutorTest { }).when(runnerClient).submit(any(MetaAction.class)); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); verify(actionCapability, times(1)).getExecutor(ActionCore.ExecutorType.VIRTUAL); shutdownExecutor(virtualExecutor); @@ -200,7 +195,6 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult fail = new ExtractorResult(); fail.setOk(false); @@ -220,7 +214,7 @@ class ActionExecutorTest { }).when(runnerClient).submit(any(MetaAction.class)); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); try { Thread.sleep(500); @@ -238,7 +232,6 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult fail = new ExtractorResult(); fail.setOk(false); @@ -249,13 +242,13 @@ class ActionExecutorTest { when(actionRepairer.execute(any())).thenReturn(repairerResult); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); try { Thread.sleep(500); } catch (InterruptedException ignored) { } - MetaAction metaAction = actionData.getActionChain().get(0).get(0); + MetaAction metaAction = actionData.getActionChain().get(0).getFirst(); assertEquals(MetaAction.Result.Status.FAILED, metaAction.getResult().getStatus()); verify(runnerClient, never()).submit(any(MetaAction.class)); } @@ -269,7 +262,6 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult fail = new ExtractorResult(); fail.setOk(false); @@ -305,7 +297,7 @@ class ActionExecutorTest { }); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); assertTrue(doneLatch.await(2, TimeUnit.SECONDS)); shutdownExecutor(platformExecutor); @@ -323,7 +315,6 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - ActionExecutorInput input = buildInput("u1", actionData); ExtractorResult ok = new ExtractorResult(); ok.setOk(true); @@ -337,7 +328,7 @@ class ActionExecutorTest { lenient().doThrow(new RuntimeException("boom")).when(actionCorrector).execute(any()); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); try { Thread.sleep(500); @@ -356,10 +347,9 @@ class ActionExecutorTest { stubExecutors(platformExecutor, virtualExecutor); ImmediateExecutableAction actionData = buildActionData(new HashMap<>()); - ActionExecutorInput input = buildInput("u1", actionData); actionExecutor.init(); - actionExecutor.execute(input); + actionExecutor.execute(actionData); try { Thread.sleep(500); @@ -373,10 +363,6 @@ class ActionExecutorTest { when(actionCapability.runnerClient()).thenReturn(runnerClient); } - private ActionExecutorInput buildInput(String userId, ImmediateExecutableAction actionData) { - return new ActionExecutorInput(Set.of(actionData)); - } - private ImmediateExecutableAction buildActionData(Map> actionChain) { val immediateActionData = new ImmediateExecutableAction( "tendency",