diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 22baffa4..33abe702 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -278,12 +278,12 @@ public class ActionCore extends PartnerCore { } // 加锁确保同步 - synchronized (actionData) { - applyInterventions(interventions, actionData, phaser); + synchronized (actionData.getStatus()) { + applyInterventions(interventions, actionData); } } - private void applyInterventions(List interventions, ActionData actionData, Phaser phaser) { + private void applyInterventions(List interventions, ActionData actionData) { boolean rebuildCleanTag = false; interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder)); @@ -296,7 +296,7 @@ public class ActionCore extends PartnerCore { switch (intervention.getType()) { case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); - case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); + case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions); case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions); case InterventionType.CANCEL -> handleCancel(actionData); case InterventionType.REBUILD -> { @@ -322,38 +322,13 @@ public class ActionCore extends PartnerCore { } /** - * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步 + * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动 */ - private void handleInsert(ActionData actionData, int order, List actions, Phaser phaser) { + private void handleInsert(ActionData actionData, int order, List actions) { if (order < actionData.getExecutingStage()) return; - phaser.register(); - try { - Map> actionChain = actionData.getActionChain(); - actionChain.put(order, actions); - - if (order == actionData.getExecutingStage()) { - ExecutorService virtualExecutor = this.getExecutor(ExecutorType.VIRTUAL); - ExecutorService platformExecutor = this.getExecutor(ExecutorType.PLATFORM); - ExecutorService executor; - phaser.bulkRegister(actions.size()); - - for (MetaAction action : actions) { - executor = action.isIo() ? virtualExecutor : platformExecutor; - executor.execute(() -> { - try { - runnerClient.submit(action); - } finally { - phaser.arriveAndDeregister(); - } - }); - } - - } - } finally { - phaser.arriveAndDeregister(); - } + actionData.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions); } private void handleDelete(ActionData actionData, int order, List actions) { diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java index bdd591d1..02046471 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java @@ -18,8 +18,12 @@ import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @AgentSubModule @@ -70,35 +74,77 @@ public class ActionExecutor extends AgentRunningSubModule(actionChain.keySet()); - orderList.sort(Integer::compareTo); - val executingStage = orderList.get(stageCount); - actionData.setExecutingStage(executingStage); + val stageCursor = new Object() { + final Map> actionChain = actionData.getActionChain(); + int stageCount; + boolean executingStageUpdated; + boolean stageCountUpdated; - val metaActions = actionChain.get(executingStage); - val phase = phaser.bulkRegister(metaActions.size()); - for (MetaAction metaAction : metaActions) { - val executor = metaAction.isIo() ? virtualExecutor : platformExecutor; - executor.execute(buildMataActionTask(metaAction, phaserRecord, userId)); + void init() { + stageCount = 0; + executingStageUpdated = false; + stageCountUpdated = false; + update(); + } + + void requestAdvance() { + if (!stageCountUpdated) { + stageCount++; + stageCountUpdated = true; + } + + if (stageCount < actionChain.size() && !executingStageUpdated) { + update(); + executingStageUpdated = true; + } + } + + boolean next() { + executingStageUpdated = false; + stageCountUpdated = false; + return stageCount < actionChain.size(); + } + + void update() { + val orderList = new ArrayList<>(actionChain.keySet()); + orderList.sort(Integer::compareTo); + actionData.setExecutingStage(orderList.get(stageCount)); + } + }; + + stageCursor.init(); + do { + val actionChain = actionData.getActionChain(); + val metaActions = actionChain.get(actionData.getExecutingStage()); + + val listeningRecord = executeAndListening(metaActions, phaserRecord, userId); + phaser.awaitAdvance(listeningRecord.phase()); + + // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 + // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 + // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 + synchronized (listeningRecord.accepting()) { + listeningRecord.accepting().set(false); + + // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 + stageCursor.requestAdvance(); } - phaser.awaitAdvance(phase); // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 val correctorInput = assemblyHelper.buildCorrectorInput(actionData, userId); val correctorResult = actionCorrector.execute(correctorInput); - actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), phaserRecord); - } while (actionChain.size() > ++stageCount); + actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData); + + // 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时,但 corrector 执行期间发生了 actionChain 的插入事件 + // 如果第一次已经推进完毕,本次将会跳过 + stageCursor.requestAdvance(); + } while (stageCursor.next()); // 结束 - phaser.arriveAndDeregister(); actionCapability.removePhaserRecord(phaser); if (actionData.getStatus() != ActionData.ActionStatus.FAILED) { actionData.setStatus(ActionStatus.SUCCESS); @@ -110,7 +156,58 @@ public class ActionExecutor extends AgentRunningSubModule metaActions, PhaserRecord phaserRecord, String userId) { + AtomicBoolean accepting = new AtomicBoolean(true); + AtomicInteger cursor = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(1); + val phaser = phaserRecord.phaser(); + val phase = phaser.register(); + platformExecutor.execute(() -> { + boolean first = true; + while (accepting.get()) { + synchronized (accepting) { + MetaAction next = null; + + synchronized (metaActions) { + if (cursor.get() < metaActions.size()) { + next = metaActions.get(cursor.getAndIncrement()); + } + } + + if (next == null) { + Thread.onSpinWait(); + continue; + } + + if (phaser.getPhase() != phase) { + metaActions.remove(next); + log.warn("行动阶段已推进,丢弃该行动: {}", next); + continue; + } + + ExecutorService executor = next.isIo() ? virtualExecutor : platformExecutor; + executor.execute(buildMataActionTask(next, phaserRecord, userId)); + + if (first) { + phaser.arriveAndDeregister(); + latch.countDown(); + first = false; + } + } + } + }); + try { + // 确保执行一次,防止没来得及注册任务就已经结束 + latch.await(); + } catch (InterruptedException ignored) { + } + return new MetaActionsListeningRecord(accepting, phase); + } + private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String userId) { + val phaser = phaserRecord.phaser(); + phaser.register(); return () -> { val actionKey = metaAction.getKey(); try { @@ -158,11 +255,14 @@ public class ActionExecutor extends AgentRunningSubModule interventionDataList, Map interventionDataMap) { + private void handleInterventions(List interventionDataList, Map interventionDataMap) { val executor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM); executor.execute(() -> { for (EvaluatedInterventionData interventionData : interventionDataList) {