推进行动执行模块: 调整了 ActionExecutor 以支持行动链动态修复和参数提取; 完善了 ActionRepairer、ParamsExtractor 的主要逻辑; 完善了部分数据类的内容

- 在 ActionData 中新增 additionalContext 用于存储各个执行阶段临时修复生成的上下文,同样以执行阶段为键
- 调整 ActionExecutor 的输入参数,可传入用户标识,用于执行器调用 ActionRepairer 的修复过程
- 完善了 ActionExecutor 中行动单元的执行与修复逻辑,将支持正常状态推进执行、触发自对话时阻塞当前行动单元、所有修复方式失败时将整个行动数据标为 FAILED
- 完善了 ActionExecutor 中各个DTO的构建方法
- 完善了 ParamsExtractor 中的参数提取逻辑
- 在 PhaserRecord 中新增 interrupt 和 complete 方法,将用于后续行动单元的阻塞(ActionExecutor中)与恢复(InterventionHandler中)
- 完善了 ActionRepairer 中的修复逻辑,但自对话通道的暴露方式、DynamicActionGenerator 的具体逻辑待完善
This commit is contained in:
2025-12-05 21:58:21 +08:00
parent ad973d4230
commit 6a351413a1
15 changed files with 477 additions and 87 deletions

View File

@@ -34,7 +34,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
/** /**
* 持久行动池以用户id为键存储所有状态的任务 * 持久行动池以用户id为键存储所有状态的任务
*/ */
private HashMap<String, List<ActionData>> actionPool = new HashMap<>();//TODO 考虑是否取消用户分池 private HashMap<String, List<ActionData>> actionPool = new HashMap<>();// TODO 考虑是否取消用户分池
/** /**
* 待确认任务以userId区分不同用户因为需要跨请求确认 * 待确认任务以userId区分不同用户因为需要跨请求确认
@@ -48,7 +48,8 @@ public class ActionCore extends PartnerCore<ActionCore> {
private final Lock cacheLock = new ReentrantLock(); private final Lock cacheLock = new ReentrantLock();
private final ExecutorService platformExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final ExecutorService platformExecutor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
/** /**
@@ -129,10 +130,11 @@ public class ActionCore extends PartnerCore<ActionCore> {
return null; return null;
} }
VectorClient vectorClient = VectorClient.INSTANCE; VectorClient vectorClient = VectorClient.INSTANCE;
//计算本次输入的向量 // 计算本次输入的向量
float[] vector = vectorClient.compute(input); float[] vector = vectorClient.compute(input);
if (vector == null) return null; if (vector == null)
//与现有缓存比对,将匹配到的收集并返回 return null;
// 与现有缓存比对,将匹配到的收集并返回
return actionCache.parallelStream() return actionCache.parallelStream()
.filter(ActionCacheData::isActivated) .filter(ActionCacheData::isActivated)
.filter(data -> { .filter(data -> {
@@ -257,10 +259,10 @@ public class ActionCore extends PartnerCore<ActionCore> {
* @param inputVector 本次输入内容的语义向量 * @param inputVector 本次输入内容的语义向量
* @param vectorClient 向量客户端 * @param vectorClient 向量客户端
*/ */
private void adjustMatchAndPassed(List<CacheAdjustMetaData> matchAndPassed, float[] inputVector, String private void adjustMatchAndPassed(List<CacheAdjustMetaData> matchAndPassed, float[] inputVector, String input,
input, VectorClient vectorClient) { VectorClient vectorClient) {
matchAndPassed.forEach(adjustData -> { matchAndPassed.forEach(adjustData -> {
//获取原始缓存条目 // 获取原始缓存条目
String tendency = adjustData.getTendency(); String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency); ActionCacheData primaryCacheData = selectCacheData(tendency);
if (primaryCacheData == null) { if (primaryCacheData == null) {
@@ -279,7 +281,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
private void adjustMatchNotPassed(List<CacheAdjustMetaData> matchNotPassed, VectorClient vectorClient) { private void adjustMatchNotPassed(List<CacheAdjustMetaData> matchNotPassed, VectorClient vectorClient) {
List<ActionCacheData> toRemove = new ArrayList<>(); List<ActionCacheData> toRemove = new ArrayList<>();
matchNotPassed.forEach(adjustData -> { matchNotPassed.forEach(adjustData -> {
//获取原始缓存条目 // 获取原始缓存条目
String tendency = adjustData.getTendency(); String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency); ActionCacheData primaryCacheData = selectCacheData(tendency);
if (primaryCacheData == null) { if (primaryCacheData == null) {
@@ -299,13 +301,13 @@ public class ActionCore extends PartnerCore<ActionCore> {
/** /**
* 针对未命中但评估通过的缓存做出调整: * 针对未命中但评估通过的缓存做出调整:
* <ol> * <ol>
* <h3>如果存在缓存条目</h3> * <h3>如果存在缓存条目</h3>
* <li> * <li>
* 若已生效,但此时未匹配到则说明尚未生效或者阈值、向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均 * 若已生效,但此时未匹配到则说明尚未生效或者阈值、向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均
* </li> * </li>
* <li> * <li>
* 若未生效,则只增加计数并带权移动平均 * 若未生效,则只增加计数并带权移动平均
* </li> * </li>
* </ol> * </ol>
* 如果不存在缓存条目,则新增并填充字段 * 如果不存在缓存条目,则新增并填充字段
* *
@@ -314,10 +316,10 @@ public class ActionCore extends PartnerCore<ActionCore> {
* @param input 本次输入内容 * @param input 本次输入内容
* @param vectorClient 向量客户端 * @param vectorClient 向量客户端
*/ */
private void adjustNotMatchPassed(List<CacheAdjustMetaData> notMatchPassed, float[] inputVector, String private void adjustNotMatchPassed(List<CacheAdjustMetaData> notMatchPassed, float[] inputVector, String input,
input, VectorClient vectorClient) { VectorClient vectorClient) {
notMatchPassed.forEach(adjustData -> { notMatchPassed.forEach(adjustData -> {
//获取原始缓存条目 // 获取原始缓存条目
String tendency = adjustData.getTendency(); String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency); ActionCacheData primaryCacheData = selectCacheData(tendency);
float[] tendencyVector = vectorClient.compute(tendency); float[] tendencyVector = vectorClient.compute(tendency);

View File

@@ -7,15 +7,15 @@ import java.nio.file.Path;
/** /**
* 基于 Http 与 WebSocket 的沙盒执行器客户端,负责: * 基于 Http 与 WebSocket 的沙盒执行器客户端,负责:
* <ul> * <ul>
* <li> * <li>
* 发送行动单元数据 * 发送行动单元数据
* </li> * </li>
* <li> * <li>
* 实时更新获取已存在行动列表 * 实时更新获取已存在行动列表
* </li> * </li>
* <li> * <li>
* 向传入的 MetaAction 回写执行结果 * 向传入的 MetaAction 回写执行结果
* </li> * </li>
* </ul> * </ul>
*/ */
class SandboxRunnerClient { class SandboxRunnerClient {

View File

@@ -38,7 +38,10 @@ public abstract class ActionData {
*/ */
protected String result; protected String result;
protected List<JSONObject> history = new ArrayList<>(); protected List<JSONObject> history = new ArrayList<>();
/**
* 修复上下文
*/
protected Map<Integer, List<String>> additionalContext;
/** /**
* 行动原因 * 行动原因
*/ */

View File

@@ -1,15 +1,15 @@
package work.slhaf.partner.core.action.entity; package work.slhaf.partner.core.action.entity;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
public record PhaserRecord(Phaser phaser, ActionData actionData) { public record PhaserRecord(Phaser phaser, ActionData actionData) {
public void fail() { public void fail() {
// TODO Auto-generated method stub actionData.setStatus(ActionStatus.FAILED);
throw new UnsupportedOperationException("Unimplemented method 'fail'");
} }
public void interrupt() { public void interrupt() {
} }

View File

@@ -11,6 +11,7 @@ import work.slhaf.partner.core.action.entity.ImmediateActionData;
import work.slhaf.partner.core.action.entity.ScheduledActionData; import work.slhaf.partner.core.action.entity.ScheduledActionData;
import work.slhaf.partner.module.common.module.PostRunningModule; import work.slhaf.partner.module.common.module.PostRunningModule;
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor; import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput;
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler; import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
@@ -30,6 +31,7 @@ public class ActionDispatcher extends PostRunningModule {
private ActionScheduler actionScheduler; private ActionScheduler actionScheduler;
private ExecutorService executor; private ExecutorService executor;
private final AssemblyHelper assemblyHelper = new AssemblyHelper();
@Init @Init
public void init() { public void init() {
@@ -38,13 +40,14 @@ public class ActionDispatcher extends PostRunningModule {
@Override @Override
public void doExecute(PartnerRunningFlowContext context) { public void doExecute(PartnerRunningFlowContext context) {
//只需要处理prepared action因为pending action在用户确认后也将变为prepared action // 只需要处理prepared action因为pending action在用户确认后也将变为prepared action
//将PLANNING action放入时间轮中IMMEDIATE action直接进入并发执行流 // 将PLANNING action放入时间轮中IMMEDIATE action直接进入并发执行流
//对于将触发的PLANNING action理想做法是将执行工具做成执行链的形式模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力避免绑定固定流程 // 对于将触发的PLANNING
// action理想做法是将执行工具做成执行链的形式模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力避免绑定固定流程
executor.execute(() -> { executor.execute(() -> {
String userId = context.getUserId(); String userId = context.getUserId();
List<ActionData> preparedActions = actionCapability.listPreparedAction(userId); List<ActionData> preparedActions = actionCapability.listPreparedAction(userId);
//分类成PLANNING和IMMEDIATE两类 // 分类成PLANNING和IMMEDIATE两类
List<ScheduledActionData> scheduledActions = new ArrayList<>(); List<ScheduledActionData> scheduledActions = new ArrayList<>();
List<ImmediateActionData> immediateActions = new ArrayList<>(); List<ImmediateActionData> immediateActions = new ArrayList<>();
for (ActionData preparedAction : preparedActions) { for (ActionData preparedAction : preparedActions) {
@@ -54,7 +57,7 @@ public class ActionDispatcher extends PostRunningModule {
immediateActions.add(actionInfo); immediateActions.add(actionInfo);
} }
} }
actionExecutor.execute(immediateActions); actionExecutor.execute(assemblyHelper.buildExecutorInput(immediateActions, userId));
actionScheduler.execute(scheduledActions); actionScheduler.execute(scheduledActions);
}); });
} }
@@ -64,4 +67,15 @@ public class ActionDispatcher extends PostRunningModule {
return false; return false;
} }
@SuppressWarnings("InnerClassMayBeStatic")
private class AssemblyHelper {
public ActionExecutorInput buildExecutorInput(List<ImmediateActionData> immediateActions, String userId) {
ActionExecutorInput input = new ActionExecutorInput();
input.setImmediateActions(immediateActions);
input.setUserId(userId);
return input;
}
}
} }

View File

@@ -8,12 +8,13 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.ImmediateActionData; import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.action.entity.PhaserRecord; import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus;
import work.slhaf.partner.module.modules.action.dispatcher.executor.exception.ActionExecutingFailedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -23,10 +24,14 @@ import java.util.concurrent.Phaser;
@Slf4j @Slf4j
@AgentSubModule @AgentSubModule
public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionData>, Void> { public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, Void> {
@InjectCapability @InjectCapability
private ActionCapability actionCapability; private ActionCapability actionCapability;
@InjectCapability
private MemoryCapability memoryCapability;
@InjectCapability
private CognationCapability cognationCapability;
@InjectModule @InjectModule
private ParamsExtractor paramsExtractor; private ParamsExtractor paramsExtractor;
@@ -47,9 +52,14 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
} }
@Override @Override
public Void execute(List<ImmediateActionData> immediateActions) { public Void execute(ActionExecutorInput input) {
List<ImmediateActionData> immediateActions = input.getImmediateActions();
String userId = input.getUserId();
for (ImmediateActionData actionData : immediateActions) { for (ImmediateActionData actionData : immediateActions) {
virtualExecutor.execute(() -> { virtualExecutor.execute(() -> {
if (actionData.getStatus() != ActionData.ActionStatus.PREPARE) {
return;
}
actionData.setStatus(ActionData.ActionStatus.EXECUTING); actionData.setStatus(ActionData.ActionStatus.EXECUTING);
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain(); Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
List<MetaAction> virtual = new ArrayList<>(); List<MetaAction> virtual = new ArrayList<>();
@@ -61,6 +71,11 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
orderList.sort(Integer::compareTo); orderList.sort(Integer::compareTo);
try { try {
for (Integer order : orderList) { for (Integer order : orderList) {
if (actionData.getStatus().equals(ActionStatus.FAILED)) {
// 此时已经在 PhaserRecord 调用的 fail 方法中调整了 ActionData 的状态标记
// 跳出循环后仍将正常执行 phaserRecord 的移除操作
break;
}
List<MetaAction> metaActions = actionChain.get(order); List<MetaAction> metaActions = actionChain.get(order);
for (MetaAction metaAction : metaActions) { for (MetaAction metaAction : metaActions) {
// 根据io类型放入合适的列表 // 根据io类型放入合适的列表
@@ -71,42 +86,46 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
} }
} }
// 使用phaser来承担同组的动态任务新增 // 使用phaser来承担同组的动态任务新增
runGroupAction(virtual, virtualExecutor, phaserRecord); runGroupAction(virtual, userId, actionData, virtualExecutor, phaserRecord);
runGroupAction(platform, platformExecutor, phaserRecord); runGroupAction(platform, userId, actionData, platformExecutor, phaserRecord);
phaser.arriveAndAwaitAdvance(); phaser.arriveAndAwaitAdvance();
virtual.clear(); virtual.clear();
platform.clear(); platform.clear();
// 进行行动链修正
CorrectorInput correctorInput = assemblyHelper.buildCorrectorInput();
actionCorrector.execute(correctorInput);
} }
} finally { } finally {
CorrectorInput correctorInput = assemblyHelper.buildCorrectorInput();
actionCorrector.execute(correctorInput);
phaser.arriveAndDeregister(); phaser.arriveAndDeregister();
actionCapability.removePhaserRecord(phaser); actionCapability.removePhaserRecord(phaser);
if (actionData.getStatus() != ActionData.ActionStatus.FAILED) {
actionData.setStatus(ActionStatus.SUCCESS);
}
} }
}); });
} }
return null; return null;
} }
private void runGroupAction(List<MetaAction> actions, ExecutorService executor, PhaserRecord phaserRecord) { private void runGroupAction(List<MetaAction> actions, String userId, ActionData actionData,
ExecutorService executor,
PhaserRecord phaserRecord) {
Phaser phaser = phaserRecord.phaser(); Phaser phaser = phaserRecord.phaser();
phaser.bulkRegister(actions.size()); phaser.bulkRegister(actions.size());
//不可替换为增强for因为单组的行动单元集合数量是可以被外部干预的 // 不可替换为增强for因为单组的行动单元集合数量是可以被外部干预的
//noinspection ForLoopReplaceableByForEach // noinspection ForLoopReplaceableByForEach
for (int i = 0; i < actions.size(); i++) { for (int i = 0; i < actions.size(); i++) {
MetaAction action = actions.get(i); MetaAction action = actions.get(i);
executor.execute(() -> { executor.execute(() -> {
try { try {
ExtractorInput extractorInput = assemblyHelper.buildExtractorInput();
ExtractorResult extractorResult = paramsExtractor.execute(extractorInput);
// 两个循环需考虑最大次数,但为了达到最好融合,次数累计作用于 ActionRepairer 的修复策略选择上更合适 // 两个循环需考虑最大次数,但为了达到最好融合,次数累计作用于 ActionRepairer 的修复策略选择上更合适
if (!extractorResult.isOk()) { // 修复的最终结果是 action 的参数补充完整,然后能够继续行动链
// 修复的最终结果是 action 的参数补充完整,然后能够继续行动链 // 如果无法补充,则该行动行动阶段可能确实有误,实际上应当在 actionRepairer 内部进行处理(行动链调整、自对话或请求用户进行干预)
// 如果无法补充,则该行动行动阶段可能确实有误,实际上应当在 actionRepairer 内部进行处理(行动链调整、自对话或请求用户进行干预) // 所以无法补充时,行动链所属行动数据的状态需要置为 Interrupted ,等待状态变更,同时使用 Phaser 暂停(阻塞)当前行动链执行过程
// 所以无法补充时,行动链所属行动数据的状态需要置为 Interrupted ,等待状态变更,同时使用 Phaser 暂停(阻塞)当前行动链执行过程 // 这个功能应该交给 PhaserRecord 实现,尽量确保功能一致性
// 这个功能应该交给 PhaserRecord 实现,尽量确保功能一致性 setActionParams(action, phaserRecord, userId);
repairActionParams(action, phaserRecord);
}
do { do {
actionCapability.execute(action); actionCapability.execute(action);
MetaAction.Result result = action.getResult(); MetaAction.Result result = action.getResult();
@@ -116,36 +135,66 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
// 若使用Phaser作为执行线程与反思、求助等调用流程的同步协调应当需要额外维护Phaser全局字段获取到反思结果或者用户反馈后 // 若使用Phaser作为执行线程与反思、求助等调用流程的同步协调应当需要额外维护Phaser全局字段获取到反思结果或者用户反馈后
// 调用对应的phaser注册任务在ActionExecutor中动态添加任务至actionChain,同时启动异步执行 // 调用对应的phaser注册任务在ActionExecutor中动态添加任务至actionChain,同时启动异步执行
// 而且由于执行与放入的为同一个MetaAction对象所以执行结果可被当前行动链获取但virtual、executor两个列表似乎不行需要重构执行模式建议将行动链直接重构为LinkedHashMaporder为键 // 而且由于执行与放入的为同一个MetaAction对象所以执行结果可被当前行动链获取但virtual、executor两个列表似乎不行需要重构执行模式建议将行动链直接重构为LinkedHashMaporder为键
repairActionParams(action, phaserRecord); setActionParams(action, phaserRecord, userId);
} else { } else {
break; break;
} }
actionCapability.execute(action); actionCapability.execute(action);
} while (true); } while (true);
//TODO 执行结果不再需要写入特定位置,当前的 ActionCapability 内部的行动池已经足以承担这个功能,但这也就意味着行动池或许需要考虑特殊的序列化形式避免内存占用过高,同时也需要在某些模块执行时加上行动结果的挑取作为输入内容 // TODO 执行结果不再需要写入特定位置,当前的 ActionCapability
// 内部的行动池已经足以承担这个功能,但这也就意味着行动池或许需要考虑特殊的序列化形式避免内存占用过高,
// 同时也需要在某些模块执行时加上行动结果的挑取作为输入内容
} catch (ActionExecutingFailedException e) {
log.error("Action executing failed: {}", action.getKey(), e);
} finally { } finally {
phaser.arriveAndDeregister(); phaser.arriveAndDeregister();
} }
}); });
} }
} }
private void repairActionParams(MetaAction action, PhaserRecord phaserRecord) { private void setActionParams(MetaAction action, PhaserRecord phaserRecord, String userId) {
ActionData actionData = phaserRecord.actionData();
List<String> additionalContext = actionData.getAdditionalContext().get(actionData.getExecutingStage());
do { do {
RepairerInput repairerInput = assemblyHelper.buildRepairerInput(); ExtractorInput extractorInput = assemblyHelper.buildExtractorInput(action, userId, actionData,
additionalContext);
ExtractorResult extractorResult = paramsExtractor.execute(extractorInput);
if (extractorResult.isOk()) {
action.setParams(extractorResult.getParams());
break;
}
RepairerInput repairerInput = assemblyHelper.buildRepairerInput(phaserRecord, action, userId);
RepairerResult repairerResult = actionRepairer.execute(repairerInput); RepairerResult repairerResult = actionRepairer.execute(repairerInput);
switch (repairerResult.getStatus()) { switch (repairerResult.getStatus()) {
// 修复成功则直接设置参数 // 修复成功则直接设置参数
case RepairerStatus.OK -> action.setParams(repairerResult.getParams()); case RepairerStatus.OK -> additionalContext.addAll(repairerResult.getFixedData());
// 修复失败则证明行动链不可行(外部因素,如果本身即不存在满足可能,则应当通过 ADJUST 或者 ACQUIRE 方式选择取消) // 修复失败则证明行动链不可行(外部因素,如果本身即不存在满足可能,则应当通过 ADJUST 或者 ACQUIRE 方式选择取消)
case RepairerStatus.FAILED -> phaserRecord.fail(); case RepairerStatus.FAILED -> {
// 按照逻辑设定,这里将不可能步入这个分支,除非 ActionRepairer 逻辑有误 // 此处抛出执行异常runGroupAction 为并发执行同组动作,此时只是中断了一个行动单元的执行
// 那么对于其他的行动单元,也需要进行中断处理,仅靠 PhaserRecord 无法完成
// 或许需要再增加一个集合,用于记录开启的执行线程,然后统一停止
// 由于行动链的并发特性,所以只需要记录单组行动单元的执行线程,但是如果此时其他的行动单元也触发了额外的线程操作
// (例如自对话,但此时这些触发自对话的线程本身是正常状态,会被正常中断)
// 也需要避免这些内容出现异常(主要是前置行动模块处针对 ActionData 的操作),应该只需要依据 FAILED 状态阻止操作即可
// 对于修复和动态生成的行动单元执行,都是同步操作,不再需要额外处理
// 但考虑到同组行动单元的执行过程,也的确用不到那么多线程中断操作,所以只要收到干预时做好拒绝策略即可
// 此处的话,由于主要依赖 ActionData 持有的状态防止失败行动数据继续执行,所以不再需要 phaserRecord 进行额外处理
// 只需要重设 ActionData 状态即可
actionData.setStatus(ActionData.ActionStatus.FAILED);
throw new ActionExecutingFailedException("行动执行失败");
}
// 通过自对话通道发起了干预,这里需要调用 phaserRecord 进行一次阻塞
// 如果通过 phaserRecord 进行阻塞,那么在前置模块的 InterventionHandler 需要额外得知当前 ActionData
// 的内容,这点是可以做到的
// 如果在 ActionRepairer 内部调用阻塞,还是无法免除同样的逻辑,即 RepairerResult 内容需要携带干预信息,但这些内容最终是在
// ActionData 中放置的,相当于绕了一层,不太合适
case RepairerStatus.ACQUIRE -> { case RepairerStatus.ACQUIRE -> {
phaserRecord.interrupt(); phaserRecord.interrupt();
continue;
} }
} }
break;
} while (true); } while (true);
} }
@@ -155,15 +204,50 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
private AssemblyHelper() { private AssemblyHelper() {
} }
public RepairerInput buildRepairerInput() { private RepairerInput buildRepairerInput(PhaserRecord phaserRecord, MetaAction action, String userId) {
return null; RepairerInput input = new RepairerInput();
MetaActionInfo metaActionInfo = actionCapability.loadMetaActionInfo(action.getKey());
ActionData actionData = phaserRecord.actionData();
input.setHistoryActionResults(getHistoryActionResults(actionData));
input.setParams(metaActionInfo.getParams());
input.setRecentMessages(cognationCapability.getChatMessages());
input.setActionDescription(metaActionInfo.getDescription());
input.setUserId(userId);
input.setPhaserRecord(phaserRecord);
return input;
} }
public ExtractorInput buildExtractorInput() { private ExtractorInput buildExtractorInput(MetaAction action, String userId, ActionData actionData,
return null; List<String> additionalContext) {
ExtractorInput input = new ExtractorInput();
input.setEvaluatedSlices(memoryCapability.getActivatedSlices(userId));
input.setRecentMessages(cognationCapability.getChatMessages());
input.setMetaActionInfo(actionCapability.loadMetaActionInfo(action.getKey()));
input.setHistoryActionResults(getHistoryActionResults(actionData));
input.setAdditionalContext(additionalContext);
return input;
} }
public CorrectorInput buildCorrectorInput() { private List<HistoryAction> getHistoryActionResults(ActionData actionData) {
int executingStage = actionData.getExecutingStage();
if (executingStage <= 0) {
return new ArrayList<>();
}
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
// executingStage 是当前正在执行的阶段,所以只需要获取到前一阶段的结果
return actionChain.get(executingStage - 1).stream()
.map(metaAction -> {
HistoryAction historyAction = new HistoryAction();
historyAction.setActionKey(metaAction.getKey());
historyAction
.setDescription(
actionCapability.loadMetaActionInfo(metaAction.getKey()).getDescription());
historyAction.setResult(metaAction.getResult().getData());
return historyAction;
}).toList();
}
private CorrectorInput buildCorrectorInput() {
return null; return null;
} }
} }

View File

@@ -1,31 +1,145 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor; package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerInput; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 负责识别行动链的修复 * 负责识别行动链的修复
* <ol> * <ol>
* <li> * <li>
* 可通过协调 {@link DynamicActionGenerator} 生成新的行动单元并调用,获取所需的参数信息(必要时持久化); * 可通过协调 {@link DynamicActionGenerator} 生成新的行动单元并调用,获取所需的参数信息(必要时持久化);
* </li> * </li>
* <li> * <li>
* 也可以直接调用已存在的行动程序获取信息; * 也可以直接调用已存在的行动程序获取信息;
* </li> * </li>
* <li> * <li>
* 如果上述都无法满足,将发起自对话借助干预模块进行操作或者借助自对话通道向用户发起沟通请求,该请求的目的一般为行动程序生成/调用指导或者用户侧的信息补充,后续还需要再走一遍参数修复流程 * 如果上述都无法满足,将发起自对话借助干预模块进行操作或者借助自对话通道向用户发起沟通请求,该请求的目的一般为行动程序生成/调用指导或者用户侧的信息补充,后续还需要再走一遍参数修复流程
* </li> * </li>
* </ol> * </ol>
*/ */
@Slf4j
@AgentSubModule @AgentSubModule
public class ActionRepairer extends AgentRunningSubModule<RepairerInput, RepairerResult> implements ActivateModel { public class ActionRepairer extends AgentRunningSubModule<RepairerInput, RepairerResult> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
@InjectCapability
private CognationCapability cognationCapability;
@InjectModule
private DynamicActionGenerator dynamicActionGenerator;
private AssembleHelper assembleHelper = new AssembleHelper();
@Override @Override
public RepairerResult execute(RepairerInput data) { public RepairerResult execute(RepairerInput data) {
return null; RepairerResult result;
try {
String prompt = assembleHelper.buildPrompt(data, null);
ChatResponse response = this.singleChat(prompt);
RepairerData repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class);
result = switch (repairerData.getRepairerType()) {
case ACTION_GENERATION ->
handleActionGeneration(JSONObject.parseObject(repairerData.getData(), GeneratorInput.class));
case ACTION_INVOCATION -> handleActionInvocation(
JSONObject.parseObject(repairerData.getData(), new TypeReference<List<String>>() {
}));
case USER_INTERACTION -> handleUserInteraction(repairerData.getData());
};
if (!repairerData.getRepairerType().equals(RepairerType.USER_INTERACTION)
&& result.getStatus().equals(RepairerResult.RepairerStatus.FAILED)) {
log.warn("常规行动修复失败,将尝试自对话通道");
prompt = assembleHelper.buildPrompt(data, "常规行动修复失败,请尝试通过自对话通道获取必要的信息以完成行动参数的修复");
response = this.singleChat(prompt);
repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class);
handleUserInteraction(repairerData.getData());
}
} catch (Exception e) {
result = new RepairerResult();
result.setStatus(RepairerStatus.FAILED);
}
return result;
}
/**
* 负责根据输入内容进行行动单元的参数信息修复
*
* @param generatorInput 生成的行动单元参考内容,最好包含行动单元的执行逻辑
* @return 修复后的行动单元结果
*/
private RepairerResult handleActionGeneration(GeneratorInput generatorInput) {
RepairerResult result = new RepairerResult();
GeneratorResult generatorResult = dynamicActionGenerator.execute(generatorInput);
MetaAction tempAction = generatorResult.getTempAction();
actionCapability.execute(tempAction);
result.getFixedData().add(tempAction.getResult().getData());
return result;
}
/**
* 负责根据输入内容进行行动单元的参数信息修复
*
* @param actionKeys 需要调用的行动单元Key列表
* @return 修复后的行动单元结果
*/
private RepairerResult handleActionInvocation(List<String> actionKeys) {
RepairerResult result = new RepairerResult();
CountDownLatch latch = new CountDownLatch(actionKeys.size());
ExecutorService virtual = actionCapability.getExecutor(ExecutorType.VIRTUAL);
ExecutorService platform = actionCapability.getExecutor(ExecutorType.PLATFORM);
ExecutorService executor;
AtomicInteger failedCount = new AtomicInteger(0);
for (String key : actionKeys) {
MetaAction action = actionCapability.loadMetaAction(key);
executor = action.isIo() ? virtual : platform;
executor.execute(() -> {
try {
actionCapability.execute(action);
result.getFixedData().add(action.getResult().getData());
} catch (Exception e) {
log.error("行动单元执行失败: {}", key, e);
failedCount.incrementAndGet();
} finally {
latch.countDown();
}
});
}
if (actionKeys.size() - failedCount.get() > 0) {
result.setStatus(RepairerStatus.OK);
} else {
result.setStatus(RepairerStatus.FAILED);
}
return result;
}
private RepairerResult handleUserInteraction(String acquireContent) {
RepairerResult result = new RepairerResult();
result.setStatus(RepairerStatus.ACQUIRE);
// 发送自对话请求
return result;
} }
@Override @Override
@@ -37,4 +151,51 @@ public class ActionRepairer extends AgentRunningSubModule<RepairerInput, Repaire
public boolean withBasicPrompt() { public boolean withBasicPrompt() {
return false; return false;
} }
@SuppressWarnings("InnerClassMayBeStatic")
@Data
private class RepairerData {
private RepairerType repairerType;
private String data;
}
private enum RepairerType {
ACTION_GENERATION,
ACTION_INVOCATION,
USER_INTERACTION
}
@SuppressWarnings("InnerClassMayBeStatic")
private class AssembleHelper {
private AssembleHelper() {
}
private String buildPrompt(RepairerInput data, String specialInstruction) {
JSONObject prompt = new JSONObject();
JSONObject actionData = prompt.putObject("[本次行动信息]");
actionData.put("[行动描述]", data.getActionDescription());
JSONObject actionParamsData = actionData.putObject("[行动参数说明]");
actionParamsData.putAll(data.getParams());
JSONArray historyData = prompt.putArray("[历史行动执行结果]");
data.getHistoryActionResults().forEach(historyAction -> {
JSONObject historyItem = new JSONObject();
historyItem.put("[行动Key]", historyAction.getActionKey());
historyItem.put("[行动描述]", historyAction.getDescription());
historyItem.put("[行动结果]", historyAction.getResult());
historyData.add(historyItem);
});
JSONArray messageData = prompt.putArray("[最近消息列表]");
messageData.addAll(data.getRecentMessages());
if (specialInstruction != null) {
prompt.put("[特殊指令]", specialInstruction);
}
return prompt.toString();
}
}
} }

View File

@@ -1,20 +1,64 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor; package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorInput; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction;
import java.util.List;
/** /**
* 负责依据输入内容进行行动单元的参数信息提取 * 负责依据输入内容进行行动单元的参数信息提取
*/ */
@Slf4j
@AgentSubModule @AgentSubModule
public class ParamsExtractor extends AgentRunningSubModule<ExtractorInput, ExtractorResult> implements ActivateModel { public class ParamsExtractor extends AgentRunningSubModule<ExtractorInput, ExtractorResult> implements ActivateModel {
@Override @Override
public ExtractorResult execute(ExtractorInput data) { public ExtractorResult execute(ExtractorInput input) {
return null; String prompt = buildPrompt(input);
ChatResponse response = this.singleChat(prompt);
ExtractorResult result;
try {
result = JSONObject.parseObject(response.getMessage(), ExtractorResult.class);
} catch (Exception e) {
log.error("ParamsExtractor解析结果失败返回内容{}", response.getMessage(), e);
result = new ExtractorResult();
result.setOk(false);
result.setParams(new String[0]);
}
return result;
}
private String buildPrompt(ExtractorInput input) {
JSONObject prompt = new JSONObject();
JSONObject actionData = prompt.putObject("[本次行动信息]");
MetaActionInfo actionInfo = input.getMetaActionInfo();
actionData.put("[行动描述]", actionInfo.getDescription());
actionData.put("[行动参数说明]", actionInfo.getParams());
JSONArray historyData = prompt.putArray("[历史行动执行结果]");
List<HistoryAction> historyActions = input.getHistoryActionResults();
for (HistoryAction historyAction : historyActions) {
JSONObject historyItem = new JSONObject();
historyItem.put("[行动Key]", historyAction.getActionKey());
historyItem.put("[行动描述]", historyAction.getDescription());
historyItem.put("[行动结果]", historyAction.getResult());
historyData.add(historyItem);
}
JSONArray messageData = prompt.putArray("[最近消息列表]");
messageData.addAll(input.getRecentMessages());
return prompt.toString();
} }
@Override @Override

View File

@@ -0,0 +1,18 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.ImmediateActionData;
import java.util.List;
@Data
public class ActionExecutorInput {
/**
* 用户ID
*/
private String userId;
/**
* 即时行动数据列表
*/
private List<ImmediateActionData> immediateActions;
}

View File

@@ -1,8 +1,32 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data; import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
@Data @Data
public class ExtractorInput { public class ExtractorInput {
/**
* 目标 MetaActionInfo
*/
private MetaActionInfo metaActionInfo;
/**
* 可参考的记忆切片
*/
private List<EvaluatedSlice> evaluatedSlices;
/**
* 历史行动执行结果
*/
private List<HistoryAction> historyActionResults;
/**
* 最近的消息列表
*/
private List<Message> recentMessages;
/**
* 额外的上下文信息(可来自修复器等)
*/
private List<String> additionalContext;
} }

View File

@@ -1,7 +1,9 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data; import lombok.Data;
import work.slhaf.partner.core.action.entity.MetaAction;
@Data @Data
public class GeneratorResult { public class GeneratorResult {
private MetaAction tempAction;
} }

View File

@@ -0,0 +1,10 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
@Data
public class HistoryAction {
private String actionKey;
private String description;
private String result;
}

View File

@@ -1,7 +1,19 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data; import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import java.util.List;
import java.util.Map;
@Data @Data
public class RepairerInput { public class RepairerInput {
private String userId;
private List<Message> recentMessages;
private Map<String, String> params;
private String actionDescription;
private List<HistoryAction> historyActionResults;
private PhaserRecord phaserRecord;
} }

View File

@@ -2,6 +2,8 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data; import lombok.Data;
import java.util.List;
/** /**
* 行动修复结果,包含行动状态和修复后的参数 * 行动修复结果,包含行动状态和修复后的参数
*/ */
@@ -9,7 +11,7 @@ import lombok.Data;
public class RepairerResult { public class RepairerResult {
private RepairerStatus status; private RepairerStatus status;
private String[] params; private List<String> fixedData;
public enum RepairerStatus { public enum RepairerStatus {
/** /**

View File

@@ -0,0 +1,14 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionExecutingFailedException extends AgentRuntimeException {
public ActionExecutingFailedException(String message) {
super(message);
}
public ActionExecutingFailedException(String message, Throwable cause) {
super(message, cause);
}
}