diff --git a/.idea/misc.xml b/.idea/misc.xml
index b94e4cca..2e852248 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -25,6 +25,8 @@
+
+
diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java
index 6cd7adab..2d2d673f 100644
--- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java
+++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java
@@ -3,7 +3,6 @@ package work.slhaf.partner.core.action;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import lombok.val;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import work.slhaf.partner.core.action.entity.ExecutableAction;
@@ -57,16 +56,6 @@ public class ActionCore implements StateSerializable {
// TODO 通过 Config 指定采用何种 runnerClient,当前只提供 LocalRunnerClient
runnerClient = new LocalRunnerClient(existedMetaActions, virtualExecutor, baseActionPath);
register();
- setupShutdownHook();
- }
-
- private void setupShutdownHook() {
- // 将执行中的行动状态置为失败
- val executingActionSet = listActions(ExecutableAction.Status.EXECUTING, null);
- for (ExecutableAction executableAction : executingActionSet) {
- executableAction.setStatus(ExecutableAction.Status.FAILED);
- executableAction.setResult("由于系统中断而失败");
- }
}
@CapabilityMethod
diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java
index 6ecab4fa..ec5bf34a 100644
--- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java
+++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java
@@ -48,6 +48,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
runnerClient = actionCapability.runnerClient();
blockManager = new ExecutingActionBlockManager(cognitionCapability.contextWorkspace());
+
+ actionCapability.listActions(Action.Status.EXECUTING, null)
+ .forEach(this::execute);
}
public void execute(Action action) {
@@ -115,7 +118,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
private void handleExecutableAction(ExecutableAction executableAction) {
val source = executableAction.getSource();
- if (executableAction.getStatus() != Action.Status.PREPARE) {
+ val status = executableAction.getStatus();
+ if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) {
return;
}
val actionChain = executableAction.getActionChain();
@@ -124,6 +128,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
executableAction.setResult("行动链为空");
return;
}
+ normalizeExecutingStage(executableAction, actionChain);
// 注册执行中行动
val phaser = new Phaser();
executableAction.setStatus(Action.Status.EXECUTING);
@@ -133,13 +138,13 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
// 开始执行
val stageCursor = new Object() {
int stageCount;
- boolean executingStageUpdated;
- boolean stageCountUpdated;
+ boolean executingStageUpdated = false;
+ boolean stageCountUpdated = false;
void init() {
- stageCount = 0;
- executingStageUpdated = false;
- stageCountUpdated = false;
+ val orderList = new ArrayList<>(actionChain.keySet());
+ orderList.sort(Integer::compareTo);
+ stageCount = orderList.indexOf(executableAction.getExecutingStage());
update();
}
@@ -376,6 +381,33 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
return stageIndex >= 2 && (stageIndex - 2) % 2 == 0;
}
+ private void normalizeExecutingStage(ExecutableAction executableAction, Map> actionChain) {
+ Integer firstStage = actionChain.keySet().stream()
+ .min(Integer::compareTo)
+ .orElse(null);
+ if (firstStage == null) {
+ return;
+ }
+ if (actionChain.containsKey(executableAction.getExecutingStage())) {
+ return;
+ }
+ if (executableAction.getStatus() == Action.Status.EXECUTING) {
+ resetExecutableActionForReplay(executableAction);
+ }
+ executableAction.setExecutingStage(firstStage);
+ }
+
+ private void resetExecutableActionForReplay(ExecutableAction executableAction) {
+ executableAction.getHistory().clear();
+ executableAction.getActionChain().values().forEach(metaActions -> metaActions.forEach(metaAction -> {
+ metaAction.getParams().clear();
+ metaAction.getResult().reset();
+ }));
+ if (hasExecutableResult(executableAction)) {
+ executableAction.setResult("");
+ }
+ }
+
private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) {
if (hasExecutableResult(executableAction)) {
return;
diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt
index 57caf009..fbd2943f 100644
--- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt
+++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/scheduler/ActionScheduler.kt
@@ -55,7 +55,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
.stream()
.filter { it is SchedulableExecutableAction }
.map { it as SchedulableExecutableAction }
- .collect(Collectors.toSet())
+ .collect(Collectors.toSet())
val persisted: MutableSet = mutableSetOf()
persisted.addAll(persistedExecutable)
synchronized(runtimeSchedulables) {
diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java
new file mode 100644
index 00000000..3bb524a8
--- /dev/null
+++ b/Partner-Core/src/test/java/work/slhaf/partner/module/action/executor/ActionExecutorTest.java
@@ -0,0 +1,204 @@
+package work.slhaf.partner.module.action.executor;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import work.slhaf.partner.core.action.ActionCapability;
+import work.slhaf.partner.core.action.ActionCore;
+import work.slhaf.partner.core.action.entity.*;
+import work.slhaf.partner.core.action.runner.RunnerClient;
+import work.slhaf.partner.core.cognition.CognitionCapability;
+import work.slhaf.partner.core.cognition.ContextWorkspace;
+import work.slhaf.partner.module.action.executor.entity.ExtractorResult;
+import work.slhaf.partner.module.action.executor.entity.HistoryAction;
+
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+class ActionExecutorTest {
+
+ private final List executors = new ArrayList<>();
+
+ private static void inject(Object target, String fieldName, Object value) throws Exception {
+ Field field = ActionExecutor.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static Map> actionChain(MetaAction metaAction) {
+ Map> actionChain = new LinkedHashMap<>();
+ actionChain.put(1, new ArrayList<>(List.of(metaAction)));
+ return actionChain;
+ }
+
+ private static MetaAction metaAction(String name) {
+ return new MetaAction(name, false, null, MetaAction.Type.BUILTIN, "builtin");
+ }
+
+ @AfterEach
+ void tearDown() throws InterruptedException {
+ for (ExecutorService executor : executors) {
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ void shouldResumeExecutingImmediateAndSchedulableActionsOnInit() throws Exception {
+ ActionCapability actionCapability = Mockito.mock(ActionCapability.class);
+ CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class);
+
+ ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2));
+ ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2));
+ RunnerClient runnerClient = Mockito.mock(RunnerClient.class);
+
+ ImmediateExecutableAction immediateAction = new ImmediateExecutableAction(
+ "urgent",
+ actionChain(metaAction("command")),
+ "reason",
+ "desc",
+ "planner",
+ "immediate-uuid"
+ );
+ immediateAction.setStatus(Action.Status.EXECUTING);
+ immediateAction.setExecutingStage(1);
+
+ SchedulableExecutableAction schedulableAction = new SchedulableExecutableAction(
+ "steady",
+ actionChain(metaAction("refresh")),
+ "reason",
+ "desc",
+ "scheduler",
+ Schedulable.ScheduleType.CYCLE,
+ "0 0/5 * * * ?",
+ "schedulable-uuid"
+ );
+ schedulableAction.setStatus(Action.Status.EXECUTING);
+ schedulableAction.setExecutingStage(1);
+
+ when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor);
+ when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor);
+ when(actionCapability.runnerClient()).thenReturn(runnerClient);
+ when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of(immediateAction, schedulableAction));
+ when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace());
+
+ ActionExecutor actionExecutor = spy(new ActionExecutor());
+ inject(actionExecutor, "actionCapability", actionCapability);
+ inject(actionExecutor, "cognitionCapability", cognitionCapability);
+ inject(actionExecutor, "paramsExtractor", Mockito.mock(ParamsExtractor.class));
+ inject(actionExecutor, "actionCorrector", Mockito.mock(ActionCorrector.class));
+ inject(actionExecutor, "actionCorrectionRecognizer", Mockito.mock(ActionCorrectionRecognizer.class));
+ doNothing().when(actionExecutor).execute(any(Action.class));
+
+ actionExecutor.init();
+
+ verify(actionExecutor, times(1)).execute(immediateAction);
+ verify(actionExecutor, times(1)).execute(schedulableAction);
+ }
+
+ @Test
+ void shouldReplayFromFirstStageWhenExecutingStageIsInvalid() throws Exception {
+ ActionCapability actionCapability = Mockito.mock(ActionCapability.class);
+ CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class);
+ ParamsExtractor paramsExtractor = Mockito.mock(ParamsExtractor.class);
+ ActionCorrector actionCorrector = Mockito.mock(ActionCorrector.class);
+ ActionCorrectionRecognizer actionCorrectionRecognizer = Mockito.mock(ActionCorrectionRecognizer.class);
+ RunnerClient runnerClient = Mockito.mock(RunnerClient.class);
+
+ ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2));
+ ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2));
+
+ when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor);
+ when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor);
+ when(actionCapability.runnerClient()).thenReturn(runnerClient);
+ when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of());
+ when(actionCapability.loadMetaActionInfo(anyString())).thenReturn(new MetaActionInfo(
+ false,
+ null,
+ Map.of(),
+ "demo action",
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false,
+ new com.alibaba.fastjson2.JSONObject()
+ ));
+ when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace());
+
+ ExtractorResult extractorResult = new ExtractorResult();
+ extractorResult.setOk(true);
+ extractorResult.setParams(Map.of("fresh", "value"));
+ when(paramsExtractor.execute(any())).thenReturn(extractorResult);
+ doAnswer(invocation -> {
+ MetaAction metaAction = invocation.getArgument(0);
+ metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
+ metaAction.getResult().setData("rerun-ok");
+ return null;
+ }).when(runnerClient).submit(any(MetaAction.class));
+
+ ActionExecutor actionExecutor = new ActionExecutor();
+ inject(actionExecutor, "actionCapability", actionCapability);
+ inject(actionExecutor, "cognitionCapability", cognitionCapability);
+ inject(actionExecutor, "paramsExtractor", paramsExtractor);
+ inject(actionExecutor, "actionCorrector", actionCorrector);
+ inject(actionExecutor, "actionCorrectionRecognizer", actionCorrectionRecognizer);
+ actionExecutor.init();
+
+ MetaAction metaAction = metaAction("command");
+ metaAction.getParams().put("stale", "old");
+ metaAction.getResult().setStatus(MetaAction.Result.Status.FAILED);
+ metaAction.getResult().setData("stale-meta");
+
+ ImmediateExecutableAction action = new ImmediateExecutableAction(
+ "urgent",
+ actionChain(metaAction),
+ "reason",
+ "desc",
+ "planner",
+ "replay-uuid"
+ );
+ action.setStatus(Action.Status.EXECUTING);
+ action.setExecutingStage(99);
+ action.setResult("stale-result");
+ action.getHistory().put(9, new ArrayList<>(List.of(new HistoryAction("old::action", "stale", "bad"))));
+
+ actionExecutor.execute(action);
+
+ waitUntilFinished(action);
+
+ assertEquals(Action.Status.SUCCESS, action.getStatus());
+ assertEquals(1, action.getExecutingStage());
+ assertEquals("rerun-ok", action.getResult());
+ assertEquals(1, action.getHistory().size());
+ assertTrue(action.getHistory().containsKey(1));
+ assertEquals("rerun-ok", action.getHistory().get(1).getFirst().result());
+ assertEquals(Map.of("fresh", "value"), metaAction.getParams());
+ assertEquals(MetaAction.Result.Status.SUCCESS, metaAction.getResult().getStatus());
+ assertEquals("rerun-ok", metaAction.getResult().getData());
+ }
+
+ private void waitUntilFinished(ImmediateExecutableAction action) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + 3000;
+ while (System.currentTimeMillis() < deadline) {
+ if (action.getStatus() == Action.Status.SUCCESS || action.getStatus() == Action.Status.FAILED) {
+ return;
+ }
+ Thread.sleep(20);
+ }
+ throw new AssertionError("action execution did not finish in time");
+ }
+
+ private ExecutorService registerExecutor(ExecutorService executorService) {
+ executors.add(executorService);
+ return executorService;
+ }
+}