From 3640cc21082406aac7196c03916c31a42c3c2345 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Tue, 7 Apr 2026 23:16:12 +0800 Subject: [PATCH] feat(action): support continuing executable actions after state restored --- .idea/misc.xml | 2 + .../slhaf/partner/core/action/ActionCore.java | 11 - .../action/executor/ActionExecutor.java | 44 +++- .../action/scheduler/ActionScheduler.kt | 2 +- .../action/executor/ActionExecutorTest.java | 204 ++++++++++++++++++ 5 files changed, 245 insertions(+), 18 deletions(-) create mode 100644 Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java diff --git a/.idea/misc.xml b/.idea/misc.xml index b94e4cca..2e852248 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -25,6 +25,8 @@ + + diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 6cd7adab..2d2d673f 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -3,7 +3,6 @@ package work.slhaf.partner.core.action; import com.alibaba.fastjson2.JSONObject; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import lombok.val; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import work.slhaf.partner.core.action.entity.ExecutableAction; @@ -57,16 +56,6 @@ public class ActionCore implements StateSerializable { // TODO 通过 Config 指定采用何种 runnerClient,当前只提供 LocalRunnerClient runnerClient = new LocalRunnerClient(existedMetaActions, virtualExecutor, baseActionPath); register(); - setupShutdownHook(); - } - - private void setupShutdownHook() { - // 将执行中的行动状态置为失败 - val executingActionSet = listActions(ExecutableAction.Status.EXECUTING, null); - for (ExecutableAction executableAction : executingActionSet) { - executableAction.setStatus(ExecutableAction.Status.FAILED); - executableAction.setResult("由于系统中断而失败"); - } } @CapabilityMethod diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java index 6ecab4fa..ec5bf34a 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java @@ -48,6 +48,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM); runnerClient = actionCapability.runnerClient(); blockManager = new ExecutingActionBlockManager(cognitionCapability.contextWorkspace()); + + actionCapability.listActions(Action.Status.EXECUTING, null) + .forEach(this::execute); } public void execute(Action action) { @@ -115,7 +118,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { private void handleExecutableAction(ExecutableAction executableAction) { val source = executableAction.getSource(); - if (executableAction.getStatus() != Action.Status.PREPARE) { + val status = executableAction.getStatus(); + if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) { return; } val actionChain = executableAction.getActionChain(); @@ -124,6 +128,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { executableAction.setResult("行动链为空"); return; } + normalizeExecutingStage(executableAction, actionChain); // 注册执行中行动 val phaser = new Phaser(); executableAction.setStatus(Action.Status.EXECUTING); @@ -133,13 +138,13 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { // 开始执行 val stageCursor = new Object() { int stageCount; - boolean executingStageUpdated; - boolean stageCountUpdated; + boolean executingStageUpdated = false; + boolean stageCountUpdated = false; void init() { - stageCount = 0; - executingStageUpdated = false; - stageCountUpdated = false; + val orderList = new ArrayList<>(actionChain.keySet()); + orderList.sort(Integer::compareTo); + stageCount = orderList.indexOf(executableAction.getExecutingStage()); update(); } @@ -376,6 +381,33 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { return stageIndex >= 2 && (stageIndex - 2) % 2 == 0; } + private void normalizeExecutingStage(ExecutableAction executableAction, Map> actionChain) { + Integer firstStage = actionChain.keySet().stream() + .min(Integer::compareTo) + .orElse(null); + if (firstStage == null) { + return; + } + if (actionChain.containsKey(executableAction.getExecutingStage())) { + return; + } + if (executableAction.getStatus() == Action.Status.EXECUTING) { + resetExecutableActionForReplay(executableAction); + } + executableAction.setExecutingStage(firstStage); + } + + private void resetExecutableActionForReplay(ExecutableAction executableAction) { + executableAction.getHistory().clear(); + executableAction.getActionChain().values().forEach(metaActions -> metaActions.forEach(metaAction -> { + metaAction.getParams().clear(); + metaAction.getResult().reset(); + })); + if (hasExecutableResult(executableAction)) { + executableAction.setResult(""); + } + } + private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) { if (hasExecutableResult(executableAction)) { return; diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt index 57caf009..fbd2943f 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt @@ -55,7 +55,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() { .stream() .filter { it is SchedulableExecutableAction } .map { it as SchedulableExecutableAction } - .collect(Collectors.toSet()) + .collect(Collectors.toSet()) val persisted: MutableSet = mutableSetOf() persisted.addAll(persistedExecutable) synchronized(runtimeSchedulables) { diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java new file mode 100644 index 00000000..3bb524a8 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java @@ -0,0 +1,204 @@ +package work.slhaf.partner.module.action.executor; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore; +import work.slhaf.partner.core.action.entity.*; +import work.slhaf.partner.core.action.runner.RunnerClient; +import work.slhaf.partner.core.cognition.CognitionCapability; +import work.slhaf.partner.core.cognition.ContextWorkspace; +import work.slhaf.partner.module.action.executor.entity.ExtractorResult; +import work.slhaf.partner.module.action.executor.entity.HistoryAction; + +import java.lang.reflect.Field; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +class ActionExecutorTest { + + private final List executors = new ArrayList<>(); + + private static void inject(Object target, String fieldName, Object value) throws Exception { + Field field = ActionExecutor.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private static Map> actionChain(MetaAction metaAction) { + Map> actionChain = new LinkedHashMap<>(); + actionChain.put(1, new ArrayList<>(List.of(metaAction))); + return actionChain; + } + + private static MetaAction metaAction(String name) { + return new MetaAction(name, false, null, MetaAction.Type.BUILTIN, "builtin"); + } + + @AfterEach + void tearDown() throws InterruptedException { + for (ExecutorService executor : executors) { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + } + + @Test + void shouldResumeExecutingImmediateAndSchedulableActionsOnInit() throws Exception { + ActionCapability actionCapability = Mockito.mock(ActionCapability.class); + CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class); + + ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2)); + ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2)); + RunnerClient runnerClient = Mockito.mock(RunnerClient.class); + + ImmediateExecutableAction immediateAction = new ImmediateExecutableAction( + "urgent", + actionChain(metaAction("command")), + "reason", + "desc", + "planner", + "immediate-uuid" + ); + immediateAction.setStatus(Action.Status.EXECUTING); + immediateAction.setExecutingStage(1); + + SchedulableExecutableAction schedulableAction = new SchedulableExecutableAction( + "steady", + actionChain(metaAction("refresh")), + "reason", + "desc", + "scheduler", + Schedulable.ScheduleType.CYCLE, + "0 0/5 * * * ?", + "schedulable-uuid" + ); + schedulableAction.setStatus(Action.Status.EXECUTING); + schedulableAction.setExecutingStage(1); + + when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor); + when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor); + when(actionCapability.runnerClient()).thenReturn(runnerClient); + when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of(immediateAction, schedulableAction)); + when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace()); + + ActionExecutor actionExecutor = spy(new ActionExecutor()); + inject(actionExecutor, "actionCapability", actionCapability); + inject(actionExecutor, "cognitionCapability", cognitionCapability); + inject(actionExecutor, "paramsExtractor", Mockito.mock(ParamsExtractor.class)); + inject(actionExecutor, "actionCorrector", Mockito.mock(ActionCorrector.class)); + inject(actionExecutor, "actionCorrectionRecognizer", Mockito.mock(ActionCorrectionRecognizer.class)); + doNothing().when(actionExecutor).execute(any(Action.class)); + + actionExecutor.init(); + + verify(actionExecutor, times(1)).execute(immediateAction); + verify(actionExecutor, times(1)).execute(schedulableAction); + } + + @Test + void shouldReplayFromFirstStageWhenExecutingStageIsInvalid() throws Exception { + ActionCapability actionCapability = Mockito.mock(ActionCapability.class); + CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class); + ParamsExtractor paramsExtractor = Mockito.mock(ParamsExtractor.class); + ActionCorrector actionCorrector = Mockito.mock(ActionCorrector.class); + ActionCorrectionRecognizer actionCorrectionRecognizer = Mockito.mock(ActionCorrectionRecognizer.class); + RunnerClient runnerClient = Mockito.mock(RunnerClient.class); + + ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2)); + ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2)); + + when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor); + when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor); + when(actionCapability.runnerClient()).thenReturn(runnerClient); + when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of()); + when(actionCapability.loadMetaActionInfo(anyString())).thenReturn(new MetaActionInfo( + false, + null, + Map.of(), + "demo action", + Set.of(), + Set.of(), + Set.of(), + false, + new com.alibaba.fastjson2.JSONObject() + )); + when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace()); + + ExtractorResult extractorResult = new ExtractorResult(); + extractorResult.setOk(true); + extractorResult.setParams(Map.of("fresh", "value")); + when(paramsExtractor.execute(any())).thenReturn(extractorResult); + doAnswer(invocation -> { + MetaAction metaAction = invocation.getArgument(0); + metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); + metaAction.getResult().setData("rerun-ok"); + return null; + }).when(runnerClient).submit(any(MetaAction.class)); + + ActionExecutor actionExecutor = new ActionExecutor(); + inject(actionExecutor, "actionCapability", actionCapability); + inject(actionExecutor, "cognitionCapability", cognitionCapability); + inject(actionExecutor, "paramsExtractor", paramsExtractor); + inject(actionExecutor, "actionCorrector", actionCorrector); + inject(actionExecutor, "actionCorrectionRecognizer", actionCorrectionRecognizer); + actionExecutor.init(); + + MetaAction metaAction = metaAction("command"); + metaAction.getParams().put("stale", "old"); + metaAction.getResult().setStatus(MetaAction.Result.Status.FAILED); + metaAction.getResult().setData("stale-meta"); + + ImmediateExecutableAction action = new ImmediateExecutableAction( + "urgent", + actionChain(metaAction), + "reason", + "desc", + "planner", + "replay-uuid" + ); + action.setStatus(Action.Status.EXECUTING); + action.setExecutingStage(99); + action.setResult("stale-result"); + action.getHistory().put(9, new ArrayList<>(List.of(new HistoryAction("old::action", "stale", "bad")))); + + actionExecutor.execute(action); + + waitUntilFinished(action); + + assertEquals(Action.Status.SUCCESS, action.getStatus()); + assertEquals(1, action.getExecutingStage()); + assertEquals("rerun-ok", action.getResult()); + assertEquals(1, action.getHistory().size()); + assertTrue(action.getHistory().containsKey(1)); + assertEquals("rerun-ok", action.getHistory().get(1).getFirst().result()); + assertEquals(Map.of("fresh", "value"), metaAction.getParams()); + assertEquals(MetaAction.Result.Status.SUCCESS, metaAction.getResult().getStatus()); + assertEquals("rerun-ok", metaAction.getResult().getData()); + } + + private void waitUntilFinished(ImmediateExecutableAction action) throws InterruptedException { + long deadline = System.currentTimeMillis() + 3000; + while (System.currentTimeMillis() < deadline) { + if (action.getStatus() == Action.Status.SUCCESS || action.getStatus() == Action.Status.FAILED) { + return; + } + Thread.sleep(20); + } + throw new AssertionError("action execution did not finish in time"); + } + + private ExecutorService registerExecutor(ExecutorService executorService) { + executors.add(executorService); + return executorService; + } +}