mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
feat(action): support continuing executable actions after state restored
This commit is contained in:
2
.idea/misc.xml
generated
2
.idea/misc.xml
generated
@@ -25,6 +25,8 @@
|
|||||||
<writeAnnotation name="work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability" />
|
<writeAnnotation name="work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability" />
|
||||||
<writeAnnotation name="work.slhaf.partner.api.agent.factory.component.annotation.InjectModule" />
|
<writeAnnotation name="work.slhaf.partner.api.agent.factory.component.annotation.InjectModule" />
|
||||||
<writeAnnotation name="work.slhaf.partner.api.agent.factory.module.annotation.InjectModule" />
|
<writeAnnotation name="work.slhaf.partner.api.agent.factory.module.annotation.InjectModule" />
|
||||||
|
<writeAnnotation name="work.slhaf.partner.framework.agent.factory.capability.annotation.InjectCapability" />
|
||||||
|
<writeAnnotation name="work.slhaf.partner.framework.agent.factory.component.annotation.InjectModule" />
|
||||||
</writeAnnotations>
|
</writeAnnotations>
|
||||||
</component>
|
</component>
|
||||||
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
<component name="ExternalStorageConfigurationManager" enabled="true" />
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package work.slhaf.partner.core.action;
|
|||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import lombok.val;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import work.slhaf.partner.core.action.entity.ExecutableAction;
|
import work.slhaf.partner.core.action.entity.ExecutableAction;
|
||||||
@@ -57,16 +56,6 @@ public class ActionCore implements StateSerializable {
|
|||||||
// TODO 通过 Config 指定采用何种 runnerClient,当前只提供 LocalRunnerClient
|
// TODO 通过 Config 指定采用何种 runnerClient,当前只提供 LocalRunnerClient
|
||||||
runnerClient = new LocalRunnerClient(existedMetaActions, virtualExecutor, baseActionPath);
|
runnerClient = new LocalRunnerClient(existedMetaActions, virtualExecutor, baseActionPath);
|
||||||
register();
|
register();
|
||||||
setupShutdownHook();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupShutdownHook() {
|
|
||||||
// 将执行中的行动状态置为失败
|
|
||||||
val executingActionSet = listActions(ExecutableAction.Status.EXECUTING, null);
|
|
||||||
for (ExecutableAction executableAction : executingActionSet) {
|
|
||||||
executableAction.setStatus(ExecutableAction.Status.FAILED);
|
|
||||||
executableAction.setResult("由于系统中断而失败");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
|
|||||||
@@ -48,6 +48,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
|||||||
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
|
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
|
||||||
runnerClient = actionCapability.runnerClient();
|
runnerClient = actionCapability.runnerClient();
|
||||||
blockManager = new ExecutingActionBlockManager(cognitionCapability.contextWorkspace());
|
blockManager = new ExecutingActionBlockManager(cognitionCapability.contextWorkspace());
|
||||||
|
|
||||||
|
actionCapability.listActions(Action.Status.EXECUTING, null)
|
||||||
|
.forEach(this::execute);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(Action action) {
|
public void execute(Action action) {
|
||||||
@@ -115,7 +118,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
|||||||
|
|
||||||
private void handleExecutableAction(ExecutableAction executableAction) {
|
private void handleExecutableAction(ExecutableAction executableAction) {
|
||||||
val source = executableAction.getSource();
|
val source = executableAction.getSource();
|
||||||
if (executableAction.getStatus() != Action.Status.PREPARE) {
|
val status = executableAction.getStatus();
|
||||||
|
if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
val actionChain = executableAction.getActionChain();
|
val actionChain = executableAction.getActionChain();
|
||||||
@@ -124,6 +128,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
|||||||
executableAction.setResult("行动链为空");
|
executableAction.setResult("行动链为空");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
normalizeExecutingStage(executableAction, actionChain);
|
||||||
// 注册执行中行动
|
// 注册执行中行动
|
||||||
val phaser = new Phaser();
|
val phaser = new Phaser();
|
||||||
executableAction.setStatus(Action.Status.EXECUTING);
|
executableAction.setStatus(Action.Status.EXECUTING);
|
||||||
@@ -133,13 +138,13 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
|||||||
// 开始执行
|
// 开始执行
|
||||||
val stageCursor = new Object() {
|
val stageCursor = new Object() {
|
||||||
int stageCount;
|
int stageCount;
|
||||||
boolean executingStageUpdated;
|
boolean executingStageUpdated = false;
|
||||||
boolean stageCountUpdated;
|
boolean stageCountUpdated = false;
|
||||||
|
|
||||||
void init() {
|
void init() {
|
||||||
stageCount = 0;
|
val orderList = new ArrayList<>(actionChain.keySet());
|
||||||
executingStageUpdated = false;
|
orderList.sort(Integer::compareTo);
|
||||||
stageCountUpdated = false;
|
stageCount = orderList.indexOf(executableAction.getExecutingStage());
|
||||||
update();
|
update();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -376,6 +381,33 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
|||||||
return stageIndex >= 2 && (stageIndex - 2) % 2 == 0;
|
return stageIndex >= 2 && (stageIndex - 2) % 2 == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void normalizeExecutingStage(ExecutableAction executableAction, Map<Integer, List<MetaAction>> actionChain) {
|
||||||
|
Integer firstStage = actionChain.keySet().stream()
|
||||||
|
.min(Integer::compareTo)
|
||||||
|
.orElse(null);
|
||||||
|
if (firstStage == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (actionChain.containsKey(executableAction.getExecutingStage())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (executableAction.getStatus() == Action.Status.EXECUTING) {
|
||||||
|
resetExecutableActionForReplay(executableAction);
|
||||||
|
}
|
||||||
|
executableAction.setExecutingStage(firstStage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetExecutableActionForReplay(ExecutableAction executableAction) {
|
||||||
|
executableAction.getHistory().clear();
|
||||||
|
executableAction.getActionChain().values().forEach(metaActions -> metaActions.forEach(metaAction -> {
|
||||||
|
metaAction.getParams().clear();
|
||||||
|
metaAction.getResult().reset();
|
||||||
|
}));
|
||||||
|
if (hasExecutableResult(executableAction)) {
|
||||||
|
executableAction.setResult("");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) {
|
private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) {
|
||||||
if (hasExecutableResult(executableAction)) {
|
if (hasExecutableResult(executableAction)) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
|||||||
.stream()
|
.stream()
|
||||||
.filter { it is SchedulableExecutableAction }
|
.filter { it is SchedulableExecutableAction }
|
||||||
.map { it as SchedulableExecutableAction }
|
.map { it as SchedulableExecutableAction }
|
||||||
.collect(Collectors.toSet<SchedulableExecutableAction>())
|
.collect(Collectors.toSet())
|
||||||
val persisted: MutableSet<Schedulable> = mutableSetOf()
|
val persisted: MutableSet<Schedulable> = mutableSetOf()
|
||||||
persisted.addAll(persistedExecutable)
|
persisted.addAll(persistedExecutable)
|
||||||
synchronized(runtimeSchedulables) {
|
synchronized(runtimeSchedulables) {
|
||||||
|
|||||||
@@ -0,0 +1,204 @@
|
|||||||
|
package work.slhaf.partner.module.action.executor;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import work.slhaf.partner.core.action.ActionCapability;
|
||||||
|
import work.slhaf.partner.core.action.ActionCore;
|
||||||
|
import work.slhaf.partner.core.action.entity.*;
|
||||||
|
import work.slhaf.partner.core.action.runner.RunnerClient;
|
||||||
|
import work.slhaf.partner.core.cognition.CognitionCapability;
|
||||||
|
import work.slhaf.partner.core.cognition.ContextWorkspace;
|
||||||
|
import work.slhaf.partner.module.action.executor.entity.ExtractorResult;
|
||||||
|
import work.slhaf.partner.module.action.executor.entity.HistoryAction;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
class ActionExecutorTest {
|
||||||
|
|
||||||
|
private final List<ExecutorService> executors = new ArrayList<>();
|
||||||
|
|
||||||
|
private static void inject(Object target, String fieldName, Object value) throws Exception {
|
||||||
|
Field field = ActionExecutor.class.getDeclaredField(fieldName);
|
||||||
|
field.setAccessible(true);
|
||||||
|
field.set(target, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<Integer, List<MetaAction>> actionChain(MetaAction metaAction) {
|
||||||
|
Map<Integer, List<MetaAction>> actionChain = new LinkedHashMap<>();
|
||||||
|
actionChain.put(1, new ArrayList<>(List.of(metaAction)));
|
||||||
|
return actionChain;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MetaAction metaAction(String name) {
|
||||||
|
return new MetaAction(name, false, null, MetaAction.Type.BUILTIN, "builtin");
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
void tearDown() throws InterruptedException {
|
||||||
|
for (ExecutorService executor : executors) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
executor.awaitTermination(1, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldResumeExecutingImmediateAndSchedulableActionsOnInit() throws Exception {
|
||||||
|
ActionCapability actionCapability = Mockito.mock(ActionCapability.class);
|
||||||
|
CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class);
|
||||||
|
|
||||||
|
ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2));
|
||||||
|
ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2));
|
||||||
|
RunnerClient runnerClient = Mockito.mock(RunnerClient.class);
|
||||||
|
|
||||||
|
ImmediateExecutableAction immediateAction = new ImmediateExecutableAction(
|
||||||
|
"urgent",
|
||||||
|
actionChain(metaAction("command")),
|
||||||
|
"reason",
|
||||||
|
"desc",
|
||||||
|
"planner",
|
||||||
|
"immediate-uuid"
|
||||||
|
);
|
||||||
|
immediateAction.setStatus(Action.Status.EXECUTING);
|
||||||
|
immediateAction.setExecutingStage(1);
|
||||||
|
|
||||||
|
SchedulableExecutableAction schedulableAction = new SchedulableExecutableAction(
|
||||||
|
"steady",
|
||||||
|
actionChain(metaAction("refresh")),
|
||||||
|
"reason",
|
||||||
|
"desc",
|
||||||
|
"scheduler",
|
||||||
|
Schedulable.ScheduleType.CYCLE,
|
||||||
|
"0 0/5 * * * ?",
|
||||||
|
"schedulable-uuid"
|
||||||
|
);
|
||||||
|
schedulableAction.setStatus(Action.Status.EXECUTING);
|
||||||
|
schedulableAction.setExecutingStage(1);
|
||||||
|
|
||||||
|
when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor);
|
||||||
|
when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor);
|
||||||
|
when(actionCapability.runnerClient()).thenReturn(runnerClient);
|
||||||
|
when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of(immediateAction, schedulableAction));
|
||||||
|
when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace());
|
||||||
|
|
||||||
|
ActionExecutor actionExecutor = spy(new ActionExecutor());
|
||||||
|
inject(actionExecutor, "actionCapability", actionCapability);
|
||||||
|
inject(actionExecutor, "cognitionCapability", cognitionCapability);
|
||||||
|
inject(actionExecutor, "paramsExtractor", Mockito.mock(ParamsExtractor.class));
|
||||||
|
inject(actionExecutor, "actionCorrector", Mockito.mock(ActionCorrector.class));
|
||||||
|
inject(actionExecutor, "actionCorrectionRecognizer", Mockito.mock(ActionCorrectionRecognizer.class));
|
||||||
|
doNothing().when(actionExecutor).execute(any(Action.class));
|
||||||
|
|
||||||
|
actionExecutor.init();
|
||||||
|
|
||||||
|
verify(actionExecutor, times(1)).execute(immediateAction);
|
||||||
|
verify(actionExecutor, times(1)).execute(schedulableAction);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldReplayFromFirstStageWhenExecutingStageIsInvalid() throws Exception {
|
||||||
|
ActionCapability actionCapability = Mockito.mock(ActionCapability.class);
|
||||||
|
CognitionCapability cognitionCapability = Mockito.mock(CognitionCapability.class);
|
||||||
|
ParamsExtractor paramsExtractor = Mockito.mock(ParamsExtractor.class);
|
||||||
|
ActionCorrector actionCorrector = Mockito.mock(ActionCorrector.class);
|
||||||
|
ActionCorrectionRecognizer actionCorrectionRecognizer = Mockito.mock(ActionCorrectionRecognizer.class);
|
||||||
|
RunnerClient runnerClient = Mockito.mock(RunnerClient.class);
|
||||||
|
|
||||||
|
ExecutorService virtualExecutor = registerExecutor(Executors.newFixedThreadPool(2));
|
||||||
|
ExecutorService platformExecutor = registerExecutor(Executors.newFixedThreadPool(2));
|
||||||
|
|
||||||
|
when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor);
|
||||||
|
when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor);
|
||||||
|
when(actionCapability.runnerClient()).thenReturn(runnerClient);
|
||||||
|
when(actionCapability.listActions(Action.Status.EXECUTING, null)).thenReturn(Set.of());
|
||||||
|
when(actionCapability.loadMetaActionInfo(anyString())).thenReturn(new MetaActionInfo(
|
||||||
|
false,
|
||||||
|
null,
|
||||||
|
Map.of(),
|
||||||
|
"demo action",
|
||||||
|
Set.of(),
|
||||||
|
Set.of(),
|
||||||
|
Set.of(),
|
||||||
|
false,
|
||||||
|
new com.alibaba.fastjson2.JSONObject()
|
||||||
|
));
|
||||||
|
when(cognitionCapability.contextWorkspace()).thenReturn(new ContextWorkspace());
|
||||||
|
|
||||||
|
ExtractorResult extractorResult = new ExtractorResult();
|
||||||
|
extractorResult.setOk(true);
|
||||||
|
extractorResult.setParams(Map.of("fresh", "value"));
|
||||||
|
when(paramsExtractor.execute(any())).thenReturn(extractorResult);
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
MetaAction metaAction = invocation.getArgument(0);
|
||||||
|
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
|
||||||
|
metaAction.getResult().setData("rerun-ok");
|
||||||
|
return null;
|
||||||
|
}).when(runnerClient).submit(any(MetaAction.class));
|
||||||
|
|
||||||
|
ActionExecutor actionExecutor = new ActionExecutor();
|
||||||
|
inject(actionExecutor, "actionCapability", actionCapability);
|
||||||
|
inject(actionExecutor, "cognitionCapability", cognitionCapability);
|
||||||
|
inject(actionExecutor, "paramsExtractor", paramsExtractor);
|
||||||
|
inject(actionExecutor, "actionCorrector", actionCorrector);
|
||||||
|
inject(actionExecutor, "actionCorrectionRecognizer", actionCorrectionRecognizer);
|
||||||
|
actionExecutor.init();
|
||||||
|
|
||||||
|
MetaAction metaAction = metaAction("command");
|
||||||
|
metaAction.getParams().put("stale", "old");
|
||||||
|
metaAction.getResult().setStatus(MetaAction.Result.Status.FAILED);
|
||||||
|
metaAction.getResult().setData("stale-meta");
|
||||||
|
|
||||||
|
ImmediateExecutableAction action = new ImmediateExecutableAction(
|
||||||
|
"urgent",
|
||||||
|
actionChain(metaAction),
|
||||||
|
"reason",
|
||||||
|
"desc",
|
||||||
|
"planner",
|
||||||
|
"replay-uuid"
|
||||||
|
);
|
||||||
|
action.setStatus(Action.Status.EXECUTING);
|
||||||
|
action.setExecutingStage(99);
|
||||||
|
action.setResult("stale-result");
|
||||||
|
action.getHistory().put(9, new ArrayList<>(List.of(new HistoryAction("old::action", "stale", "bad"))));
|
||||||
|
|
||||||
|
actionExecutor.execute(action);
|
||||||
|
|
||||||
|
waitUntilFinished(action);
|
||||||
|
|
||||||
|
assertEquals(Action.Status.SUCCESS, action.getStatus());
|
||||||
|
assertEquals(1, action.getExecutingStage());
|
||||||
|
assertEquals("rerun-ok", action.getResult());
|
||||||
|
assertEquals(1, action.getHistory().size());
|
||||||
|
assertTrue(action.getHistory().containsKey(1));
|
||||||
|
assertEquals("rerun-ok", action.getHistory().get(1).getFirst().result());
|
||||||
|
assertEquals(Map.of("fresh", "value"), metaAction.getParams());
|
||||||
|
assertEquals(MetaAction.Result.Status.SUCCESS, metaAction.getResult().getStatus());
|
||||||
|
assertEquals("rerun-ok", metaAction.getResult().getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitUntilFinished(ImmediateExecutableAction action) throws InterruptedException {
|
||||||
|
long deadline = System.currentTimeMillis() + 3000;
|
||||||
|
while (System.currentTimeMillis() < deadline) {
|
||||||
|
if (action.getStatus() == Action.Status.SUCCESS || action.getStatus() == Action.Status.FAILED) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Thread.sleep(20);
|
||||||
|
}
|
||||||
|
throw new AssertionError("action execution did not finish in time");
|
||||||
|
}
|
||||||
|
|
||||||
|
private ExecutorService registerExecutor(ExecutorService executorService) {
|
||||||
|
executors.add(executorService);
|
||||||
|
return executorService;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user