refactor(ActionExecutor): update ActionChain execution, support executing and advancing correctly

This commit is contained in:
2026-02-04 00:29:42 +08:00
parent b0bb40c5f0
commit 80d7c283c5
3 changed files with 127 additions and 51 deletions

View File

@@ -278,12 +278,12 @@ public class ActionCore extends PartnerCore<ActionCore> {
} }
// 加锁确保同步 // 加锁确保同步
synchronized (actionData) { synchronized (actionData.getStatus()) {
applyInterventions(interventions, actionData, phaser); applyInterventions(interventions, actionData);
} }
} }
private void applyInterventions(List<MetaIntervention> interventions, ActionData actionData, Phaser phaser) { private void applyInterventions(List<MetaIntervention> interventions, ActionData actionData) {
boolean rebuildCleanTag = false; boolean rebuildCleanTag = false;
interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder)); interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder));
@@ -296,7 +296,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
switch (intervention.getType()) { switch (intervention.getType()) {
case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions);
case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions);
case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions); case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions);
case InterventionType.CANCEL -> handleCancel(actionData); case InterventionType.CANCEL -> handleCancel(actionData);
case InterventionType.REBUILD -> { case InterventionType.REBUILD -> {
@@ -322,38 +322,13 @@ public class ActionCore extends PartnerCore<ActionCore> {
} }
/** /**
* 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步 * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动
*/ */
private void handleInsert(ActionData actionData, int order, List<MetaAction> actions, Phaser phaser) { private void handleInsert(ActionData actionData, int order, List<MetaAction> actions) {
if (order < actionData.getExecutingStage()) if (order < actionData.getExecutingStage())
return; return;
phaser.register(); actionData.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions);
try {
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
actionChain.put(order, actions);
if (order == actionData.getExecutingStage()) {
ExecutorService virtualExecutor = this.getExecutor(ExecutorType.VIRTUAL);
ExecutorService platformExecutor = this.getExecutor(ExecutorType.PLATFORM);
ExecutorService executor;
phaser.bulkRegister(actions.size());
for (MetaAction action : actions) {
executor = action.isIo() ? virtualExecutor : platformExecutor;
executor.execute(() -> {
try {
runnerClient.submit(action);
} finally {
phaser.arriveAndDeregister();
}
});
}
}
} finally {
phaser.arriveAndDeregister();
}
} }
private void handleDelete(ActionData actionData, int order, List<MetaAction> actions) { private void handleDelete(ActionData actionData, int order, List<MetaAction> actions) {

View File

@@ -18,8 +18,12 @@ import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
@AgentSubModule @AgentSubModule
@@ -70,35 +74,77 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} }
// 注册执行中行动 // 注册执行中行动
val phaser = new Phaser(); val phaser = new Phaser();
phaser.register();
val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData); val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData);
actionData.setStatus(ActionData.ActionStatus.EXECUTING); actionData.setStatus(ActionData.ActionStatus.EXECUTING);
// 开始执行 // 开始执行
val actionChain = actionData.getActionChain(); val stageCursor = new Object() {
int stageCount = 0; final Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
do { int stageCount;
val orderList = new ArrayList<>(actionChain.keySet()); boolean executingStageUpdated;
orderList.sort(Integer::compareTo); boolean stageCountUpdated;
val executingStage = orderList.get(stageCount);
actionData.setExecutingStage(executingStage);
val metaActions = actionChain.get(executingStage); void init() {
val phase = phaser.bulkRegister(metaActions.size()); stageCount = 0;
for (MetaAction metaAction : metaActions) { executingStageUpdated = false;
val executor = metaAction.isIo() ? virtualExecutor : platformExecutor; stageCountUpdated = false;
executor.execute(buildMataActionTask(metaAction, phaserRecord, userId)); update();
}
void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
}
boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
actionData.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
val actionChain = actionData.getActionChain();
val metaActions = actionChain.get(actionData.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, userId);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
synchronized (listeningRecord.accepting()) {
listeningRecord.accepting().set(false);
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage将执行一次阶段推进
stageCursor.requestAdvance();
} }
phaser.awaitAdvance(phase);
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
val correctorInput = assemblyHelper.buildCorrectorInput(actionData, userId); val correctorInput = assemblyHelper.buildCorrectorInput(actionData, userId);
val correctorResult = actionCorrector.execute(correctorInput); val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), phaserRecord); actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData);
} while (actionChain.size() > ++stageCount);
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
stageCursor.requestAdvance();
} while (stageCursor.next());
// 结束 // 结束
phaser.arriveAndDeregister();
actionCapability.removePhaserRecord(phaser); actionCapability.removePhaserRecord(phaser);
if (actionData.getStatus() != ActionData.ActionStatus.FAILED) { if (actionData.getStatus() != ActionData.ActionStatus.FAILED) {
actionData.setStatus(ActionStatus.SUCCESS); actionData.setStatus(ActionStatus.SUCCESS);
@@ -110,7 +156,58 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} }
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String userId) {
AtomicBoolean accepting = new AtomicBoolean(true);
AtomicInteger cursor = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
val phaser = phaserRecord.phaser();
val phase = phaser.register();
platformExecutor.execute(() -> {
boolean first = true;
while (accepting.get()) {
synchronized (accepting) {
MetaAction next = null;
synchronized (metaActions) {
if (cursor.get() < metaActions.size()) {
next = metaActions.get(cursor.getAndIncrement());
}
}
if (next == null) {
Thread.onSpinWait();
continue;
}
if (phaser.getPhase() != phase) {
metaActions.remove(next);
log.warn("行动阶段已推进,丢弃该行动: {}", next);
continue;
}
ExecutorService executor = next.isIo() ? virtualExecutor : platformExecutor;
executor.execute(buildMataActionTask(next, phaserRecord, userId));
if (first) {
phaser.arriveAndDeregister();
latch.countDown();
first = false;
}
}
}
});
try {
// 确保执行一次,防止没来得及注册任务就已经结束
latch.await();
} catch (InterruptedException ignored) {
}
return new MetaActionsListeningRecord(accepting, phase);
}
private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String userId) { private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String userId) {
val phaser = phaserRecord.phaser();
phaser.register();
return () -> { return () -> {
val actionKey = metaAction.getKey(); val actionKey = metaAction.getKey();
try { try {
@@ -158,11 +255,14 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} catch (Exception e) { } catch (Exception e) {
log.error("Action executing failed: {}", actionKey, e); log.error("Action executing failed: {}", actionKey, e);
} finally { } finally {
phaserRecord.phaser().arriveAndDeregister(); phaser.arriveAndDeregister();
} }
}; };
} }
private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) {
}
@SuppressWarnings("InnerClassMayBeStatic") @SuppressWarnings("InnerClassMayBeStatic")
private class AssemblyHelper { private class AssemblyHelper {

View File

@@ -9,6 +9,7 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.common.module.PreRunningModule; import work.slhaf.partner.module.common.module.PreRunningModule;
@@ -94,7 +95,7 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel
} }
private void handleInterventions(List<EvaluatedInterventionData> interventionDataList, Map<String, ?> interventionDataMap) { private void handleInterventions(List<EvaluatedInterventionData> interventionDataList, Map<String, ActionData> interventionDataMap) {
val executor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM); val executor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
executor.execute(() -> { executor.execute(() -> {
for (EvaluatedInterventionData interventionData : interventionDataList) { for (EvaluatedInterventionData interventionData : interventionDataList) {