diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionCorrectionRecognizer.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionCorrectionRecognizer.java new file mode 100644 index 00000000..19ec514e --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionCorrectionRecognizer.java @@ -0,0 +1,51 @@ +package work.slhaf.partner.module.modules.action.executor; + +import com.alibaba.fastjson2.JSONObject; +import lombok.val; +import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule; +import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel; +import work.slhaf.partner.api.chat.pojo.Message; +import work.slhaf.partner.module.modules.action.executor.entity.CorrectionRecognizerInput; +import work.slhaf.partner.module.modules.action.executor.entity.CorrectionRecognizerResult; + +import java.util.List; + +/** + * 负责在行动链执行过程中判断当前进度是否异常,是否需要引入 corrector 介入。 + */ +public class ActionCorrectionRecognizer extends AbstractAgentModule.Sub implements ActivateModel { + @Override + public CorrectionRecognizerResult execute(CorrectionRecognizerInput input) { + val prompt = buildPrompt(input); + return formattedChat(List.of(new Message(Message.Character.USER, prompt)), CorrectionRecognizerResult.class); + } + + private String buildPrompt(CorrectionRecognizerInput input) { + val prompt = new JSONObject(); + prompt.put("[行动来源]", input.getSource()); + prompt.put("[行动倾向]", input.getTendency()); + prompt.put("[行动描述]", input.getDescription()); + prompt.put("[行动原因]", input.getReason()); + prompt.put("[当前阶段]", input.getCurrentStage()); + prompt.put("[当前阶段位置]", input.getCurrentStageIndex()); + prompt.put("[是否最后阶段]", input.isLastStage()); + val stageList = prompt.putArray("[当前行动链阶段列表]"); + stageList.addAll(input.getOrderedStages()); + val history = prompt.putArray("[当前阶段已执行情况]"); + if (input.getHistory() != null) { + history.addAll(input.getHistory()); + } + val metaActions = prompt.putArray("[当前阶段行动结果]"); + metaActions.addAll(input.getCurrentStageMetaActions()); + val messages = prompt.putArray("[近期对话]"); + messages.addAll(input.getRecentMessages()); + val memory = prompt.putArray("[已激活记忆]"); + memory.addAll(input.getActivatedSlices()); + return prompt.toJSONString(); + } + + @Override + public String modelKey() { + return "action_correction_recognizer"; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java index a51b3fc8..d6ba8375 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class ActionExecutor extends AbstractAgentModule.Standalone { + private static final int MAX_EXTRACTOR_ATTEMPTS = 3; private final AssemblyHelper assemblyHelper = new AssemblyHelper(); @InjectCapability private ActionCapability actionCapability; @@ -30,9 +31,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { @InjectModule private ParamsExtractor paramsExtractor; @InjectModule - private ActionRepairer actionRepairer; - @InjectModule private ActionCorrector actionCorrector; + @InjectModule + private ActionCorrectionRecognizer actionCorrectionRecognizer; private ExecutorService virtualExecutor; private ExecutorService platformExecutor; @@ -156,6 +157,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { stageCursor.init(); do { val metaActions = actionChain.get(executableAction.getExecutingStage()); + val recognizerRecord = startRecognizerIfNeeded(executableAction, phaserRecord); val listeningRecord = executeAndListening(metaActions, phaserRecord, source); phaser.awaitAdvance(listeningRecord.phase()); // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 @@ -166,12 +168,17 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 stageCursor.requestAdvance(); } + boolean shouldRunCorrector = hasFailedMetaAction(metaActions); try { - // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 - // 如果后续运行 corrector 触发频率较高,可考虑增加重试机制 - val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source); - val correctorResult = actionCorrector.execute(correctorInput); - actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); + if (!shouldRunCorrector) { + val recognizerResult = resolveRecognizerResult(recognizerRecord); + shouldRunCorrector = recognizerResult != null && recognizerResult.isNeedCorrection(); + } + if (shouldRunCorrector) { + val correctorInput = assemblyHelper.buildCorrectorInput(executableAction); + val correctorResult = actionCorrector.execute(correctorInput); + actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); + } } catch (Exception ignored) { } // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 @@ -235,43 +242,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { return () -> { val actionKey = metaAction.getKey(); try { - val result = metaAction.getResult(); - do { - val actionData = phaserRecord.executableAction(); - val executingStage = actionData.getExecutingStage(); - val historyActionResults = actionData.getHistory().get(executingStage); - val additionalContext = actionData.getAdditionalContext().get(executingStage); - val extractorInput = assemblyHelper.buildExtractorInput(metaAction, source, historyActionResults, additionalContext); - val extractorResult = paramsExtractor.execute(extractorInput); - if (extractorResult.isOk()) { - metaAction.getParams().putAll(extractorResult.getParams()); - runnerClient.submit(metaAction); - val historyAction = new HistoryAction(actionKey, actionCapability.loadMetaActionInfo(actionKey).getDescription(), metaAction.getResult().getData()); - actionData.getHistory() - .computeIfAbsent(executingStage, integer -> new ArrayList<>()) - .add(historyAction); - } else { - val repairerInput = assemblyHelper.buildRepairerInput(historyActionResults, metaAction, source); - val repairerResult = actionRepairer.execute(repairerInput); - switch (repairerResult.getStatus()) { - // 如果本次修复被认为成功,则将补充的信息添加至 additionalContext - case RepairerResult.RepairerStatus.OK -> { - additionalContext.addAll(repairerResult.getFixedData()); - result.setStatus(MetaAction.Result.Status.WAITING); - } - // 此处的修复失败来自系统内部的执行失败:其余方式均不可行时将回退至当前分支 - case RepairerResult.RepairerStatus.FAILED -> { - result.setStatus(MetaAction.Result.Status.FAILED); - result.setData("行动执行失败"); - } - // 此处对应已在 repairer 内发起外部请求,故在此处进行阻塞 - case RepairerResult.RepairerStatus.ACQUIRE -> { - phaserRecord.interrupt(); - result.setStatus(MetaAction.Result.Status.WAITING); - } - } - } - } while (result.getStatus().equals(MetaAction.Result.Status.WAITING)); + executeMetaActionWithRetry(metaAction, phaserRecord, source); } catch (Exception e) { log.error("Action executing failed: {}", actionKey, e); } finally { @@ -280,6 +251,120 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { }; } + private void executeMetaActionWithRetry(MetaAction metaAction, PhaserRecord phaserRecord, String source) { + String failureReason = "参数提取失败"; + val actionData = phaserRecord.executableAction(); + val actionKey = metaAction.getKey(); + for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) { + val result = metaAction.getResult(); + result.reset(); + metaAction.getParams().clear(); + + val executingStage = actionData.getExecutingStage(); + val historyActionResults = actionData.getHistory().get(executingStage); + val additionalContext = actionData.getAdditionalContext().get(executingStage); + val extractorInput = assemblyHelper.buildExtractorInput(metaAction, source, historyActionResults, additionalContext); + ExtractorResult extractorResult; + try { + extractorResult = paramsExtractor.execute(extractorInput); + } catch (Exception e) { + failureReason = buildAttemptFailureReason("参数提取异常", e.getLocalizedMessage()); + continue; + } + + if (extractorResult == null || !extractorResult.isOk()) { + failureReason = buildAttemptFailureReason("参数提取失败", null); + continue; + } + + if (extractorResult.getParams() != null) { + metaAction.getParams().putAll(extractorResult.getParams()); + } + + try { + runnerClient.submit(metaAction); + } catch (Exception e) { + failureReason = buildAttemptFailureReason("行动执行异常", e.getLocalizedMessage()); + continue; + } + + if (result.getStatus() == MetaAction.Result.Status.SUCCESS) { + val historyAction = new HistoryAction(actionKey, actionCapability.loadMetaActionInfo(actionKey).getDescription(), result.getData()); + actionData.getHistory() + .computeIfAbsent(executingStage, integer -> new ArrayList<>()) + .add(historyAction); + return; + } + + failureReason = buildAttemptFailureReason("行动执行失败", result.getData()); + } + metaAction.getResult().setStatus(MetaAction.Result.Status.FAILED); + metaAction.getResult().setData(failureReason); + } + + private RecognizerTaskRecord startRecognizerIfNeeded(ExecutableAction executableAction, PhaserRecord phaserRecord) { + if (!shouldRunCorrectionRecognizer(executableAction)) { + return RecognizerTaskRecord.disabled(); + } + val recognizerInput = assemblyHelper.buildRecognizerInput(executableAction); + val task = buildRecognizerTask(recognizerInput, phaserRecord.phaser()); + Future future = virtualExecutor.submit(task); + return new RecognizerTaskRecord(true, future); + } + + private Callable buildRecognizerTask(CorrectionRecognizerInput input, Phaser phaser) { + phaser.register(); + return () -> { + try { + return actionCorrectionRecognizer.execute(input); + } finally { + phaser.arriveAndDeregister(); + } + }; + } + + private CorrectionRecognizerResult resolveRecognizerResult(RecognizerTaskRecord record) { + if (record == null || !record.enabled() || record.future() == null) { + return null; + } + try { + if (!record.future().isDone()) { + return null; + } + return record.future().get(); + } catch (Exception e) { + return null; + } + } + + private String buildAttemptFailureReason(String prefix, String detail) { + if (detail == null || detail.isBlank()) { + return prefix; + } + return prefix + ": " + detail; + } + + private boolean hasFailedMetaAction(List metaActions) { + return metaActions.stream().anyMatch(metaAction -> metaAction.getResult().getStatus() == MetaAction.Result.Status.FAILED); + } + + private boolean shouldRunCorrectionRecognizer(ExecutableAction executableAction) { + val orderedStages = new ArrayList<>(executableAction.getActionChain().keySet()); + orderedStages.sort(Integer::compareTo); + int totalStages = orderedStages.size(); + if (totalStages < 3) { + return false; + } + int stageIndex = orderedStages.indexOf(executableAction.getExecutingStage()); + if (stageIndex < 0) { + return false; + } + if (stageIndex == totalStages - 1) { + return true; + } + return stageIndex >= 2 && (stageIndex - 2) % 2 == 0; + } + private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) { if (hasExecutableResult(executableAction)) { return; @@ -346,22 +431,17 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) { } + private record RecognizerTaskRecord(boolean enabled, Future future) { + private static RecognizerTaskRecord disabled() { + return new RecognizerTaskRecord(false, null); + } + } + @SuppressWarnings("InnerClassMayBeStatic") private class AssemblyHelper { private AssemblyHelper() { } - private RepairerInput buildRepairerInput(List historyActionsResults, MetaAction action, String userId) { - RepairerInput input = new RepairerInput(); - MetaActionInfo metaActionInfo = actionCapability.loadMetaActionInfo(action.getKey()); - input.setHistoryActionResults(historyActionsResults); - input.setParams(metaActionInfo.getParams()); - input.setRecentMessages(cognitionCapability.getChatMessages()); - input.setActionDescription(metaActionInfo.getDescription()); - input.setUserId(userId); - return input; - } - private ExtractorInput buildExtractorInput(MetaAction action, String source, List historyActionResults, List additionalContext) { ExtractorInput input = new ExtractorInput(); @@ -373,7 +453,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { return input; } - private CorrectorInput buildCorrectorInput(ExecutableAction executableAction, String source) { + private CorrectorInput buildCorrectorInput(ExecutableAction executableAction) { return CorrectorInput.builder() .tendency(executableAction.getTendency()) .source(executableAction.getSource()) @@ -385,5 +465,36 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { .activatedSlices(memoryCapability.getActivatedSlices()) .build(); } + + private CorrectionRecognizerInput buildRecognizerInput(ExecutableAction executableAction) { + val orderedStages = new ArrayList<>(executableAction.getActionChain().keySet()); + orderedStages.sort(Integer::compareTo); + int currentStageIndex = orderedStages.indexOf(executableAction.getExecutingStage()); + List currentStageMetaActions = executableAction.getActionChain() + .getOrDefault(executableAction.getExecutingStage(), List.of()) + .stream() + .map(metaAction -> CorrectionRecognizerMetaActionSnapshot.builder() + .key(metaAction.getKey()) + .name(metaAction.getName()) + .io(metaAction.getIo()) + .resultStatus(metaAction.getResult().getStatus().name()) + .resultData(metaAction.getResult().getData()) + .build()) + .toList(); + return CorrectionRecognizerInput.builder() + .tendency(executableAction.getTendency()) + .source(executableAction.getSource()) + .reason(executableAction.getReason()) + .description(executableAction.getDescription()) + .history(executableAction.getHistory().get(executableAction.getExecutingStage())) + .currentStageMetaActions(currentStageMetaActions) + .orderedStages(orderedStages) + .currentStage(executableAction.getExecutingStage()) + .currentStageIndex(currentStageIndex) + .lastStage(currentStageIndex == orderedStages.size() - 1) + .recentMessages(cognitionCapability.getChatMessages()) + .activatedSlices(memoryCapability.getActivatedSlices()) + .build(); + } } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerInput.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerInput.java new file mode 100644 index 00000000..29de1b0e --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerInput.java @@ -0,0 +1,27 @@ +package work.slhaf.partner.module.modules.action.executor.entity; + +import lombok.Builder; +import lombok.Data; +import work.slhaf.partner.api.chat.pojo.Message; +import work.slhaf.partner.core.memory.pojo.ActivatedMemorySlice; + +import java.util.List; + +@Data +@Builder +public class CorrectionRecognizerInput { + private String tendency; + private String source; + private String reason; + private String description; + + private List history; + private List currentStageMetaActions; + private List orderedStages; + private int currentStage; + private int currentStageIndex; + private boolean lastStage; + + private List recentMessages; + private List activatedSlices; +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerMetaActionSnapshot.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerMetaActionSnapshot.java new file mode 100644 index 00000000..a77bbf53 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerMetaActionSnapshot.java @@ -0,0 +1,14 @@ +package work.slhaf.partner.module.modules.action.executor.entity; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class CorrectionRecognizerMetaActionSnapshot { + private String key; + private String name; + private boolean io; + private String resultStatus; + private String resultData; +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerResult.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerResult.java new file mode 100644 index 00000000..e356294e --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/entity/CorrectionRecognizerResult.java @@ -0,0 +1,9 @@ +package work.slhaf.partner.module.modules.action.executor.entity; + +import lombok.Data; + +@Data +public class CorrectionRecognizerResult { + private boolean needCorrection; + private String reason; +} diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java deleted file mode 100644 index ff47c018..00000000 --- a/Partner-Core/src/test/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutorTest.java +++ /dev/null @@ -1,450 +0,0 @@ -package work.slhaf.partner.module.modules.action.dispatcher.executor; - -import com.alibaba.fastjson2.JSONObject; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import work.slhaf.partner.core.action.ActionCapability; -import work.slhaf.partner.core.action.ActionCore; -import work.slhaf.partner.core.action.entity.*; -import work.slhaf.partner.core.action.runner.RunnerClient; -import work.slhaf.partner.core.cognition.CognitionCapability; -import work.slhaf.partner.core.memory.MemoryCapability; -import work.slhaf.partner.module.modules.action.executor.ActionCorrector; -import work.slhaf.partner.module.modules.action.executor.ActionExecutor; -import work.slhaf.partner.module.modules.action.executor.ActionRepairer; -import work.slhaf.partner.module.modules.action.executor.ParamsExtractor; -import work.slhaf.partner.module.modules.action.executor.entity.CorrectorResult; -import work.slhaf.partner.module.modules.action.executor.entity.ExtractorResult; -import work.slhaf.partner.module.modules.action.executor.entity.RepairerResult; - -import java.util.*; -import java.util.concurrent.*; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; - -/** - * 测试矩阵(与文档一致): - * 1) 单行动-单阶段-单MetaAction成功(已覆盖) - * 2) 多行动并发执行(未覆盖:需并发稳定性/线程调度控制) - * 3) status 非 PREPARE 直接返回(已覆盖) - * 4) 多阶段顺序执行(已覆盖) - * 5) IO 行动使用虚拟线程池(已覆盖) - * 6) extractor 失败 -> repairer OK -> 再成功(已覆盖) - * 7) extractor 失败 -> repairer FAILED(已覆盖) - * 8) extractor 失败 -> repairer ACQUIRE 阻塞后恢复(已覆盖) - * 9) runnerClient.submit 抛异常(未覆盖:需更精细的异常吞吐与线程结束校验) - * 10) paramsExtractor.execute 抛异常(未覆盖:与 #9 类似,需更精细的异常吞吐校验) - * 11) corrector.execute 抛异常导致资源未清理(已标记已知缺陷,@Disabled) - * 12) actionChain 为空导致异常与泄漏(已标记已知缺陷,@Disabled) - * 13) metaActions 为空导致 awaitAdvance 阻塞(未覆盖:更适合集成/压测) - * 17) result 状态不更新导致循环不退出(未覆盖:更适合集成/压测) - * 18) 同 stage 多 metaAction 并发完成顺序不固定(未覆盖:更适合集成/压测) - */ -@SuppressWarnings("unused") -@Slf4j -@ExtendWith(MockitoExtension.class) -class ActionExecutorTest { - - @Mock - ActionCapability actionCapability; - @Mock - MemoryCapability memoryCapability; - @Mock - CognitionCapability cognitionCapability; - @Mock - ParamsExtractor paramsExtractor; - @Mock - ActionRepairer actionRepairer; - @Mock - ActionCorrector actionCorrector; - @Mock - RunnerClient runnerClient; - - @InjectMocks - ActionExecutor actionExecutor; - - @BeforeEach - void setUp() { - lenient().when(cognitionCapability.getChatMessages()).thenReturn(Collections.emptyList()); - lenient().when(memoryCapability.getActivatedSlices()).thenReturn(Collections.emptyList()); - lenient().when(actionCapability.putPhaserRecord(any(Phaser.class), any(ExecutableAction.class))) - .thenAnswer(inv -> new PhaserRecord(inv.getArgument(0), inv.getArgument(1))); - lenient().when(actionCapability.loadMetaActionInfo(anyString())).thenAnswer(inv -> new MetaActionInfo( - false, - null, - Collections.emptyMap(), - "desc", - Set.of(), - Set.of(), - Set.of(), - false, - new JSONObject() - )); - CorrectorResult correctorResult = new CorrectorResult(); - correctorResult.setMetaInterventionList(Collections.emptyList()); - lenient().when(actionCorrector.execute(any())).thenReturn(correctorResult); - lenient().doNothing().when(actionCapability).handleInterventions(any(), any()); - } - - // 场景1:B1 -> B3 -> B4 -> B7(成功) -> B10。目的:验证正常主路径与资源清理。 - @Test - void execute_singleAction_singleStage_success() { - ExecutorService directExecutor = new DirectExecutorService(); - stubExecutors(directExecutor, directExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - - ExtractorResult extractorResult = new ExtractorResult(); - extractorResult.setOk(true); - when(paramsExtractor.execute(any())).thenReturn(extractorResult); - doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - verify(runnerClient, times(1)).submit(any(MetaAction.class)); - verify(actionCapability, times(1)).removePhaserRecord(any(Phaser.class)); - assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus()); - assertEquals(1, actionData.getHistory().get(0).size()); - } - - // 场景3:B1 -> B2。目的:验证非 PREPARE 不执行任何子任务。 - @Test - void execute_statusNotPrepare_shouldSkip() { - ExecutorService directExecutor = new DirectExecutorService(); - stubExecutors(directExecutor, directExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - actionData.setStatus(ExecutableAction.Status.EXECUTING); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - verify(actionCapability, never()).putPhaserRecord(any(Phaser.class), any(ExecutableAction.class)); - verify(runnerClient, never()).submit(any(MetaAction.class)); - } - - // 场景4:B1 -> B3 -> B4(两轮) -> B7(成功) -> B10。目的:验证多阶段顺序执行。 - @Test - void execute_multiStage_success() { - ExecutorService platformExecutor = Executors.newFixedThreadPool(4); - ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); - stubExecutors(platformExecutor, virtualExecutor); - - Map> chain = new HashMap<>(); - chain.put(0, List.of(buildMetaAction("a1", false))); - chain.put(1, List.of(buildMetaAction("a2", false))); - ImmediateExecutableAction actionData = buildActionData(chain); - - ExtractorResult extractorResult = new ExtractorResult(); - extractorResult.setOk(true); - when(paramsExtractor.execute(any())).thenReturn(extractorResult); - doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - log.info("metaAction result:{}", metaAction.getResult().getStatus()); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - verify(runnerClient, timeout(5000).times(2)).submit(any(MetaAction.class)); - verify(actionCorrector, timeout(5000).times(2)).execute(any()); - assertEquals(2, actionData.getHistory().size()); - assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus()); - } - - // 场景5:B4.2。目的:验证 IO 行动使用虚拟线程池。 - @Test - void execute_ioMetaAction_usesVirtualExecutor() { - ExecutorService platformExecutor = Executors.newFixedThreadPool(4); - ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(true)); - - ExtractorResult extractorResult = new ExtractorResult(); - extractorResult.setOk(true); - lenient().when(paramsExtractor.execute(any())).thenReturn(extractorResult); - lenient().doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - verify(actionCapability, times(1)).getExecutor(ActionCore.ExecutorType.VIRTUAL); - shutdownExecutor(virtualExecutor); - } - - // 场景6:B7.2(失败) -> repairer OK -> B7(成功)。目的:验证修复后成功与上下文追加。 - @Test - void execute_extractorFail_thenRepairOk_thenSuccess() { - ExecutorService platformExecutor = Executors.newFixedThreadPool(4); - ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - - ExtractorResult fail = new ExtractorResult(); - fail.setOk(false); - ExtractorResult ok = new ExtractorResult(); - ok.setOk(true); - when(paramsExtractor.execute(any())).thenReturn(fail, ok); - - RepairerResult repairerResult = new RepairerResult(); - repairerResult.setStatus(RepairerResult.RepairerStatus.OK); - repairerResult.setFixedData(List.of("fix-1")); - when(actionRepairer.execute(any())).thenReturn(repairerResult); - - doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - assertEquals(1, actionData.getAdditionalContext().get(0).size()); - verify(runnerClient, timeout(5000).times(1)).submit(any(MetaAction.class)); - } - - // 场景7:B7.2(失败) -> repairer FAILED。目的:验证失败分支不提交外部执行。 - @Test - void execute_extractorFail_thenRepairFailed() { - ExecutorService platformExecutor = Executors.newFixedThreadPool(4); - ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - - ExtractorResult fail = new ExtractorResult(); - fail.setOk(false); - when(paramsExtractor.execute(any())).thenReturn(fail); - - RepairerResult repairerResult = new RepairerResult(); - repairerResult.setStatus(RepairerResult.RepairerStatus.FAILED); - when(actionRepairer.execute(any())).thenReturn(repairerResult); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - MetaAction metaAction = actionData.getActionChain().get(0).getFirst(); - assertEquals(MetaAction.Result.Status.FAILED, metaAction.getResult().getStatus()); - verify(runnerClient, never()).submit(any(MetaAction.class)); - } - - // 场景8:B7.2(ACQUIRE) -> interrupt 阻塞 -> 状态恢复 -> B7(成功)。目的:验证阻塞可恢复且不死锁。 - @Test - @Timeout(3) - void execute_extractorFail_thenAcquire_thenResume() throws Exception { - ExecutorService platformExecutor = Executors.newCachedThreadPool(); - ExecutorService virtualExecutor = Executors.newCachedThreadPool(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - - ExtractorResult fail = new ExtractorResult(); - fail.setOk(false); - ExtractorResult ok = new ExtractorResult(); - ok.setOk(true); - when(paramsExtractor.execute(any())).thenReturn(fail, ok); - - RepairerResult repairerResult = new RepairerResult(); - repairerResult.setStatus(RepairerResult.RepairerStatus.ACQUIRE); - when(actionRepairer.execute(any())).thenReturn(repairerResult); - - doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - CountDownLatch doneLatch = new CountDownLatch(1); - doAnswer(inv -> { - doneLatch.countDown(); - return null; - }).when(actionCapability).removePhaserRecord(any(Phaser.class)); - - ExecutorService resumeExecutor = Executors.newSingleThreadExecutor(); - resumeExecutor.execute(() -> { - while (actionData.getStatus() != ExecutableAction.Status.INTERRUPTED) { - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - } - } - actionData.setStatus(ExecutableAction.Status.EXECUTING); - }); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - assertTrue(doneLatch.await(2, TimeUnit.SECONDS)); - shutdownExecutor(platformExecutor); - shutdownExecutor(virtualExecutor); - shutdownExecutor(resumeExecutor); - } - - // 场景11:B4.4 异常 -> 资源未清理(已知缺陷)。目的:暴露当前行为。 - // @Disabled("known-issue: corrector 抛异常时未清理 phaser 记录") - // @Tag("known-issue") - @Test - void execute_correctorThrows_shouldLeakPhaserRecord() { - ExecutorService platformExecutor = Executors.newCachedThreadPool(); - ExecutorService virtualExecutor = Executors.newCachedThreadPool(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(singleStageChain(false)); - - ExtractorResult ok = new ExtractorResult(); - ok.setOk(true); - lenient().when(paramsExtractor.execute(any())).thenReturn(ok); - lenient().doAnswer(inv -> { - MetaAction metaAction = inv.getArgument(0); - metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS); - return null; - }).when(runnerClient).submit(any(MetaAction.class)); - - lenient().doThrow(new RuntimeException("boom")).when(actionCorrector).execute(any()); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - verify(actionCapability).removePhaserRecord(any(Phaser.class)); - } - - // 场景12:B4.1 actionChain 为空导致异常(已知缺陷)。目的:暴露当前行为。 - @Disabled("known-issue: actionChain 为空导致 IndexOutOfBounds 与资源未清理") - @Tag("known-issue") - @Test - void execute_emptyActionChain_shouldFail() { - ExecutorService platformExecutor = Executors.newCachedThreadPool(); - ExecutorService virtualExecutor = Executors.newCachedThreadPool(); - stubExecutors(platformExecutor, virtualExecutor); - - ImmediateExecutableAction actionData = buildActionData(new HashMap<>()); - - actionExecutor.init(); - actionExecutor.execute(actionData); - - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - } - - private void stubExecutors(ExecutorService platformExecutor, ExecutorService virtualExecutor) { - when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor); - when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor); - when(actionCapability.runnerClient()).thenReturn(runnerClient); - } - - private ImmediateExecutableAction buildActionData(Map> actionChain) { - val immediateActionData = new ImmediateExecutableAction( - "tendency", - actionChain, - "reason", - "desc", - "source" - ); - immediateActionData.getAdditionalContext().putAll(initAdditionalContext(actionChain)); - return immediateActionData; - } - - private Map> singleStageChain(boolean io) { - Map> chain = new HashMap<>(); - chain.put(0, List.of(buildMetaAction("a1", io))); - return chain; - } - - private MetaAction buildMetaAction(String name, boolean io) { - return new MetaAction( - name, - io, - null, - MetaAction.Type.ORIGIN, - "location" - ); - } - - private Map> initAdditionalContext(Map> actionChain) { - Map> context = new HashMap<>(); - for (Integer stage : actionChain.keySet()) { - context.put(stage, new ArrayList<>()); - } - return context; - } - - private void shutdownExecutor(ExecutorService executor) { - executor.shutdownNow(); - try { - executor.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - } - } - - private static final class DirectExecutorService extends AbstractExecutorService { - private volatile boolean shutdown; - - @Override - public void shutdown() { - shutdown = true; - } - - @Override - public List shutdownNow() { - shutdown = true; - return Collections.emptyList(); - } - - @Override - public boolean isShutdown() { - return shutdown; - } - - @Override - public boolean isTerminated() { - return shutdown; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) { - return true; - } - - @Override - public void execute(Runnable command) { - command.run(); - } - } -}