From 77eb9b92a434ba88da7deab26cb7a452e651c259 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Fri, 30 Jan 2026 20:10:01 +0800 Subject: [PATCH] refactor(ActionCorrector): move intervention logic from InterventionHandler into ActionCapability --- .../partner/core/action/ActionCapability.java | 3 + .../slhaf/partner/core/action/ActionCore.java | 127 +++++++++++++++ .../handler/InterventionHandler.java | 147 +----------------- 3 files changed, 132 insertions(+), 145 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java index 743baea0..f7955c14 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCapability.java @@ -8,6 +8,7 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.PhaserRecord; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.runner.SandboxRunnerClient; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import java.util.List; import java.util.Map; @@ -49,4 +50,6 @@ public interface ActionCapability { boolean checkExists(String... actionKeys); SandboxRunnerClient runnerClient(); + + void handleInterventions(List interventions, T data); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 9b6397db..22baffa4 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -14,6 +14,8 @@ import work.slhaf.partner.core.action.exception.ActionDataNotFoundException; import work.slhaf.partner.core.action.exception.MetaActionNotFoundException; import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.core.action.runner.SandboxRunnerClient; +import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import java.io.IOException; import java.util.*; @@ -259,6 +261,131 @@ public class ActionCore extends PartnerCore { return runnerClient; } + @CapabilityMethod + public void handleInterventions(List interventions, T data) { + // 加载数据 + Phaser phaser = null; + ActionData actionData = switch (data) { + case PhaserRecord record -> { + phaser = record.phaser(); + yield record.actionData(); + } + case ActionData tempData -> tempData; + default -> null; + }; + if (actionData == null) { + return; + } + + // 加锁确保同步 + synchronized (actionData) { + applyInterventions(interventions, actionData, phaser); + } + } + + private void applyInterventions(List interventions, ActionData actionData, Phaser phaser) { + boolean rebuildCleanTag = false; + + interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder)); + + for (MetaIntervention intervention : interventions) { + List actions = intervention.getActions() + .stream() + .map(this::loadMetaAction) + .toList(); + + switch (intervention.getType()) { + case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); + case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); + case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions); + case InterventionType.CANCEL -> handleCancel(actionData); + case InterventionType.REBUILD -> { + if (!rebuildCleanTag) { + cleanActionData(actionData); + rebuildCleanTag = true; + } + handleRebuild(actionData, intervention.getOrder(), actions); + } + } + } + + } + + /** + * 在未进入执行阶段的行动单元组新增新的行动 + */ + private void handleAppend(ActionData actionData, int order, List actions) { + if (order <= actionData.getExecutingStage()) + return; + + actionData.getActionChain().put(order, actions); + } + + /** + * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步 + */ + private void handleInsert(ActionData actionData, int order, List actions, Phaser phaser) { + if (order < actionData.getExecutingStage()) + return; + + phaser.register(); + try { + Map> actionChain = actionData.getActionChain(); + actionChain.put(order, actions); + + if (order == actionData.getExecutingStage()) { + ExecutorService virtualExecutor = this.getExecutor(ExecutorType.VIRTUAL); + ExecutorService platformExecutor = this.getExecutor(ExecutorType.PLATFORM); + ExecutorService executor; + phaser.bulkRegister(actions.size()); + + for (MetaAction action : actions) { + executor = action.isIo() ? virtualExecutor : platformExecutor; + executor.execute(() -> { + try { + runnerClient.submit(action); + } finally { + phaser.arriveAndDeregister(); + } + }); + } + + } + } finally { + phaser.arriveAndDeregister(); + } + } + + private void handleDelete(ActionData actionData, int order, List actions) { + if (order <= actionData.getExecutingStage()) + return; + + Map> actionChain = actionData.getActionChain(); + if (actionChain.containsKey(order)) { + actionChain.get(order).removeAll(actions); + if (actionChain.get(order).isEmpty()) { + actionChain.remove(order); + } + } + } + + private void handleCancel(ActionData actionData) { + actionData.setStatus(ActionData.ActionStatus.FAILED); + actionData.setResult("行动取消"); + } + + private void handleRebuild(ActionData actionData, int order, List actions) { + Map> actionChain = actionData.getActionChain(); + actionChain.put(order, actions); + } + + private void cleanActionData(ActionData actionData) { + actionData.getActionChain().clear(); + actionData.setExecutingStage(0); + actionData.setStatus(ActionData.ActionStatus.PREPARE); + actionData.getHistory().clear(); + } + /** * 命中缓存且评估通过时 * diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java index 2973de07..dad7402b 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java @@ -3,27 +3,17 @@ package work.slhaf.partner.module.modules.action.interventor.handler; import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; -import work.slhaf.partner.api.agent.factory.module.annotation.Init; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore.ExecutorType; -import work.slhaf.partner.core.action.entity.ActionData; -import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; -import work.slhaf.partner.core.action.entity.MetaAction; -import work.slhaf.partner.core.action.entity.PhaserRecord; -import work.slhaf.partner.core.action.runner.SandboxRunnerClient; -import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType; import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.ExecutingInterventionData; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.InterventionData; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.PreparedInterventionData; -import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Phaser; @Slf4j @AgentSubModule @@ -32,13 +22,6 @@ public class InterventionHandler extends AgentRunningSubModule interventions = interventionData.getInterventions(); if (interventionData instanceof ExecutingInterventionData data) { - handleInterventions(interventions, data.getRecord()); + actionCapability.handleInterventions(interventions, data.getRecord()); } else if (interventionData instanceof PreparedInterventionData data) { - handleInterventions(interventions, data.getActionData()); + actionCapability.handleInterventions(interventions, data.getActionData()); } }); }); } - - private void handleInterventions(List interventions, T data) { - // 加载数据 - Phaser phaser = null; - ActionData actionData = switch (data) { - case PhaserRecord record -> { - phaser = record.phaser(); - yield record.actionData(); - } - case ActionData tempData -> tempData; - default -> null; - }; - if (actionData == null) { - return; - } - - // 加锁确保同步 - synchronized (actionData) { - applyInterventions(interventions, actionData, phaser); - } - } - - private void applyInterventions(List interventions, ActionData actionData, Phaser phaser) { - boolean rebuildCleanTag = false; - - interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder)); - - for (MetaIntervention intervention : interventions) { - List actions = intervention.getActions() - .stream() - .map(actionKey -> actionCapability.loadMetaAction(actionKey)) - .toList(); - - // TODO 需要将干预逻辑下放至 ActionCapability ,因为 ActionExecutor 中也存在干预操作 - switch (intervention.getType()) { - case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); - case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); - case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions); - case InterventionType.CANCEL -> handleCancel(actionData); - case InterventionType.REBUILD -> { - if (!rebuildCleanTag) { - cleanActionData(actionData); - rebuildCleanTag = true; - } - handleRebuild(actionData, intervention.getOrder(), actions); - } - } - } - - } - - /** - * 在未进入执行阶段的行动单元组新增新的行动 - */ - private void handleAppend(ActionData actionData, int order, List actions) { - if (order <= actionData.getExecutingStage()) - return; - - actionData.getActionChain().put(order, actions); - } - - /** - * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步 - */ - private void handleInsert(ActionData actionData, int order, List actions, Phaser phaser) { - if (order < actionData.getExecutingStage()) - return; - - phaser.register(); - try { - Map> actionChain = actionData.getActionChain(); - actionChain.put(order, actions); - - if (order == actionData.getExecutingStage()) { - ExecutorService virtualExecutor = actionCapability.getExecutor(ExecutorType.VIRTUAL); - ExecutorService platformExecutor = actionCapability.getExecutor(ExecutorType.PLATFORM); - ExecutorService executor; - phaser.bulkRegister(actions.size()); - - for (MetaAction action : actions) { - executor = action.isIo() ? virtualExecutor : platformExecutor; - executor.execute(() -> { - try { - runnerClient.submit(action); - } finally { - phaser.arriveAndDeregister(); - } - }); - } - - } - } finally { - phaser.arriveAndDeregister(); - } - } - - private void handleDelete(ActionData actionData, int order, List actions) { - if (order <= actionData.getExecutingStage()) - return; - - Map> actionChain = actionData.getActionChain(); - if (actionChain.containsKey(order)) { - actionChain.get(order).removeAll(actions); - if (actionChain.get(order).isEmpty()) { - actionChain.remove(order); - } - } - } - - private void handleCancel(ActionData actionData) { - actionData.setStatus(ActionStatus.FAILED); - actionData.setResult("行动取消"); - } - - private void handleRebuild(ActionData actionData, int order, List actions) { - Map> actionChain = actionData.getActionChain(); - actionChain.put(order, actions); - } - - private void cleanActionData(ActionData actionData) { - actionData.getActionChain().clear(); - actionData.setExecutingStage(0); - actionData.setStatus(ActionStatus.PREPARE); - actionData.getHistory().clear(); - } - }