refactor(executor): remove legacy PhaserRecord and related methods in ActionCapability

This commit is contained in:
2026-03-23 23:27:37 +08:00
parent 556b8a5348
commit b05ef8683d
4 changed files with 13 additions and 97 deletions

View File

@@ -12,7 +12,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
@Capability(value = "action") @Capability(value = "action")
public interface ActionCapability { public interface ActionCapability {
@@ -41,14 +40,6 @@ public interface ActionCapability {
ExecutorService getExecutor(ActionCore.ExecutorType type); ExecutorService getExecutor(ActionCore.ExecutorType type);
PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction);
void removePhaserRecord(Phaser phaser);
List<PhaserRecord> listPhaserRecords();
PhaserRecord getPhaserRecord(String tendency, String source);
MetaAction loadMetaAction(@NonNull String actionKey); MetaAction loadMetaAction(@NonNull String actionKey);
MetaActionInfo loadMetaActionInfo(@NonNull String actionKey); MetaActionInfo loadMetaActionInfo(@NonNull String actionKey);

View File

@@ -14,14 +14,16 @@ import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.action.entity.intervention.InterventionType; import work.slhaf.partner.core.action.entity.intervention.InterventionType;
import work.slhaf.partner.core.action.entity.intervention.MetaIntervention; import work.slhaf.partner.core.action.entity.intervention.MetaIntervention;
import work.slhaf.partner.core.action.exception.ActionDataNotFoundException;
import work.slhaf.partner.core.action.exception.MetaActionNotFoundException; import work.slhaf.partner.core.action.exception.MetaActionNotFoundException;
import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient; import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -42,7 +44,6 @@ public class ActionCore extends PartnerCore<ActionCore> {
* 已存在的行动程序,键格式为‘<MCP-ServerName>::<Tool-Name>’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息 * 已存在的行动程序,键格式为‘<MCP-ServerName>::<Tool-Name>’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息
*/ */
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
private final List<PhaserRecord> phaserRecords = new ArrayList<>();
/** /**
* 持久行动池 * 持久行动池
*/ */
@@ -284,38 +285,6 @@ public class ActionCore extends PartnerCore<ActionCore> {
return existedMetaActions; return existedMetaActions;
} }
@CapabilityMethod
public synchronized PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction) {
PhaserRecord record = new PhaserRecord(phaser, executableAction);
phaserRecords.add(record);
return record;
}
@CapabilityMethod
public synchronized void removePhaserRecord(Phaser phaser) {
PhaserRecord remove = null;
for (PhaserRecord record : phaserRecords) {
if (record.phaser().equals(phaser)) {
remove = record;
}
}
if (remove != null) {
phaserRecords.remove(remove);
}
}
@CapabilityMethod
public PhaserRecord getPhaserRecord(String tendency, String source) {
for (PhaserRecord record : phaserRecords) {
ExecutableAction data = record.executableAction();
if (data.getTendency().equals(tendency) && data.getSource().equals(source)) {
return record;
}
}
throw new ActionDataNotFoundException("未找到对应的 Phaser 记录: tendency=" + tendency + ", source=" + source);
}
@CapabilityMethod @CapabilityMethod
public MetaAction loadMetaAction(@NonNull String actionKey) { public MetaAction loadMetaAction(@NonNull String actionKey) {
MetaActionInfo metaActionInfo = existedMetaActions.get(actionKey); MetaActionInfo metaActionInfo = existedMetaActions.get(actionKey);
@@ -341,11 +310,6 @@ public class ActionCore extends PartnerCore<ActionCore> {
); );
} }
@CapabilityMethod
public List<PhaserRecord> listPhaserRecords() {
return phaserRecords;
}
@CapabilityMethod @CapabilityMethod
public MetaActionInfo loadMetaActionInfo(@NonNull String actionKey) { public MetaActionInfo loadMetaActionInfo(@NonNull String actionKey) {
MetaActionInfo info = existedMetaActions.get(actionKey); MetaActionInfo info = existedMetaActions.get(actionKey);

View File

@@ -1,33 +0,0 @@
package work.slhaf.partner.core.action.entity;
import work.slhaf.partner.core.action.entity.Action.Status;
import java.util.concurrent.Phaser;
public record PhaserRecord(Phaser phaser, ExecutableAction executableAction) {
public void fail() {
executableAction.setStatus(Status.FAILED);
}
/**
* 负责将 ActionData 的状态设置为 INTERRUPTED
* 同时循环检查进行阻塞
*/
public void interrupt() {
executableAction.setStatus(Status.INTERRUPTED);
while (executableAction().getStatus() == Status.INTERRUPTED) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
}
}
/**
* 将状态重新设置为 EXECUTING ,恢复 interrupt 阻塞状态
*/
public void complete() {
executableAction().setStatus(Status.EXECUTING);
}
}

View File

@@ -116,7 +116,6 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
} }
// 注册执行中行动 // 注册执行中行动
val phaser = new Phaser(); val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction);
executableAction.setStatus(Action.Status.EXECUTING); executableAction.setStatus(Action.Status.EXECUTING);
// 开始执行 // 开始执行
val stageCursor = new Object() { val stageCursor = new Object() {
@@ -157,8 +156,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
stageCursor.init(); stageCursor.init();
do { do {
val metaActions = actionChain.get(executableAction.getExecutingStage()); val metaActions = actionChain.get(executableAction.getExecutingStage());
val recognizerRecord = startRecognizerIfNeeded(executableAction, phaserRecord); val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser);
val listeningRecord = executeAndListening(metaActions, phaserRecord, source); val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source);
phaser.awaitAdvance(listeningRecord.phase()); phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
@@ -185,19 +184,16 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
// 如果第一次已经推进完毕,本次将会跳过 // 如果第一次已经推进完毕,本次将会跳过
stageCursor.requestAdvance(); stageCursor.requestAdvance();
} while (stageCursor.next()); } while (stageCursor.next());
// 结束
actionCapability.removePhaserRecord(phaser);
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
scheduledActionData.recordAndReset(); scheduledActionData.recordAndReset();
} }
} }
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String source) { private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, Phaser phaser, ExecutableAction executableAction, String source) {
AtomicBoolean accepting = new AtomicBoolean(true); AtomicBoolean accepting = new AtomicBoolean(true);
AtomicInteger cursor = new AtomicInteger(); AtomicInteger cursor = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
val phaser = phaserRecord.phaser();
val phase = phaser.register(); val phase = phaser.register();
platformExecutor.execute(() -> { platformExecutor.execute(() -> {
boolean first = true; boolean first = true;
@@ -219,7 +215,7 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
continue; continue;
} }
ExecutorService executor = next.getIo() ? virtualExecutor : platformExecutor; ExecutorService executor = next.getIo() ? virtualExecutor : platformExecutor;
executor.execute(buildMataActionTask(next, phaserRecord, source)); executor.execute(buildMataActionTask(next, phaser, executableAction, source));
if (first) { if (first) {
phaser.arriveAndDeregister(); phaser.arriveAndDeregister();
latch.countDown(); latch.countDown();
@@ -236,13 +232,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
return new MetaActionsListeningRecord(accepting, phase); return new MetaActionsListeningRecord(accepting, phase);
} }
private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String source) { private Runnable buildMataActionTask(MetaAction metaAction, Phaser phaser, ExecutableAction executableAction, String source) {
val phaser = phaserRecord.phaser();
phaser.register(); phaser.register();
return () -> { return () -> {
val actionKey = metaAction.getKey(); val actionKey = metaAction.getKey();
try { try {
executeMetaActionWithRetry(metaAction, phaserRecord, source); executeMetaActionWithRetry(metaAction, executableAction, source);
} catch (Exception e) { } catch (Exception e) {
log.error("Action executing failed: {}", actionKey, e); log.error("Action executing failed: {}", actionKey, e);
} finally { } finally {
@@ -251,9 +246,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
}; };
} }
private void executeMetaActionWithRetry(MetaAction metaAction, PhaserRecord phaserRecord, String source) { private void executeMetaActionWithRetry(MetaAction metaAction, ExecutableAction actionData, String source) {
String failureReason = "参数提取失败"; String failureReason = "参数提取失败";
val actionData = phaserRecord.executableAction();
val actionKey = metaAction.getKey(); val actionKey = metaAction.getKey();
for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) { for (int attempt = 1; attempt <= MAX_EXTRACTOR_ATTEMPTS; attempt++) {
val result = metaAction.getResult(); val result = metaAction.getResult();
@@ -302,12 +296,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
metaAction.getResult().setData(failureReason); metaAction.getResult().setData(failureReason);
} }
private RecognizerTaskRecord startRecognizerIfNeeded(ExecutableAction executableAction, PhaserRecord phaserRecord) { private RecognizerTaskRecord startRecognizerIfNeeded(ExecutableAction executableAction, Phaser phaser) {
if (!shouldRunCorrectionRecognizer(executableAction)) { if (!shouldRunCorrectionRecognizer(executableAction)) {
return RecognizerTaskRecord.disabled(); return RecognizerTaskRecord.disabled();
} }
val recognizerInput = assemblyHelper.buildRecognizerInput(executableAction); val recognizerInput = assemblyHelper.buildRecognizerInput(executableAction);
val task = buildRecognizerTask(recognizerInput, phaserRecord.phaser()); val task = buildRecognizerTask(recognizerInput, phaser);
Future<CorrectionRecognizerResult> future = virtualExecutor.submit(task); Future<CorrectionRecognizerResult> future = virtualExecutor.submit(task);
return new RecognizerTaskRecord(true, future); return new RecognizerTaskRecord(true, future);
} }