From 6a351413a152fe13bd1d9617d04bf985c39d5cda Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Fri, 5 Dec 2025 21:58:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E8=BF=9B=E8=A1=8C=E5=8A=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=A8=A1=E5=9D=97:=20=E8=B0=83=E6=95=B4=E4=BA=86=20Ac?= =?UTF-8?q?tionExecutor=20=E4=BB=A5=E6=94=AF=E6=8C=81=E8=A1=8C=E5=8A=A8?= =?UTF-8?q?=E9=93=BE=E5=8A=A8=E6=80=81=E4=BF=AE=E5=A4=8D=E5=92=8C=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E6=8F=90=E5=8F=96;=20=E5=AE=8C=E5=96=84=E4=BA=86=20Ac?= =?UTF-8?q?tionRepairer=E3=80=81ParamsExtractor=20=E7=9A=84=E4=B8=BB?= =?UTF-8?q?=E8=A6=81=E9=80=BB=E8=BE=91;=20=E5=AE=8C=E5=96=84=E4=BA=86?= =?UTF-8?q?=E9=83=A8=E5=88=86=E6=95=B0=E6=8D=AE=E7=B1=BB=E7=9A=84=E5=86=85?= =?UTF-8?q?=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 ActionData 中新增 additionalContext 用于存储各个执行阶段临时修复生成的上下文,同样以执行阶段为键 - 调整 ActionExecutor 的输入参数,可传入用户标识,用于执行器调用 ActionRepairer 的修复过程 - 完善了 ActionExecutor 中行动单元的执行与修复逻辑,将支持正常状态推进执行、触发自对话时阻塞当前行动单元、所有修复方式失败时将整个行动数据标为 FAILED - 完善了 ActionExecutor 中各个DTO的构建方法 - 完善了 ParamsExtractor 中的参数提取逻辑 - 在 PhaserRecord 中新增 interrupt 和 complete 方法,将用于后续行动单元的阻塞(ActionExecutor中)与恢复(InterventionHandler中) - 完善了 ActionRepairer 中的修复逻辑,但自对话通道的暴露方式、DynamicActionGenerator 的具体逻辑待完善 --- .../slhaf/partner/core/action/ActionCore.java | 40 ++-- .../core/action/SandboxRunnerClient.java | 18 +- .../core/action/entity/ActionData.java | 5 +- .../core/action/entity/PhaserRecord.java | 6 +- .../action/dispatcher/ActionDispatcher.java | 24 ++- .../dispatcher/executor/ActionExecutor.java | 156 +++++++++++---- .../dispatcher/executor/ActionRepairer.java | 181 +++++++++++++++++- .../dispatcher/executor/ParamsExtractor.java | 48 ++++- .../executor/entity/ActionExecutorInput.java | 18 ++ .../executor/entity/ExtractorInput.java | 26 ++- .../executor/entity/GeneratorResult.java | 2 + .../executor/entity/HistoryAction.java | 10 + .../executor/entity/RepairerInput.java | 12 ++ .../executor/entity/RepairerResult.java | 4 +- .../ActionExecutingFailedException.java | 14 ++ 15 files changed, 477 insertions(+), 87 deletions(-) create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ActionExecutorInput.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/HistoryAction.java create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/exception/ActionExecutingFailedException.java diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index cd519cf3..f1a34332 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -34,7 +34,7 @@ public class ActionCore extends PartnerCore { /** * 持久行动池,以用户id为键存储所有状态的任务 */ - private HashMap> actionPool = new HashMap<>();//TODO 考虑是否取消用户分池 + private HashMap> actionPool = new HashMap<>();// TODO 考虑是否取消用户分池 /** * 待确认任务,以userId区分不同用户,因为需要跨请求确认 @@ -48,7 +48,8 @@ public class ActionCore extends PartnerCore { private final Lock cacheLock = new ReentrantLock(); - private final ExecutorService platformExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + private final ExecutorService platformExecutor = Executors + .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); /** @@ -129,10 +130,11 @@ public class ActionCore extends PartnerCore { return null; } VectorClient vectorClient = VectorClient.INSTANCE; - //计算本次输入的向量 + // 计算本次输入的向量 float[] vector = vectorClient.compute(input); - if (vector == null) return null; - //与现有缓存比对,将匹配到的收集并返回 + if (vector == null) + return null; + // 与现有缓存比对,将匹配到的收集并返回 return actionCache.parallelStream() .filter(ActionCacheData::isActivated) .filter(data -> { @@ -257,10 +259,10 @@ public class ActionCore extends PartnerCore { * @param inputVector 本次输入内容的语义向量 * @param vectorClient 向量客户端 */ - private void adjustMatchAndPassed(List matchAndPassed, float[] inputVector, String - input, VectorClient vectorClient) { + private void adjustMatchAndPassed(List matchAndPassed, float[] inputVector, String input, + VectorClient vectorClient) { matchAndPassed.forEach(adjustData -> { - //获取原始缓存条目 + // 获取原始缓存条目 String tendency = adjustData.getTendency(); ActionCacheData primaryCacheData = selectCacheData(tendency); if (primaryCacheData == null) { @@ -279,7 +281,7 @@ public class ActionCore extends PartnerCore { private void adjustMatchNotPassed(List matchNotPassed, VectorClient vectorClient) { List toRemove = new ArrayList<>(); matchNotPassed.forEach(adjustData -> { - //获取原始缓存条目 + // 获取原始缓存条目 String tendency = adjustData.getTendency(); ActionCacheData primaryCacheData = selectCacheData(tendency); if (primaryCacheData == null) { @@ -299,13 +301,13 @@ public class ActionCore extends PartnerCore { /** * 针对未命中但评估通过的缓存做出调整: *
    - *

    如果存在缓存条目

    - *
  1. - * 若已生效,但此时未匹配到则说明尚未生效或者阈值、向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均 - *
  2. - *
  3. - * 若未生效,则只增加计数并带权移动平均 - *
  4. + *

    如果存在缓存条目

    + *
  5. + * 若已生效,但此时未匹配到则说明尚未生效或者阈值、向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均 + *
  6. + *
  7. + * 若未生效,则只增加计数并带权移动平均 + *
  8. *
* 如果不存在缓存条目,则新增并填充字段 * @@ -314,10 +316,10 @@ public class ActionCore extends PartnerCore { * @param input 本次输入内容 * @param vectorClient 向量客户端 */ - private void adjustNotMatchPassed(List notMatchPassed, float[] inputVector, String - input, VectorClient vectorClient) { + private void adjustNotMatchPassed(List notMatchPassed, float[] inputVector, String input, + VectorClient vectorClient) { notMatchPassed.forEach(adjustData -> { - //获取原始缓存条目 + // 获取原始缓存条目 String tendency = adjustData.getTendency(); ActionCacheData primaryCacheData = selectCacheData(tendency); float[] tendencyVector = vectorClient.compute(tendency); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java index bb2ff561..c13f7539 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/SandboxRunnerClient.java @@ -7,15 +7,15 @@ import java.nio.file.Path; /** * 基于 Http 与 WebSocket 的沙盒执行器客户端,负责: *
    - *
  • - * 发送行动单元数据 - *
  • - *
  • - * 实时更新获取已存在行动列表 - *
  • - *
  • - * 向传入的 MetaAction 回写执行结果 - *
  • + *
  • + * 发送行动单元数据 + *
  • + *
  • + * 实时更新获取已存在行动列表 + *
  • + *
  • + * 向传入的 MetaAction 回写执行结果 + *
  • *
*/ class SandboxRunnerClient { diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java index c56336b4..cecb17ff 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java @@ -38,7 +38,10 @@ public abstract class ActionData { */ protected String result; protected List history = new ArrayList<>(); - + /** + * 修复上下文 + */ + protected Map> additionalContext; /** * 行动原因 */ diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java index 7b7b5c88..cc3d1805 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/PhaserRecord.java @@ -1,15 +1,15 @@ package work.slhaf.partner.core.action.entity; +import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; + import java.util.concurrent.Phaser; public record PhaserRecord(Phaser phaser, ActionData actionData) { public void fail() { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'fail'"); + actionData.setStatus(ActionStatus.FAILED); } - public void interrupt() { } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java index 4565b22b..7d611a59 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/ActionDispatcher.java @@ -11,6 +11,7 @@ import work.slhaf.partner.core.action.entity.ImmediateActionData; import work.slhaf.partner.core.action.entity.ScheduledActionData; import work.slhaf.partner.module.common.module.PostRunningModule; import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput; import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; @@ -30,6 +31,7 @@ public class ActionDispatcher extends PostRunningModule { private ActionScheduler actionScheduler; private ExecutorService executor; + private final AssemblyHelper assemblyHelper = new AssemblyHelper(); @Init public void init() { @@ -38,13 +40,14 @@ public class ActionDispatcher extends PostRunningModule { @Override public void doExecute(PartnerRunningFlowContext context) { - //只需要处理prepared action,因为pending action在用户确认后也将变为prepared action - //将PLANNING action放入时间轮中,IMMEDIATE action直接进入并发执行流 - //对于将触发的PLANNING action,理想做法是将执行工具做成执行链的形式,模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力,避免绑定固定流程 + // 只需要处理prepared action,因为pending action在用户确认后也将变为prepared action + // 将PLANNING action放入时间轮中,IMMEDIATE action直接进入并发执行流 + // 对于将触发的PLANNING + // action,理想做法是将执行工具做成执行链的形式,模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力,避免绑定固定流程 executor.execute(() -> { String userId = context.getUserId(); List preparedActions = actionCapability.listPreparedAction(userId); - //分类成PLANNING和IMMEDIATE两类 + // 分类成PLANNING和IMMEDIATE两类 List scheduledActions = new ArrayList<>(); List immediateActions = new ArrayList<>(); for (ActionData preparedAction : preparedActions) { @@ -54,7 +57,7 @@ public class ActionDispatcher extends PostRunningModule { immediateActions.add(actionInfo); } } - actionExecutor.execute(immediateActions); + actionExecutor.execute(assemblyHelper.buildExecutorInput(immediateActions, userId)); actionScheduler.execute(scheduledActions); }); } @@ -64,4 +67,15 @@ public class ActionDispatcher extends PostRunningModule { return false; } + @SuppressWarnings("InnerClassMayBeStatic") + private class AssemblyHelper { + + public ActionExecutorInput buildExecutorInput(List immediateActions, String userId) { + ActionExecutorInput input = new ActionExecutorInput(); + input.setImmediateActions(immediateActions); + input.setUserId(userId); + return input; + } + + } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java index 0332a6f8..54de088a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java @@ -8,12 +8,13 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore; -import work.slhaf.partner.core.action.entity.ActionData; -import work.slhaf.partner.core.action.entity.ImmediateActionData; -import work.slhaf.partner.core.action.entity.MetaAction; -import work.slhaf.partner.core.action.entity.PhaserRecord; +import work.slhaf.partner.core.action.entity.*; +import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; +import work.slhaf.partner.core.cognation.CognationCapability; +import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus; +import work.slhaf.partner.module.modules.action.dispatcher.executor.exception.ActionExecutingFailedException; import java.util.ArrayList; import java.util.List; @@ -23,10 +24,14 @@ import java.util.concurrent.Phaser; @Slf4j @AgentSubModule -public class ActionExecutor extends AgentRunningSubModule, Void> { +public class ActionExecutor extends AgentRunningSubModule { @InjectCapability private ActionCapability actionCapability; + @InjectCapability + private MemoryCapability memoryCapability; + @InjectCapability + private CognationCapability cognationCapability; @InjectModule private ParamsExtractor paramsExtractor; @@ -47,9 +52,14 @@ public class ActionExecutor extends AgentRunningSubModule immediateActions) { + public Void execute(ActionExecutorInput input) { + List immediateActions = input.getImmediateActions(); + String userId = input.getUserId(); for (ImmediateActionData actionData : immediateActions) { virtualExecutor.execute(() -> { + if (actionData.getStatus() != ActionData.ActionStatus.PREPARE) { + return; + } actionData.setStatus(ActionData.ActionStatus.EXECUTING); Map> actionChain = actionData.getActionChain(); List virtual = new ArrayList<>(); @@ -61,6 +71,11 @@ public class ActionExecutor extends AgentRunningSubModule metaActions = actionChain.get(order); for (MetaAction metaAction : metaActions) { // 根据io类型放入合适的列表 @@ -71,42 +86,46 @@ public class ActionExecutor extends AgentRunningSubModule actions, ExecutorService executor, PhaserRecord phaserRecord) { + private void runGroupAction(List actions, String userId, ActionData actionData, + ExecutorService executor, + PhaserRecord phaserRecord) { Phaser phaser = phaserRecord.phaser(); phaser.bulkRegister(actions.size()); - //不可替换为增强for,因为单组的行动单元集合数量是可以被外部干预的 - //noinspection ForLoopReplaceableByForEach + // 不可替换为增强for,因为单组的行动单元集合数量是可以被外部干预的 + // noinspection ForLoopReplaceableByForEach for (int i = 0; i < actions.size(); i++) { MetaAction action = actions.get(i); executor.execute(() -> { try { - ExtractorInput extractorInput = assemblyHelper.buildExtractorInput(); - ExtractorResult extractorResult = paramsExtractor.execute(extractorInput); // 两个循环需考虑最大次数,但为了达到最好融合,次数累计作用于 ActionRepairer 的修复策略选择上更合适 - if (!extractorResult.isOk()) { - // 修复的最终结果是 action 的参数补充完整,然后能够继续行动链 - // 如果无法补充,则该行动行动阶段可能确实有误,实际上应当在 actionRepairer 内部进行处理(行动链调整、自对话或请求用户进行干预) - // 所以无法补充时,行动链所属行动数据的状态需要置为 Interrupted ,等待状态变更,同时使用 Phaser 暂停(阻塞)当前行动链执行过程 - // 这个功能应该交给 PhaserRecord 实现,尽量确保功能一致性 - repairActionParams(action, phaserRecord); - } + // 修复的最终结果是 action 的参数补充完整,然后能够继续行动链 + // 如果无法补充,则该行动行动阶段可能确实有误,实际上应当在 actionRepairer 内部进行处理(行动链调整、自对话或请求用户进行干预) + // 所以无法补充时,行动链所属行动数据的状态需要置为 Interrupted ,等待状态变更,同时使用 Phaser 暂停(阻塞)当前行动链执行过程 + // 这个功能应该交给 PhaserRecord 实现,尽量确保功能一致性 + setActionParams(action, phaserRecord, userId); do { actionCapability.execute(action); MetaAction.Result result = action.getResult(); @@ -116,36 +135,66 @@ public class ActionExecutor extends AgentRunningSubModule additionalContext = actionData.getAdditionalContext().get(actionData.getExecutingStage()); do { - RepairerInput repairerInput = assemblyHelper.buildRepairerInput(); + ExtractorInput extractorInput = assemblyHelper.buildExtractorInput(action, userId, actionData, + additionalContext); + ExtractorResult extractorResult = paramsExtractor.execute(extractorInput); + if (extractorResult.isOk()) { + action.setParams(extractorResult.getParams()); + break; + } + RepairerInput repairerInput = assemblyHelper.buildRepairerInput(phaserRecord, action, userId); RepairerResult repairerResult = actionRepairer.execute(repairerInput); switch (repairerResult.getStatus()) { // 修复成功则直接设置参数 - case RepairerStatus.OK -> action.setParams(repairerResult.getParams()); + case RepairerStatus.OK -> additionalContext.addAll(repairerResult.getFixedData()); // 修复失败则证明行动链不可行(外部因素,如果本身即不存在满足可能,则应当通过 ADJUST 或者 ACQUIRE 方式选择取消) - case RepairerStatus.FAILED -> phaserRecord.fail(); - // 按照逻辑设定,这里将不可能步入这个分支,除非 ActionRepairer 逻辑有误 + case RepairerStatus.FAILED -> { + // 此处抛出执行异常,runGroupAction 为并发执行同组动作,此时只是中断了一个行动单元的执行 + // 那么对于其他的行动单元,也需要进行中断处理,仅靠 PhaserRecord 无法完成 + // 或许需要再增加一个集合,用于记录开启的执行线程,然后统一停止 + // 由于行动链的并发特性,所以只需要记录单组行动单元的执行线程,但是如果此时其他的行动单元也触发了额外的线程操作 + // (例如自对话,但此时这些触发自对话的线程本身是正常状态,会被正常中断) + // 也需要避免这些内容出现异常(主要是前置行动模块处针对 ActionData 的操作),应该只需要依据 FAILED 状态阻止操作即可 + // 对于修复和动态生成的行动单元执行,都是同步操作,不再需要额外处理 + // 但考虑到同组行动单元的执行过程,也的确用不到那么多线程中断操作,所以只要收到干预时做好拒绝策略即可 + // 此处的话,由于主要依赖 ActionData 持有的状态防止失败行动数据继续执行,所以不再需要 phaserRecord 进行额外处理 + // 只需要重设 ActionData 状态即可 + actionData.setStatus(ActionData.ActionStatus.FAILED); + throw new ActionExecutingFailedException("行动执行失败"); + } + // 通过自对话通道发起了干预,这里需要调用 phaserRecord 进行一次阻塞 + // 如果通过 phaserRecord 进行阻塞,那么在前置模块的 InterventionHandler 需要额外得知当前 ActionData + // 的内容,这点是可以做到的 + // 如果在 ActionRepairer 内部调用阻塞,还是无法免除同样的逻辑,即 RepairerResult 内容需要携带干预信息,但这些内容最终是在 + // ActionData 中放置的,相当于绕了一层,不太合适 case RepairerStatus.ACQUIRE -> { phaserRecord.interrupt(); - continue; } } - break; } while (true); } @@ -155,15 +204,50 @@ public class ActionExecutor extends AgentRunningSubModule additionalContext) { + ExtractorInput input = new ExtractorInput(); + input.setEvaluatedSlices(memoryCapability.getActivatedSlices(userId)); + input.setRecentMessages(cognationCapability.getChatMessages()); + input.setMetaActionInfo(actionCapability.loadMetaActionInfo(action.getKey())); + input.setHistoryActionResults(getHistoryActionResults(actionData)); + input.setAdditionalContext(additionalContext); + return input; } - public CorrectorInput buildCorrectorInput() { + private List getHistoryActionResults(ActionData actionData) { + int executingStage = actionData.getExecutingStage(); + if (executingStage <= 0) { + return new ArrayList<>(); + } + Map> actionChain = actionData.getActionChain(); + // executingStage 是当前正在执行的阶段,所以只需要获取到前一阶段的结果 + return actionChain.get(executingStage - 1).stream() + .map(metaAction -> { + HistoryAction historyAction = new HistoryAction(); + historyAction.setActionKey(metaAction.getKey()); + historyAction + .setDescription( + actionCapability.loadMetaActionInfo(metaAction.getKey()).getDescription()); + historyAction.setResult(metaAction.getResult().getData()); + return historyAction; + }).toList(); + } + + private CorrectorInput buildCorrectorInput() { return null; } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionRepairer.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionRepairer.java index 315d92c1..3c5ea80a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionRepairer.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionRepairer.java @@ -1,31 +1,145 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson2.TypeReference; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; +import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; +import work.slhaf.partner.api.chat.pojo.ChatResponse; +import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore.ExecutorType; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.cognation.CognationCapability; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerInput; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; /** * 负责识别行动链的修复 *
    - *
  1. - * 可通过协调 {@link DynamicActionGenerator} 生成新的行动单元并调用,获取所需的参数信息(必要时持久化); - *
  2. - *
  3. - * 也可以直接调用已存在的行动程序获取信息; - *
  4. - *
  5. - * 如果上述都无法满足,将发起自对话借助干预模块进行操作或者借助自对话通道向用户发起沟通请求,该请求的目的一般为行动程序生成/调用指导或者用户侧的信息补充,后续还需要再走一遍参数修复流程 - *
  6. + *
  7. + * 可通过协调 {@link DynamicActionGenerator} 生成新的行动单元并调用,获取所需的参数信息(必要时持久化); + *
  8. + *
  9. + * 也可以直接调用已存在的行动程序获取信息; + *
  10. + *
  11. + * 如果上述都无法满足,将发起自对话借助干预模块进行操作或者借助自对话通道向用户发起沟通请求,该请求的目的一般为行动程序生成/调用指导或者用户侧的信息补充,后续还需要再走一遍参数修复流程 + *
  12. *
*/ +@Slf4j @AgentSubModule public class ActionRepairer extends AgentRunningSubModule implements ActivateModel { + @InjectCapability + private ActionCapability actionCapability; + @InjectCapability + private CognationCapability cognationCapability; + + @InjectModule + private DynamicActionGenerator dynamicActionGenerator; + + private AssembleHelper assembleHelper = new AssembleHelper(); + @Override public RepairerResult execute(RepairerInput data) { - return null; + RepairerResult result; + try { + String prompt = assembleHelper.buildPrompt(data, null); + ChatResponse response = this.singleChat(prompt); + RepairerData repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class); + result = switch (repairerData.getRepairerType()) { + case ACTION_GENERATION -> + handleActionGeneration(JSONObject.parseObject(repairerData.getData(), GeneratorInput.class)); + case ACTION_INVOCATION -> handleActionInvocation( + JSONObject.parseObject(repairerData.getData(), new TypeReference>() { + })); + case USER_INTERACTION -> handleUserInteraction(repairerData.getData()); + }; + if (!repairerData.getRepairerType().equals(RepairerType.USER_INTERACTION) + && result.getStatus().equals(RepairerResult.RepairerStatus.FAILED)) { + log.warn("常规行动修复失败,将尝试自对话通道"); + prompt = assembleHelper.buildPrompt(data, "常规行动修复失败,请尝试通过自对话通道获取必要的信息以完成行动参数的修复"); + response = this.singleChat(prompt); + repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class); + handleUserInteraction(repairerData.getData()); + } + } catch (Exception e) { + result = new RepairerResult(); + result.setStatus(RepairerStatus.FAILED); + } + return result; + } + + /** + * 负责根据输入内容进行行动单元的参数信息修复 + * + * @param generatorInput 生成的行动单元参考内容,最好包含行动单元的执行逻辑 + * @return 修复后的行动单元结果 + */ + private RepairerResult handleActionGeneration(GeneratorInput generatorInput) { + RepairerResult result = new RepairerResult(); + GeneratorResult generatorResult = dynamicActionGenerator.execute(generatorInput); + MetaAction tempAction = generatorResult.getTempAction(); + actionCapability.execute(tempAction); + result.getFixedData().add(tempAction.getResult().getData()); + return result; + } + + /** + * 负责根据输入内容进行行动单元的参数信息修复 + * + * @param actionKeys 需要调用的行动单元Key列表 + * @return 修复后的行动单元结果 + */ + private RepairerResult handleActionInvocation(List actionKeys) { + RepairerResult result = new RepairerResult(); + CountDownLatch latch = new CountDownLatch(actionKeys.size()); + ExecutorService virtual = actionCapability.getExecutor(ExecutorType.VIRTUAL); + ExecutorService platform = actionCapability.getExecutor(ExecutorType.PLATFORM); + ExecutorService executor; + AtomicInteger failedCount = new AtomicInteger(0); + for (String key : actionKeys) { + MetaAction action = actionCapability.loadMetaAction(key); + executor = action.isIo() ? virtual : platform; + executor.execute(() -> { + try { + actionCapability.execute(action); + result.getFixedData().add(action.getResult().getData()); + } catch (Exception e) { + log.error("行动单元执行失败: {}", key, e); + failedCount.incrementAndGet(); + } finally { + latch.countDown(); + } + }); + } + if (actionKeys.size() - failedCount.get() > 0) { + result.setStatus(RepairerStatus.OK); + } else { + result.setStatus(RepairerStatus.FAILED); + } + return result; + } + + private RepairerResult handleUserInteraction(String acquireContent) { + RepairerResult result = new RepairerResult(); + result.setStatus(RepairerStatus.ACQUIRE); + // 发送自对话请求 + return result; } @Override @@ -37,4 +151,51 @@ public class ActionRepairer extends AgentRunningSubModule { + JSONObject historyItem = new JSONObject(); + historyItem.put("[行动Key]", historyAction.getActionKey()); + historyItem.put("[行动描述]", historyAction.getDescription()); + historyItem.put("[行动结果]", historyAction.getResult()); + historyData.add(historyItem); + }); + + JSONArray messageData = prompt.putArray("[最近消息列表]"); + messageData.addAll(data.getRecentMessages()); + + if (specialInstruction != null) { + prompt.put("[特殊指令]", specialInstruction); + } + + return prompt.toString(); + } + + } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java index d3eba6ef..44dfa680 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ParamsExtractor.java @@ -1,20 +1,64 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; +import work.slhaf.partner.api.chat.pojo.ChatResponse; +import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorInput; import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult; +import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction; + +import java.util.List; /** * 负责依据输入内容进行行动单元的参数信息提取 */ +@Slf4j @AgentSubModule public class ParamsExtractor extends AgentRunningSubModule implements ActivateModel { @Override - public ExtractorResult execute(ExtractorInput data) { - return null; + public ExtractorResult execute(ExtractorInput input) { + String prompt = buildPrompt(input); + ChatResponse response = this.singleChat(prompt); + ExtractorResult result; + try { + result = JSONObject.parseObject(response.getMessage(), ExtractorResult.class); + } catch (Exception e) { + log.error("ParamsExtractor解析结果失败,返回内容:{}", response.getMessage(), e); + result = new ExtractorResult(); + result.setOk(false); + result.setParams(new String[0]); + } + return result; + } + + private String buildPrompt(ExtractorInput input) { + JSONObject prompt = new JSONObject(); + + JSONObject actionData = prompt.putObject("[本次行动信息]"); + MetaActionInfo actionInfo = input.getMetaActionInfo(); + actionData.put("[行动描述]", actionInfo.getDescription()); + actionData.put("[行动参数说明]", actionInfo.getParams()); + + JSONArray historyData = prompt.putArray("[历史行动执行结果]"); + List historyActions = input.getHistoryActionResults(); + for (HistoryAction historyAction : historyActions) { + JSONObject historyItem = new JSONObject(); + historyItem.put("[行动Key]", historyAction.getActionKey()); + historyItem.put("[行动描述]", historyAction.getDescription()); + historyItem.put("[行动结果]", historyAction.getResult()); + historyData.add(historyItem); + } + + JSONArray messageData = prompt.putArray("[最近消息列表]"); + messageData.addAll(input.getRecentMessages()); + + return prompt.toString(); } @Override diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ActionExecutorInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ActionExecutorInput.java new file mode 100644 index 00000000..c77f88dc --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ActionExecutorInput.java @@ -0,0 +1,18 @@ +package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; + +import lombok.Data; +import work.slhaf.partner.core.action.entity.ImmediateActionData; + +import java.util.List; + +@Data +public class ActionExecutorInput { + /** + * 用户ID + */ + private String userId; + /** + * 即时行动数据列表 + */ + private List immediateActions; +} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorInput.java index 826c1f06..4a99b457 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/ExtractorInput.java @@ -1,8 +1,32 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; import lombok.Data; +import work.slhaf.partner.api.chat.pojo.Message; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; + +import java.util.List; @Data public class ExtractorInput { - + /** + * 目标 MetaActionInfo + */ + private MetaActionInfo metaActionInfo; + /** + * 可参考的记忆切片 + */ + private List evaluatedSlices; + /** + * 历史行动执行结果 + */ + private List historyActionResults; + /** + * 最近的消息列表 + */ + private List recentMessages; + /** + * 额外的上下文信息(可来自修复器等) + */ + private List additionalContext; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorResult.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorResult.java index 6290856d..0b8572fb 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorResult.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/GeneratorResult.java @@ -1,7 +1,9 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; import lombok.Data; +import work.slhaf.partner.core.action.entity.MetaAction; @Data public class GeneratorResult { + private MetaAction tempAction; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/HistoryAction.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/HistoryAction.java new file mode 100644 index 00000000..4d0b9a0e --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/HistoryAction.java @@ -0,0 +1,10 @@ +package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; + +import lombok.Data; + +@Data +public class HistoryAction { + private String actionKey; + private String description; + private String result; +} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerInput.java index 92ffcc6f..02e59433 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerInput.java @@ -1,7 +1,19 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; import lombok.Data; +import work.slhaf.partner.api.chat.pojo.Message; +import work.slhaf.partner.core.action.entity.PhaserRecord; + +import java.util.List; +import java.util.Map; @Data public class RepairerInput { + + private String userId; + private List recentMessages; + private Map params; + private String actionDescription; + private List historyActionResults; + private PhaserRecord phaserRecord; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerResult.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerResult.java index 7f046b6d..c6e2c999 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerResult.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/entity/RepairerResult.java @@ -2,6 +2,8 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity; import lombok.Data; +import java.util.List; + /** * 行动修复结果,包含行动状态和修复后的参数 */ @@ -9,7 +11,7 @@ import lombok.Data; public class RepairerResult { private RepairerStatus status; - private String[] params; + private List fixedData; public enum RepairerStatus { /** diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/exception/ActionExecutingFailedException.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/exception/ActionExecutingFailedException.java new file mode 100644 index 00000000..f10273d1 --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/exception/ActionExecutingFailedException.java @@ -0,0 +1,14 @@ +package work.slhaf.partner.module.modules.action.dispatcher.executor.exception; + +import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException; + +public class ActionExecutingFailedException extends AgentRuntimeException { + + public ActionExecutingFailedException(String message) { + super(message); + } + + public ActionExecutingFailedException(String message, Throwable cause) { + super(message, cause); + } +}