refactor(action): use synchronized lock to prevent concurrent problems in executable actions executing

This commit is contained in:
2026-04-17 18:05:04 +08:00
parent 3fd90c0f5b
commit 96301dc64a
4 changed files with 209 additions and 97 deletions

View File

@@ -179,7 +179,7 @@ public class ActionCore implements StateSerializable {
return;
}
// 加锁确保同步
synchronized (executableAction.getStatus()) {
synchronized (executableAction.getExecutionLock()) {
applyInterventions(interventions, executableAction);
}
}
@@ -244,7 +244,10 @@ public class ActionCore implements StateSerializable {
if (order < executableAction.getExecutingStage())
return;
executableAction.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions);
List<MetaAction> stageActions = executableAction.getActionChain().computeIfAbsent(order, k -> new ArrayList<>());
synchronized (stageActions) {
stageActions.addAll(actions);
}
}
private void handleDelete(ExecutableAction executableAction, int order, List<MetaAction> actions) {

View File

@@ -84,6 +84,8 @@ sealed interface Schedulable {
sealed class ExecutableAction(
override val uuid: String = UUID.randomUUID().toString()
) : Action(uuid) {
val executionLock: Any = Any()
/**
* 行动倾向
*/

View File

@@ -133,27 +133,31 @@ class BuiltinInterventionActionProvider implements BuiltinActionProvider {
new CommunicationBlockContent(blockName, source, BlockContent.Urgency.HIGH, CommunicationBlockContent.Projection.SUPPLY) {
@Override
protected void fillXml(@NotNull Document document, @NotNull Element root) {
appendTextElement(document, root, "state", "Partner needs some help.");
appendTextElement(document, root, "action_id", actionId);
appendTextElement(document, root, "action_info", actionInfo);
appendTextElement(document, root, "demand", demand);
}
},
Set.of(ContextBlock.FocusedDomain.ACTION),
Set.of(ContextBlock.FocusedDomain.COMMUNICATION),
10,
10,
20
));
ExecutableAction executableAction = null;
try {
ExecutableAction executableAction = getExecutableAction(actionId);
executableAction = getExecutableAction(actionId);
cognitionCapability.initiateTurn(input, target);
boolean normal = executableAction.interrupt(timeout);
return normal ? target + "not answered" : target + "answered";
return normal ? target + "not resumed execution in time" : target + "answered, looking for related answer in recent-chat-messages";
} catch (Exception e) {
return "Error happened while calling turn: " + e.getLocalizedMessage();
} finally {
contextWorkspace.expire(blockName, source);
if (executableAction != null) {
executableAction.resume();
}
}
};
@@ -279,7 +283,7 @@ class BuiltinInterventionActionProvider implements BuiltinActionProvider {
null,
Map.of(
"id", "The uuid of the Action to be intervened on.",
"type", "Intervention type. Allowed values: APPEND, INSERT, REBUILD, DELETE, CANCEL.",
"type", "Intervention type. Allowed values: APPEND, INSERT, DELETE, CANCEL.",
"order", "Action chain order/stage to apply the intervention on.",
"actions", "Comma-separated actionKey list to be inserted, appended, rebuilt or deleted. Example: \"builtin::command::execute, builtin::capability::show_memory_slices\""
),

View File

@@ -58,10 +58,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
Set<ExecutableAction> recoveredActions = new HashSet<>();
recoveredActions.addAll(actionCapability.listActions(Action.Status.EXECUTING, null));
recoveredActions.addAll(actionCapability.listActions(Action.Status.INTERRUPTED, null).stream().map(executableAction -> {
executableAction.setStatus(Action.Status.EXECUTING);
return executableAction;
}).collect(Collectors.toSet()));
recoveredActions.addAll(actionCapability.listActions(Action.Status.INTERRUPTED, null).stream()
.peek(executableAction -> executableAction.setStatus(Action.Status.EXECUTING))
.collect(Collectors.toSet()));
recoveredActions.forEach(this::execute);
blockManager.emitActionRecoveredBlock(recoveredActions);
}
@@ -140,112 +139,137 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
private void handleExecutableAction(ExecutableAction executableAction) {
actionCapability.putAction(executableAction);
val source = executableAction.getSource();
val status = executableAction.getStatus();
if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) {
return;
}
val actionChain = executableAction.getActionChain();
if (actionChain.isEmpty()) {
executableAction.setStatus(Action.Status.FAILED);
executableAction.setResult("行动链为空");
val phaser = new Phaser();
if (!prepareExecutableAction(executableAction, actionChain)) {
return;
}
normalizeExecutingStage(executableAction, actionChain);
// 注册执行中行动
val phaser = new Phaser();
executableAction.setStatus(Action.Status.EXECUTING);
blockManager.emitActionLaunchedBlock(executableAction);
// 开始执行
val stageCursor = new Object() {
int stageCount;
boolean executingStageUpdated = false;
boolean stageCountUpdated = false;
void init() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
stageCount = orderList.indexOf(executableAction.getExecutingStage());
update();
}
void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
}
boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
executableAction.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
if (closed.get()) {
val stageCursor = initStageCursor(executableAction, actionChain);
while (true) {
val stageSelection = selectCurrentStage(executableAction, actionChain);
if (stageSelection.shouldReturn()) {
return;
}
val metaActions = actionChain.get(executableAction.getExecutingStage());
val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser);
val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
if (closed.get()) {
if (stageSelection.shouldStop()) {
break;
}
val stageExecution = runCurrentStage(executableAction, phaser, stageCursor, stageSelection.metaActions());
if (stageExecution.closed()) {
return;
}
if (!applyStageCorrectionAndAdvance(executableAction, stageCursor, stageExecution)) {
break;
}
}
finishExecutableAction(executableAction);
}
private boolean prepareExecutableAction(ExecutableAction executableAction, Map<Integer, List<MetaAction>> actionChain) {
synchronized (executableAction.getExecutionLock()) {
val status = executableAction.getStatus();
if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) {
return false;
}
if (actionChain.isEmpty()) {
executableAction.setStatus(Action.Status.FAILED);
executableAction.setResult("行动链为空");
return false;
}
normalizeExecutingStage(executableAction, actionChain);
executableAction.setStatus(Action.Status.EXECUTING);
return true;
}
}
private StageCursor initStageCursor(ExecutableAction executableAction, Map<Integer, List<MetaAction>> actionChain) {
StageCursor stageCursor = new StageCursor(executableAction, actionChain);
synchronized (executableAction.getExecutionLock()) {
stageCursor.init();
}
return stageCursor;
}
private StageSelection selectCurrentStage(ExecutableAction executableAction, Map<Integer, List<MetaAction>> actionChain) {
synchronized (executableAction.getExecutionLock()) {
if (closed.get()) {
return StageSelection.returnNow();
}
if (executableAction.getStatus() == Action.Status.FAILED) {
return StageSelection.stop();
}
return StageSelection.continueWith(actionChain.get(executableAction.getExecutingStage()));
}
}
private StageExecution runCurrentStage(
ExecutableAction executableAction,
Phaser phaser,
StageCursor stageCursor,
List<MetaAction> metaActions
) {
val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser);
val listeningRecord = executeAndListening(metaActions, phaser, executableAction);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
if (closed.get()) {
return StageExecution.closed(recognizerRecord, metaActions);
}
synchronized (executableAction.getExecutionLock()) {
synchronized (listeningRecord.accepting()) {
listeningRecord.accepting().set(false);
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage将执行一次阶段推进
stageCursor.requestAdvance();
}
}
blockManager.emitActionStageSettledBlock(executableAction);
blockManager.emitActionStageSettledBlock(executableAction);
return StageExecution.completed(recognizerRecord, metaActions);
}
boolean hasFailedMetaAction = hasFailedMetaAction(metaActions);
boolean shouldRunCorrector = hasFailedMetaAction;
if (!shouldRunCorrector) {
val recognizerResult = resolveRecognizerResult(recognizerRecord);
shouldRunCorrector = recognizerResult != null && recognizerResult.isNeedCorrection();
}
if (shouldRunCorrector) {
val correctorInput = assemblyHelper.buildCorrectorInput(executableAction);
actionCorrector.execute(correctorInput)
.onSuccess(correctorResult -> {
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction);
blockManager.emitActionCorrectionBlock(
executableAction,
hasFailedMetaAction ? "has_failed_meta_action" : correctorResult.getCorrectionReason(),
correctorResult.getMetaInterventionList()
);
});
}
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
private boolean applyStageCorrectionAndAdvance(
ExecutableAction executableAction,
StageCursor stageCursor,
StageExecution stageExecution
) {
boolean hasFailedMetaAction = hasFailedMetaAction(stageExecution.metaActions());
boolean shouldRunCorrector = hasFailedMetaAction;
if (!shouldRunCorrector) {
val recognizerResult = resolveRecognizerResult(stageExecution.recognizerRecord());
shouldRunCorrector = recognizerResult != null && recognizerResult.isNeedCorrection();
}
if (shouldRunCorrector) {
val correctorInput = assemblyHelper.buildCorrectorInput(executableAction);
actionCorrector.execute(correctorInput)
.onSuccess(correctorResult -> {
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction);
blockManager.emitActionCorrectionBlock(
executableAction,
hasFailedMetaAction ? "has_failed_meta_action" : correctorResult.getCorrectionReason(),
correctorResult.getMetaInterventionList()
);
});
}
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
synchronized (executableAction.getExecutionLock()) {
stageCursor.requestAdvance();
} while (stageCursor.next());
return stageCursor.next();
}
}
private void finishExecutableAction(ExecutableAction executableAction) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
scheduledActionData.recordAndReset();
}
}
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, Phaser phaser, ExecutableAction executableAction, String source) {
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, Phaser phaser, ExecutableAction executableAction) {
AtomicBoolean accepting = new AtomicBoolean(true);
AtomicInteger cursor = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
@@ -303,7 +327,6 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
private void executeMetaActionWithRetry(MetaAction metaAction, ExecutableAction actionData) {
AtomicReference<String> failureReason = new AtomicReference<>("参数提取失败");
val actionKey = metaAction.getKey();
int executingStage = actionData.getExecutingStage();
boolean succeeded = false;
for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) {
@@ -319,9 +342,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
}
ExtractorInput extractorInput = extractorInputResult.getOrThrow();
Result<ExtractorResult> extractorResultWrapped = paramsExtractor.execute(extractorInput).onFailure(exp -> {
failureReason.set(exp.getLocalizedMessage());
});
Result<ExtractorResult> extractorResultWrapped = paramsExtractor.execute(extractorInput).onFailure(exp -> failureReason.set(exp.getLocalizedMessage()));
if (extractorResultWrapped.exceptionOrNull() != null) {
continue;
}
@@ -545,6 +566,44 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
);
}
private enum StageSelectionType {
CONTINUE,
STOP,
RETURN
}
private record StageSelection(StageSelectionType type, List<MetaAction> metaActions) {
private static StageSelection continueWith(List<MetaAction> metaActions) {
return new StageSelection(StageSelectionType.CONTINUE, metaActions);
}
private static StageSelection stop() {
return new StageSelection(StageSelectionType.STOP, null);
}
private static StageSelection returnNow() {
return new StageSelection(StageSelectionType.RETURN, null);
}
private boolean shouldStop() {
return type == StageSelectionType.STOP;
}
private boolean shouldReturn() {
return type == StageSelectionType.RETURN;
}
}
private record StageExecution(RecognizerTaskRecord recognizerRecord, List<MetaAction> metaActions, boolean closed) {
private static StageExecution completed(RecognizerTaskRecord recognizerRecord, List<MetaAction> metaActions) {
return new StageExecution(recognizerRecord, metaActions, false);
}
private static StageExecution closed(RecognizerTaskRecord recognizerRecord, List<MetaAction> metaActions) {
return new StageExecution(recognizerRecord, metaActions, true);
}
}
private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) {
}
@@ -554,6 +613,50 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
}
}
private static final class StageCursor {
private final ExecutableAction executableAction;
private final Map<Integer, List<MetaAction>> actionChain;
private int stageCount;
private boolean executingStageUpdated;
private boolean stageCountUpdated;
private StageCursor(ExecutableAction executableAction, Map<Integer, List<MetaAction>> actionChain) {
this.executableAction = executableAction;
this.actionChain = actionChain;
}
private void init() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
stageCount = orderList.indexOf(executableAction.getExecutingStage());
update();
}
private void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
}
private boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
private void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
executableAction.setExecutingStage(orderList.get(stageCount));
}
}
@SuppressWarnings("InnerClassMayBeStatic")
private class AssemblyHelper {
private AssemblyHelper() {