refactor(action): support executing any type of Actions with virtual thread in ActionExecutor, ActionScheduler will send actionData to executor directly

This commit is contained in:
2026-03-07 15:05:51 +08:00
parent 4ee7a52f42
commit 2baa3971a8
4 changed files with 126 additions and 136 deletions

View File

@@ -50,104 +50,121 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
runnerClient = actionCapability.runnerClient();
}
/**
* 执行行动
*
* @param input ActionExecutor 输入内容
*/
public void execute(ActionExecutorInput input) {
val actions = input.getActions();
// 异步执行所有行动
for (ExecutableAction executableAction : actions) {
platformExecutor.execute(() -> {
val source = executableAction.getSource();
if (executableAction.getStatus() != Action.Status.PREPARE) {
return;
public void execute(Action action) {
virtualExecutor.execute(actionExecutionRouter(action));
}
private Runnable actionExecutionRouter(Action action) {
return () -> {
try {
switch (action) {
case ExecutableAction executableAction -> handleExecutableAction(executableAction);
case StateAction stateAction -> handleStateAction(stateAction);
default -> handleUnknownAction(action);
}
val actionChain = executableAction.getActionChain();
if (actionChain.isEmpty()) {
executableAction.setStatus(Action.Status.FAILED);
executableAction.setResult("行动链为空");
return;
} catch (Exception e) {
log.warn("Action execute failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage());
action.setStatus(Action.Status.FAILED);
}
};
}
private void handleUnknownAction(Action action) {
log.warn("unknown Action type: {}", action.getClass().getSimpleName());
action.setStatus(Action.Status.FAILED);
}
private void handleStateAction(StateAction stateAction) {
stateAction.getTrigger().onTrigger();
}
private void handleExecutableAction(ExecutableAction executableAction) {
val source = executableAction.getSource();
if (executableAction.getStatus() != Action.Status.PREPARE) {
return;
}
val actionChain = executableAction.getActionChain();
if (actionChain.isEmpty()) {
executableAction.setStatus(Action.Status.FAILED);
executableAction.setResult("行动链为空");
return;
}
// 注册执行中行动
val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction);
executableAction.setStatus(Action.Status.EXECUTING);
// 开始执行
val stageCursor = new Object() {
int stageCount;
boolean executingStageUpdated;
boolean stageCountUpdated;
void init() {
stageCount = 0;
executingStageUpdated = false;
stageCountUpdated = false;
update();
}
void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
// 注册执行中行动
val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction);
executableAction.setStatus(Action.Status.EXECUTING);
// 开始执行
val stageCursor = new Object() {
int stageCount;
boolean executingStageUpdated;
boolean stageCountUpdated;
void init() {
stageCount = 0;
executingStageUpdated = false;
stageCountUpdated = false;
update();
}
void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
}
boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
executableAction.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
val metaActions = actionChain.get(executableAction.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, source);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
synchronized (listeningRecord.accepting()) {
listeningRecord.accepting().set(false);
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage将执行一次阶段推进
stageCursor.requestAdvance();
}
try {
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
// 如果后续运行 corrector 触发频率较高,可考虑增加重试机制
val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source);
val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction);
} catch (Exception ignored) {
}
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
stageCursor.requestAdvance();
} while (stageCursor.next());
// 结束
actionCapability.removePhaserRecord(phaser);
if (executableAction.getStatus() != Action.Status.FAILED) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
scheduledActionData.recordAndReset();
actionScheduler.schedule(Set.of(scheduledActionData));
} else {
executableAction.setStatus(Action.Status.SUCCESS);
}
// TODO 执行过后需要回写至任务上下文recentCompletedTask同时触发自对话信号进行确认并记录以及是否通知用户触发与否需要机制进行匹配在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等)
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
});
}
boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
executableAction.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
val metaActions = actionChain.get(executableAction.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, source);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
synchronized (listeningRecord.accepting()) {
listeningRecord.accepting().set(false);
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage将执行一次阶段推进
stageCursor.requestAdvance();
}
try {
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
// 如果后续运行 corrector 触发频率较高,可考虑增加重试机制
val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source);
val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction);
} catch (Exception ignored) {
}
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
stageCursor.requestAdvance();
} while (stageCursor.next());
// 结束
actionCapability.removePhaserRecord(phaser);
if (executableAction.getStatus() != Action.Status.FAILED) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
scheduledActionData.recordAndReset();
actionScheduler.schedule(Set.of(scheduledActionData));
} else {
executableAction.setStatus(Action.Status.SUCCESS);
}
// TODO 执行过后需要回写至任务上下文recentCompletedTask同时触发自对话信号进行确认并记录以及是否通知用户触发与否需要机制进行匹配在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等)
}
}

View File

@@ -17,7 +17,6 @@ import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.perceive.PerceiveCapability;
import work.slhaf.partner.module.common.module.PreRunningAbstractAgentModuleAbstract;
import work.slhaf.partner.module.modules.action.executor.ActionExecutor;
import work.slhaf.partner.module.modules.action.executor.entity.ActionExecutorInput;
import work.slhaf.partner.module.modules.action.planner.confirmer.ActionConfirmer;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult;
@@ -154,8 +153,7 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract {
// execute or schedule it immediately
switch (executableAction) {
case SchedulableExecutableAction action -> actionScheduler.schedule(Set.of(action));
case ImmediateExecutableAction action ->
actionExecutor.execute(new ActionExecutorInput(Set.of(action)));
case ImmediateExecutableAction action -> actionExecutor.execute(action);
default -> log.error("unknown executable action type: {}", executableAction.getClass().getSimpleName());
}
}

View File

@@ -16,12 +16,10 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod
import work.slhaf.partner.api.agent.factory.component.annotation.Init
import work.slhaf.partner.api.agent.factory.component.annotation.InjectModule
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.ActionCore
import work.slhaf.partner.core.action.entity.Action
import work.slhaf.partner.core.action.entity.Schedulable
import work.slhaf.partner.core.action.entity.SchedulableExecutableAction
import work.slhaf.partner.core.action.entity.StateAction
import work.slhaf.partner.module.modules.action.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.executor.entity.ActionExecutorInput
import java.io.Closeable
import java.time.Duration
import java.time.ZonedDateTime
@@ -54,17 +52,8 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
.collect(Collectors.toSet())
}
val onTrigger: (Set<Schedulable>) -> Unit = { schedulableSet ->
val executableActions = mutableSetOf<SchedulableExecutableAction>()
val stateActions = mutableSetOf<StateAction>()
for (schedulable in schedulableSet) {
when (schedulable) {
is SchedulableExecutableAction -> executableActions.add(schedulable)
is StateAction -> stateActions.add(schedulable)
}
}
actionExecutor.execute(ActionExecutorInput(executableActions))
actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)
.execute { stateActions.forEach { it.trigger.onTrigger() } }
schedulableSet.filterIsInstance<Action>()
.forEach { actionExecutor.execute(it) }
}
timeWheel = TimeWheel(listScheduledActions, onTrigger)
}