diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 2f8a3d3d..2aa4f495 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -179,7 +179,7 @@ public class ActionCore implements StateSerializable { return; } // 加锁确保同步 - synchronized (executableAction.getStatus()) { + synchronized (executableAction.getExecutionLock()) { applyInterventions(interventions, executableAction); } } @@ -244,7 +244,10 @@ public class ActionCore implements StateSerializable { if (order < executableAction.getExecutingStage()) return; - executableAction.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions); + List stageActions = executableAction.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()); + synchronized (stageActions) { + stageActions.addAll(actions); + } } private void handleDelete(ExecutableAction executableAction, int order, List actions) { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt index 5dbd0eb3..896a9c14 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt @@ -84,6 +84,8 @@ sealed interface Schedulable { sealed class ExecutableAction( override val uuid: String = UUID.randomUUID().toString() ) : Action(uuid) { + val executionLock: Any = Any() + /** * 行动倾向 */ diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinInterventionActionProvider.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinInterventionActionProvider.java index 6c0ea5f5..646a38e9 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinInterventionActionProvider.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/builtin/BuiltinInterventionActionProvider.java @@ -133,27 +133,31 @@ class BuiltinInterventionActionProvider implements BuiltinActionProvider { new CommunicationBlockContent(blockName, source, BlockContent.Urgency.HIGH, CommunicationBlockContent.Projection.SUPPLY) { @Override protected void fillXml(@NotNull Document document, @NotNull Element root) { + appendTextElement(document, root, "state", "Partner needs some help."); appendTextElement(document, root, "action_id", actionId); appendTextElement(document, root, "action_info", actionInfo); appendTextElement(document, root, "demand", demand); } }, - Set.of(ContextBlock.FocusedDomain.ACTION), + Set.of(ContextBlock.FocusedDomain.COMMUNICATION), 10, 10, 20 )); - + ExecutableAction executableAction = null; try { - ExecutableAction executableAction = getExecutableAction(actionId); + executableAction = getExecutableAction(actionId); cognitionCapability.initiateTurn(input, target); boolean normal = executableAction.interrupt(timeout); - return normal ? target + "not answered" : target + "answered"; + return normal ? target + "not resumed execution in time" : target + "answered, looking for related answer in recent-chat-messages"; } catch (Exception e) { return "Error happened while calling turn: " + e.getLocalizedMessage(); } finally { contextWorkspace.expire(blockName, source); + if (executableAction != null) { + executableAction.resume(); + } } }; @@ -279,7 +283,7 @@ class BuiltinInterventionActionProvider implements BuiltinActionProvider { null, Map.of( "id", "The uuid of the Action to be intervened on.", - "type", "Intervention type. Allowed values: APPEND, INSERT, REBUILD, DELETE, CANCEL.", + "type", "Intervention type. Allowed values: APPEND, INSERT, DELETE, CANCEL.", "order", "Action chain order/stage to apply the intervention on.", "actions", "Comma-separated actionKey list to be inserted, appended, rebuilt or deleted. Example: \"builtin::command::execute, builtin::capability::show_memory_slices\"" ), diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java index 8c79e802..74768e86 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java @@ -58,10 +58,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { Set recoveredActions = new HashSet<>(); recoveredActions.addAll(actionCapability.listActions(Action.Status.EXECUTING, null)); - recoveredActions.addAll(actionCapability.listActions(Action.Status.INTERRUPTED, null).stream().map(executableAction -> { - executableAction.setStatus(Action.Status.EXECUTING); - return executableAction; - }).collect(Collectors.toSet())); + recoveredActions.addAll(actionCapability.listActions(Action.Status.INTERRUPTED, null).stream() + .peek(executableAction -> executableAction.setStatus(Action.Status.EXECUTING)) + .collect(Collectors.toSet())); recoveredActions.forEach(this::execute); blockManager.emitActionRecoveredBlock(recoveredActions); } @@ -140,112 +139,137 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { private void handleExecutableAction(ExecutableAction executableAction) { actionCapability.putAction(executableAction); - val source = executableAction.getSource(); - val status = executableAction.getStatus(); - if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) { - return; - } val actionChain = executableAction.getActionChain(); - if (actionChain.isEmpty()) { - executableAction.setStatus(Action.Status.FAILED); - executableAction.setResult("行动链为空"); + val phaser = new Phaser(); + if (!prepareExecutableAction(executableAction, actionChain)) { return; } - normalizeExecutingStage(executableAction, actionChain); - // 注册执行中行动 - val phaser = new Phaser(); - executableAction.setStatus(Action.Status.EXECUTING); blockManager.emitActionLaunchedBlock(executableAction); - // 开始执行 - val stageCursor = new Object() { - int stageCount; - boolean executingStageUpdated = false; - boolean stageCountUpdated = false; - - void init() { - val orderList = new ArrayList<>(actionChain.keySet()); - orderList.sort(Integer::compareTo); - stageCount = orderList.indexOf(executableAction.getExecutingStage()); - update(); - } - - void requestAdvance() { - if (!stageCountUpdated) { - stageCount++; - stageCountUpdated = true; - } - if (stageCount < actionChain.size() && !executingStageUpdated) { - update(); - executingStageUpdated = true; - } - } - - boolean next() { - executingStageUpdated = false; - stageCountUpdated = false; - return stageCount < actionChain.size(); - } - - void update() { - val orderList = new ArrayList<>(actionChain.keySet()); - orderList.sort(Integer::compareTo); - executableAction.setExecutingStage(orderList.get(stageCount)); - } - }; - stageCursor.init(); - do { - if (closed.get()) { + val stageCursor = initStageCursor(executableAction, actionChain); + while (true) { + val stageSelection = selectCurrentStage(executableAction, actionChain); + if (stageSelection.shouldReturn()) { return; } - val metaActions = actionChain.get(executableAction.getExecutingStage()); - val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser); - val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source); - phaser.awaitAdvance(listeningRecord.phase()); - // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 - // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 - // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 - if (closed.get()) { + if (stageSelection.shouldStop()) { + break; + } + val stageExecution = runCurrentStage(executableAction, phaser, stageCursor, stageSelection.metaActions()); + if (stageExecution.closed()) { return; } + if (!applyStageCorrectionAndAdvance(executableAction, stageCursor, stageExecution)) { + break; + } + } + finishExecutableAction(executableAction); + } + + private boolean prepareExecutableAction(ExecutableAction executableAction, Map> actionChain) { + synchronized (executableAction.getExecutionLock()) { + val status = executableAction.getStatus(); + if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) { + return false; + } + if (actionChain.isEmpty()) { + executableAction.setStatus(Action.Status.FAILED); + executableAction.setResult("行动链为空"); + return false; + } + normalizeExecutingStage(executableAction, actionChain); + executableAction.setStatus(Action.Status.EXECUTING); + return true; + } + } + + private StageCursor initStageCursor(ExecutableAction executableAction, Map> actionChain) { + StageCursor stageCursor = new StageCursor(executableAction, actionChain); + synchronized (executableAction.getExecutionLock()) { + stageCursor.init(); + } + return stageCursor; + } + + private StageSelection selectCurrentStage(ExecutableAction executableAction, Map> actionChain) { + synchronized (executableAction.getExecutionLock()) { + if (closed.get()) { + return StageSelection.returnNow(); + } + if (executableAction.getStatus() == Action.Status.FAILED) { + return StageSelection.stop(); + } + return StageSelection.continueWith(actionChain.get(executableAction.getExecutingStage())); + } + } + + private StageExecution runCurrentStage( + ExecutableAction executableAction, + Phaser phaser, + StageCursor stageCursor, + List metaActions + ) { + val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser); + val listeningRecord = executeAndListening(metaActions, phaser, executableAction); + phaser.awaitAdvance(listeningRecord.phase()); + // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 + // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 + // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 + if (closed.get()) { + return StageExecution.closed(recognizerRecord, metaActions); + } + synchronized (executableAction.getExecutionLock()) { synchronized (listeningRecord.accepting()) { listeningRecord.accepting().set(false); // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 stageCursor.requestAdvance(); } + } - blockManager.emitActionStageSettledBlock(executableAction); + blockManager.emitActionStageSettledBlock(executableAction); + return StageExecution.completed(recognizerRecord, metaActions); + } - boolean hasFailedMetaAction = hasFailedMetaAction(metaActions); - boolean shouldRunCorrector = hasFailedMetaAction; - if (!shouldRunCorrector) { - val recognizerResult = resolveRecognizerResult(recognizerRecord); - shouldRunCorrector = recognizerResult != null && recognizerResult.isNeedCorrection(); - } - if (shouldRunCorrector) { - val correctorInput = assemblyHelper.buildCorrectorInput(executableAction); - actionCorrector.execute(correctorInput) - .onSuccess(correctorResult -> { - actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); - blockManager.emitActionCorrectionBlock( - executableAction, - hasFailedMetaAction ? "has_failed_meta_action" : correctorResult.getCorrectionReason(), - correctorResult.getMetaInterventionList() - ); - }); - } - // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 - // 如果第一次已经推进完毕,本次将会跳过 + private boolean applyStageCorrectionAndAdvance( + ExecutableAction executableAction, + StageCursor stageCursor, + StageExecution stageExecution + ) { + boolean hasFailedMetaAction = hasFailedMetaAction(stageExecution.metaActions()); + boolean shouldRunCorrector = hasFailedMetaAction; + if (!shouldRunCorrector) { + val recognizerResult = resolveRecognizerResult(stageExecution.recognizerRecord()); + shouldRunCorrector = recognizerResult != null && recognizerResult.isNeedCorrection(); + } + if (shouldRunCorrector) { + val correctorInput = assemblyHelper.buildCorrectorInput(executableAction); + actionCorrector.execute(correctorInput) + .onSuccess(correctorResult -> { + actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction); + blockManager.emitActionCorrectionBlock( + executableAction, + hasFailedMetaAction ? "has_failed_meta_action" : correctorResult.getCorrectionReason(), + correctorResult.getMetaInterventionList() + ); + }); + } + // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 + // 如果第一次已经推进完毕,本次将会跳过 + synchronized (executableAction.getExecutionLock()) { stageCursor.requestAdvance(); - } while (stageCursor.next()); + return stageCursor.next(); + } + } + + private void finishExecutableAction(ExecutableAction executableAction) { // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { scheduledActionData.recordAndReset(); } } - private MetaActionsListeningRecord executeAndListening(List metaActions, Phaser phaser, ExecutableAction executableAction, String source) { + private MetaActionsListeningRecord executeAndListening(List metaActions, Phaser phaser, ExecutableAction executableAction) { AtomicBoolean accepting = new AtomicBoolean(true); AtomicInteger cursor = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(1); @@ -303,7 +327,6 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { private void executeMetaActionWithRetry(MetaAction metaAction, ExecutableAction actionData) { AtomicReference failureReason = new AtomicReference<>("参数提取失败"); - val actionKey = metaAction.getKey(); int executingStage = actionData.getExecutingStage(); boolean succeeded = false; for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) { @@ -319,9 +342,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } ExtractorInput extractorInput = extractorInputResult.getOrThrow(); - Result extractorResultWrapped = paramsExtractor.execute(extractorInput).onFailure(exp -> { - failureReason.set(exp.getLocalizedMessage()); - }); + Result extractorResultWrapped = paramsExtractor.execute(extractorInput).onFailure(exp -> failureReason.set(exp.getLocalizedMessage())); if (extractorResultWrapped.exceptionOrNull() != null) { continue; } @@ -545,6 +566,44 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { ); } + private enum StageSelectionType { + CONTINUE, + STOP, + RETURN + } + + private record StageSelection(StageSelectionType type, List metaActions) { + private static StageSelection continueWith(List metaActions) { + return new StageSelection(StageSelectionType.CONTINUE, metaActions); + } + + private static StageSelection stop() { + return new StageSelection(StageSelectionType.STOP, null); + } + + private static StageSelection returnNow() { + return new StageSelection(StageSelectionType.RETURN, null); + } + + private boolean shouldStop() { + return type == StageSelectionType.STOP; + } + + private boolean shouldReturn() { + return type == StageSelectionType.RETURN; + } + } + + private record StageExecution(RecognizerTaskRecord recognizerRecord, List metaActions, boolean closed) { + private static StageExecution completed(RecognizerTaskRecord recognizerRecord, List metaActions) { + return new StageExecution(recognizerRecord, metaActions, false); + } + + private static StageExecution closed(RecognizerTaskRecord recognizerRecord, List metaActions) { + return new StageExecution(recognizerRecord, metaActions, true); + } + } + private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) { } @@ -554,6 +613,50 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } } + private static final class StageCursor { + private final ExecutableAction executableAction; + private final Map> actionChain; + + private int stageCount; + private boolean executingStageUpdated; + private boolean stageCountUpdated; + + private StageCursor(ExecutableAction executableAction, Map> actionChain) { + this.executableAction = executableAction; + this.actionChain = actionChain; + } + + private void init() { + val orderList = new ArrayList<>(actionChain.keySet()); + orderList.sort(Integer::compareTo); + stageCount = orderList.indexOf(executableAction.getExecutingStage()); + update(); + } + + private void requestAdvance() { + if (!stageCountUpdated) { + stageCount++; + stageCountUpdated = true; + } + if (stageCount < actionChain.size() && !executingStageUpdated) { + update(); + executingStageUpdated = true; + } + } + + private boolean next() { + executingStageUpdated = false; + stageCountUpdated = false; + return stageCount < actionChain.size(); + } + + private void update() { + val orderList = new ArrayList<>(actionChain.keySet()); + orderList.sort(Integer::compareTo); + executableAction.setExecutingStage(orderList.get(stageCount)); + } + } + @SuppressWarnings("InnerClassMayBeStatic") private class AssemblyHelper { private AssemblyHelper() {