refactor(ActionExecutor): remove userId from ActionExecutorInput and use source

This commit is contained in:
2026-02-08 11:29:36 +08:00
parent 9f724cee5d
commit fe5a366527
4 changed files with 15 additions and 21 deletions

View File

@@ -58,7 +58,6 @@ public class ActionDispatcher extends PostRunningModule {
} }
} }
val actionExecutorInput = ActionExecutorInput.builder() val actionExecutorInput = ActionExecutorInput.builder()
.userId(userId)
.actions(immediateActions) .actions(immediateActions)
.build(); .build();
actionExecutor.execute(actionExecutorInput); actionExecutor.execute(actionExecutorInput);

View File

@@ -64,11 +64,11 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
@Override @Override
public Void execute(ActionExecutorInput input) { public Void execute(ActionExecutorInput input) {
val actions = input.getActions(); val actions = input.getActions();
val userId = input.getUserId();
// 异步执行所有行动 // 异步执行所有行动
for (ActionData actionData : actions) { for (ActionData actionData : actions) {
platformExecutor.execute(() -> { platformExecutor.execute(() -> {
if (actionData.getStatus() != ActionData.ActionStatus.PREPARE) { val source = actionData.getSource();
if (actionData.getStatus() != ActionStatus.PREPARE) {
return; return;
} }
val actionChain = actionData.getActionChain(); val actionChain = actionData.getActionChain();
@@ -80,7 +80,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
// 注册执行中行动 // 注册执行中行动
val phaser = new Phaser(); val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData); val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData);
actionData.setStatus(ActionData.ActionStatus.EXECUTING); actionData.setStatus(ActionStatus.EXECUTING);
// 开始执行 // 开始执行
val stageCursor = new Object() { val stageCursor = new Object() {
@@ -124,7 +124,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
do { do {
val metaActions = actionChain.get(actionData.getExecutingStage()); val metaActions = actionChain.get(actionData.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, userId); val listeningRecord = executeAndListening(metaActions, phaserRecord, source);
phaser.awaitAdvance(listeningRecord.phase()); phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
@@ -140,7 +140,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
try { try {
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容 // 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
// 如果后续运行 corrector 触发频率较高,可考虑增加重试机制 // 如果后续运行 corrector 触发频率较高,可考虑增加重试机制
val correctorInput = assemblyHelper.buildCorrectorInput(actionData, userId); val correctorInput = assemblyHelper.buildCorrectorInput(actionData, source);
val correctorResult = actionCorrector.execute(correctorInput); val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData); actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData);
} catch (Exception ignored) { } catch (Exception ignored) {
@@ -153,7 +153,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
// 结束 // 结束
actionCapability.removePhaserRecord(phaser); actionCapability.removePhaserRecord(phaser);
if (actionData.getStatus() != ActionData.ActionStatus.FAILED) { if (actionData.getStatus() != ActionStatus.FAILED) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (actionData instanceof ScheduledActionData scheduledActionData) { if (actionData instanceof ScheduledActionData scheduledActionData) {
scheduledActionData.recordAndReset(); scheduledActionData.recordAndReset();
@@ -168,7 +168,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} }
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String userId) { private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String source) {
AtomicBoolean accepting = new AtomicBoolean(true); AtomicBoolean accepting = new AtomicBoolean(true);
AtomicInteger cursor = new AtomicInteger(); AtomicInteger cursor = new AtomicInteger();
@@ -199,7 +199,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} }
ExecutorService executor = next.isIo() ? virtualExecutor : platformExecutor; ExecutorService executor = next.isIo() ? virtualExecutor : platformExecutor;
executor.execute(buildMataActionTask(next, phaserRecord, userId)); executor.execute(buildMataActionTask(next, phaserRecord, source));
if (first) { if (first) {
phaser.arriveAndDeregister(); phaser.arriveAndDeregister();
@@ -217,7 +217,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
return new MetaActionsListeningRecord(accepting, phase); return new MetaActionsListeningRecord(accepting, phase);
} }
private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String userId) { private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String source) {
val phaser = phaserRecord.phaser(); val phaser = phaserRecord.phaser();
phaser.register(); phaser.register();
return () -> { return () -> {
@@ -229,7 +229,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
val executingStage = actionData.getExecutingStage(); val executingStage = actionData.getExecutingStage();
val historyActionResults = actionData.getHistory().get(executingStage); val historyActionResults = actionData.getHistory().get(executingStage);
val additionalContext = actionData.getAdditionalContext().get(executingStage); val additionalContext = actionData.getAdditionalContext().get(executingStage);
val extractorInput = assemblyHelper.buildExtractorInput(metaAction, userId, historyActionResults, additionalContext); val extractorInput = assemblyHelper.buildExtractorInput(metaAction, source, historyActionResults, additionalContext);
val extractorResult = paramsExtractor.execute(extractorInput); val extractorResult = paramsExtractor.execute(extractorInput);
if (extractorResult.isOk()) { if (extractorResult.isOk()) {
@@ -240,7 +240,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
.computeIfAbsent(executingStage, integer -> new ArrayList<>()) .computeIfAbsent(executingStage, integer -> new ArrayList<>())
.add(historyAction); .add(historyAction);
} else { } else {
val repairerInput = assemblyHelper.buildRepairerInput(historyActionResults, metaAction, userId); val repairerInput = assemblyHelper.buildRepairerInput(historyActionResults, metaAction, source);
val repairerResult = actionRepairer.execute(repairerInput); val repairerResult = actionRepairer.execute(repairerInput);
switch (repairerResult.getStatus()) { switch (repairerResult.getStatus()) {
// 如果本次修复被认为成功,则将补充的信息添加至 additionalContext // 如果本次修复被认为成功,则将补充的信息添加至 additionalContext
@@ -289,10 +289,10 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
return input; return input;
} }
private ExtractorInput buildExtractorInput(MetaAction action, String userId, List<HistoryAction> historyActionResults, private ExtractorInput buildExtractorInput(MetaAction action, String source, List<HistoryAction> historyActionResults,
List<String> additionalContext) { List<String> additionalContext) {
ExtractorInput input = new ExtractorInput(); ExtractorInput input = new ExtractorInput();
input.setEvaluatedSlices(memoryCapability.getActivatedSlices(userId)); input.setEvaluatedSlices(memoryCapability.getActivatedSlices(source));
input.setRecentMessages(cognationCapability.getChatMessages()); input.setRecentMessages(cognationCapability.getChatMessages());
input.setMetaActionInfo(actionCapability.loadMetaActionInfo(action.getKey())); input.setMetaActionInfo(actionCapability.loadMetaActionInfo(action.getKey()));
input.setHistoryActionResults(historyActionResults); input.setHistoryActionResults(historyActionResults);
@@ -300,7 +300,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
return input; return input;
} }
private CorrectorInput buildCorrectorInput(ActionData actionData, String userId) { private CorrectorInput buildCorrectorInput(ActionData actionData, String source) {
return CorrectorInput.builder() return CorrectorInput.builder()
.tendency(actionData.getTendency()) .tendency(actionData.getTendency())
.source(actionData.getSource()) .source(actionData.getSource())
@@ -309,7 +309,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
.history(actionData.getHistory().get(actionData.getExecutingStage())) .history(actionData.getHistory().get(actionData.getExecutingStage()))
.status(actionData.getStatus()) .status(actionData.getStatus())
.recentMessages(cognationCapability.getChatMessages()) .recentMessages(cognationCapability.getChatMessages())
.activatedSlices(memoryCapability.getActivatedSlices(userId)) .activatedSlices(memoryCapability.getActivatedSlices(source))
.build(); .build();
} }
} }

View File

@@ -9,10 +9,6 @@ import java.util.Set;
@Data @Data
@Builder @Builder
public class ActionExecutorInput { public class ActionExecutorInput {
/**
* 用户ID
*/
private String userId;
/** /**
* 将执行的行动数据列表 * 将执行的行动数据列表
*/ */

View File

@@ -370,7 +370,6 @@ class ActionExecutorTest {
private ActionExecutorInput buildInput(String userId, ImmediateActionData actionData) { private ActionExecutorInput buildInput(String userId, ImmediateActionData actionData) {
return ActionExecutorInput.builder() return ActionExecutorInput.builder()
.userId(userId)
.actions(Set.of(actionData)) .actions(Set.of(actionData))
.build(); .build();
} }