diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java index 8f7c0643..aa36ef71 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Phaser; @Capability(value = "action") public interface ActionCapability { @@ -41,14 +40,6 @@ public interface ActionCapability { ExecutorService getExecutor(ActionCore.ExecutorType type); - PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction); - - void removePhaserRecord(Phaser phaser); - - List listPhaserRecords(); - - PhaserRecord getPhaserRecord(String tendency, String source); - MetaAction loadMetaAction(@NonNull String actionKey); MetaActionInfo loadMetaActionInfo(@NonNull String actionKey); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 753e5f91..bd76504c 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -14,14 +14,16 @@ import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; import work.slhaf.partner.core.action.entity.intervention.InterventionType; import work.slhaf.partner.core.action.entity.intervention.MetaIntervention; -import work.slhaf.partner.core.action.exception.ActionDataNotFoundException; import work.slhaf.partner.core.action.exception.MetaActionNotFoundException; import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.core.action.runner.SandboxRunnerClient; import java.io.IOException; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -42,7 +44,6 @@ public class ActionCore extends PartnerCore { * 已存在的行动程序,键格式为‘::’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息 */ private final ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); - private final List phaserRecords = new ArrayList<>(); /** * 持久行动池 */ @@ -284,38 +285,6 @@ public class ActionCore extends PartnerCore { return existedMetaActions; } - @CapabilityMethod - public synchronized PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction) { - PhaserRecord record = new PhaserRecord(phaser, executableAction); - phaserRecords.add(record); - return record; - } - - @CapabilityMethod - public synchronized void removePhaserRecord(Phaser phaser) { - PhaserRecord remove = null; - for (PhaserRecord record : phaserRecords) { - if (record.phaser().equals(phaser)) { - remove = record; - } - } - - if (remove != null) { - phaserRecords.remove(remove); - } - } - - @CapabilityMethod - public PhaserRecord getPhaserRecord(String tendency, String source) { - for (PhaserRecord record : phaserRecords) { - ExecutableAction data = record.executableAction(); - if (data.getTendency().equals(tendency) && data.getSource().equals(source)) { - return record; - } - } - throw new ActionDataNotFoundException("未找到对应的 Phaser 记录: tendency=" + tendency + ", source=" + source); - } - @CapabilityMethod public MetaAction loadMetaAction(@NonNull String actionKey) { MetaActionInfo metaActionInfo = existedMetaActions.get(actionKey); @@ -341,11 +310,6 @@ public class ActionCore extends PartnerCore { ); } - @CapabilityMethod - public List listPhaserRecords() { - return phaserRecords; - } - @CapabilityMethod public MetaActionInfo loadMetaActionInfo(@NonNull String actionKey) { MetaActionInfo info = existedMetaActions.get(actionKey); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java deleted file mode 100644 index 6b2bcc8c..00000000 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java +++ /dev/null @@ -1,33 +0,0 @@ -package work.slhaf.partner.core.action.entity; - -import work.slhaf.partner.core.action.entity.Action.Status; - -import java.util.concurrent.Phaser; - -public record PhaserRecord(Phaser phaser, ExecutableAction executableAction) { - - public void fail() { - executableAction.setStatus(Status.FAILED); - } - - /** - * 负责将 ActionData 的状态设置为 INTERRUPTED - * 同时循环检查进行阻塞 - */ - public void interrupt() { - executableAction.setStatus(Status.INTERRUPTED); - while (executableAction().getStatus() == Status.INTERRUPTED) { - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - } - } - - /** - * 将状态重新设置为 EXECUTING ,恢复 interrupt 阻塞状态 - */ - public void complete() { - executableAction().setStatus(Status.EXECUTING); - } -} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java index d6ba8375..1fe9ad18 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java @@ -116,7 +116,6 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } // 注册执行中行动 val phaser = new Phaser(); - val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction); executableAction.setStatus(Action.Status.EXECUTING); // 开始执行 val stageCursor = new Object() { @@ -157,8 +156,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { stageCursor.init(); do { val metaActions = actionChain.get(executableAction.getExecutingStage()); - val recognizerRecord = startRecognizerIfNeeded(executableAction, phaserRecord); - val listeningRecord = executeAndListening(metaActions, phaserRecord, source); + val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser); + val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source); phaser.awaitAdvance(listeningRecord.phase()); // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 @@ -185,19 +184,16 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { // 如果第一次已经推进完毕,本次将会跳过 stageCursor.requestAdvance(); } while (stageCursor.next()); - // 结束 - actionCapability.removePhaserRecord(phaser); // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { scheduledActionData.recordAndReset(); } } - private MetaActionsListeningRecord executeAndListening(List metaActions, PhaserRecord phaserRecord, String source) { + private MetaActionsListeningRecord executeAndListening(List metaActions, Phaser phaser, ExecutableAction executableAction, String source) { AtomicBoolean accepting = new AtomicBoolean(true); AtomicInteger cursor = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(1); - val phaser = phaserRecord.phaser(); val phase = phaser.register(); platformExecutor.execute(() -> { boolean first = true; @@ -219,7 +215,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { continue; } ExecutorService executor = next.getIo() ? virtualExecutor : platformExecutor; - executor.execute(buildMataActionTask(next, phaserRecord, source)); + executor.execute(buildMataActionTask(next, phaser, executableAction, source)); if (first) { phaser.arriveAndDeregister(); latch.countDown(); @@ -236,13 +232,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { return new MetaActionsListeningRecord(accepting, phase); } - private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String source) { - val phaser = phaserRecord.phaser(); + private Runnable buildMataActionTask(MetaAction metaAction, Phaser phaser, ExecutableAction executableAction, String source) { phaser.register(); return () -> { val actionKey = metaAction.getKey(); try { - executeMetaActionWithRetry(metaAction, phaserRecord, source); + executeMetaActionWithRetry(metaAction, executableAction, source); } catch (Exception e) { log.error("Action executing failed: {}", actionKey, e); } finally { @@ -251,9 +246,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { }; } - private void executeMetaActionWithRetry(MetaAction metaAction, PhaserRecord phaserRecord, String source) { + private void executeMetaActionWithRetry(MetaAction metaAction, ExecutableAction actionData, String source) { String failureReason = "参数提取失败"; - val actionData = phaserRecord.executableAction(); val actionKey = metaAction.getKey(); for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) { val result = metaAction.getResult(); @@ -302,12 +296,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { metaAction.getResult().setData(failureReason); } - private RecognizerTaskRecord startRecognizerIfNeeded(ExecutableAction executableAction, PhaserRecord phaserRecord) { + private RecognizerTaskRecord startRecognizerIfNeeded(ExecutableAction executableAction, Phaser phaser) { if (!shouldRunCorrectionRecognizer(executableAction)) { return RecognizerTaskRecord.disabled(); } val recognizerInput = assemblyHelper.buildRecognizerInput(executableAction); - val task = buildRecognizerTask(recognizerInput, phaserRecord.phaser()); + val task = buildRecognizerTask(recognizerInput, phaser); Future future = virtualExecutor.submit(task); return new RecognizerTaskRecord(true, future); }