From 14a57f0be64622452a70ede2f518c505100e89dd Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sat, 29 Nov 2025 20:56:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E8=BF=9B=E8=A1=8C=E5=8A=A8=E5=B9=B2?= =?UTF-8?q?=E9=A2=84=E6=A8=A1=E5=9D=97,=E5=89=8D=E7=BD=AE=E9=83=A8?= =?UTF-8?q?=E5=88=86=E9=80=BB=E8=BE=91=E5=B7=B2=E5=9F=BA=E6=9C=AC=E5=AE=8C?= =?UTF-8?q?=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在`ActionData`中添加必要注释、新增`executingStage`字段表示当前执行阶段、移除了`WAITING`的状态类型 - 调整并修正了`ActionExecutor`中的`Phaser`阻塞逻辑 - 完善了`ActionInterventor`中`识别 -> 评估 -> 异步执行`的干预逻辑,并将干预结果以 Prompt 形式回写至流程上下文,作为主模块的已知内容 - 调整了干预模块内部的各个数据类的字段结构,适配干预流程 - 完善了`InterventionEvaluator`、`InterventionHandler`、`InterventionRecognizer`等必需的干预子模块 --- .../main/java/work/slhaf/partner/Main.java | 2 +- .../slhaf/partner/core/action/ActionCore.java | 2 +- .../core/action/entity/ActionData.java | 37 ++- .../dispatcher/executor/ActionExecutor.java | 43 ++-- .../action/interventor/ActionInterventor.java | 232 ++++++++++++------ .../entity/InterventionType.java | 2 +- .../interventor/entity/MetaIntervention.java | 21 ++ .../evaluator/InterventionEvaluator.java | 87 ++++--- .../evaluator/entity/EvaluatorInput.java | 4 +- .../evaluator/entity/EvaluatorResult.java | 17 +- .../handler/InterventionHandler.java | 176 ++++++++++++- .../handler/entity/HandlerInput.java | 38 ++- .../recognizer/InterventionRecognizer.java | 69 ++++-- .../recognizer/entity/RecognizerInput.java | 2 + .../recognizer/entity/RecognizerResult.java | 11 +- .../modules/action/planner/ActionPlanner.java | 4 +- 16 files changed, 549 insertions(+), 198 deletions(-) rename Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/{handler => }/entity/InterventionType.java (87%) create mode 100644 Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/MetaIntervention.java diff --git a/Partner-Main/src/main/java/work/slhaf/partner/Main.java b/Partner-Main/src/main/java/work/slhaf/partner/Main.java index 5d5e27a0..318b385c 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/Main.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/Main.java @@ -12,7 +12,7 @@ public class Main { .setAgentConfigManager(PartnerAgentConfigManager.class) .setGateway(WebSocketGateway.class) .setAgentExceptionCallback(PartnerExceptionCallback.class) - .addAfterLaunchRunners(() -> VectorClient.load()) + .addAfterLaunchRunners(VectorClient::load) .launch(); } } \ No newline at end of file diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 91906b8f..c70813c4 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -33,7 +33,7 @@ public class ActionCore extends PartnerCore { /** * 持久行动池,以用户id为键存储所有状态的任务 */ - private HashMap> actionPool = new HashMap<>(); + private HashMap> actionPool = new HashMap<>();//TODO 考虑是否取消用户分池 /** * 待确认任务,以userId区分不同用户,因为需要跨请求确认 diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java index eadcf315..6ba26af9 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/ActionData.java @@ -1,7 +1,9 @@ package work.slhaf.partner.core.action.entity; +import cn.hutool.json.JSONObject; import lombok.Data; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -10,16 +12,47 @@ import java.util.List; */ @Data public abstract class ActionData { + /** + * 行动ID + */ protected String uuid; + /** + * 行动倾向 + */ protected String tendency; + + /** + * 行动状态 + */ protected ActionStatus status; - protected LinkedHashMap> actionChain; + /** + * 行动链 + */ + protected LinkedHashMap> actionChain = new LinkedHashMap<>(); + /** + * 行动阶段(当前阶段) + */ + protected int executingStage; + /** + * 行动结果 + */ protected String result; + protected List history = new ArrayList<>(); + + /** + * 行动原因 + */ protected String reason; + /** + * 行动描述 + */ protected String description; + /** + * 行动来源 + */ protected String source; public enum ActionStatus { - SUCCESS, FAILED, EXECUTING, WAITING, PREPARE + SUCCESS, FAILED, EXECUTING, PREPARE } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java index f50dc434..a1f0f16a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/executor/ActionExecutor.java @@ -51,50 +51,55 @@ public class ActionExecutor extends AgentRunningSubModule { - for (MetaAction metaAction : v) { - // 根据io类型放入合适的列表 - if (metaAction.isIo()) { - virtual.add(metaAction); - } else { - platform.add(metaAction); + try { + actionChain.forEach((k, v) -> { + for (MetaAction metaAction : v) { + // 根据io类型放入合适的列表 + if (metaAction.isIo()) { + virtual.add(metaAction); + } else { + platform.add(metaAction); + } } - } - runGroupAction(virtual, platform, actionChain, phaser); - virtual.clear(); - platform.clear(); - phaser.arriveAndAwaitAdvance(); - }); - actionCapability.removePhaserRecord(phaser); + runGroupAction(virtual, platform, actionChain, phaser); + phaser.arriveAndAwaitAdvance(); + virtual.clear(); + platform.clear(); + }); + } finally { + phaser.arriveAndDeregister(); + actionCapability.removePhaserRecord(phaser); + } }); } // 使用phaser来承担同组的动态任务新增 - private void runGroupAction(List virtual, List platform, LinkedHashMap> actionChain, Phaser phaser) { + private void runGroupAction(List virtual, List platform, + LinkedHashMap> actionChain, Phaser phaser) { runGroupAction(virtual, virtualExecutor, phaser); runGroupAction(platform, platformExecutor, phaser); } private void runGroupAction(List actions, ExecutorService executor, Phaser phaser) { + phaser.bulkRegister(actions.size()); for (MetaAction action : actions) { - phaser.register(); executor.execute(() -> { try { MetaAction.Result result = action.getResult(); do { // 该循环对应LLM的调整参数后重试 if (!result.isSuccess()) { - //TODO LLM决策是重构参数、执行自对话反思、还是选择向用户求助(通过cognationCore暴露方法,可能需要修改其他模块以进行适应),仅重构参数时无需结束当前循环 + // LLM决策是重构参数、执行自对话反思、还是选择向用户求助(通过cognationCore暴露方法,可能需要修改其他模块以进行适应),仅重构参数时无需结束当前循环 // 若使用Phaser作为执行线程与反思、求助等调用流程的同步协调,应当需要额外维护Phaser全局字段,获取到反思结果或者用户反馈后, // 调用对应的phaser注册任务,在ActionExecutor中动态添加任务至actionChain,同时启动异步执行 // 而且由于执行与放入的为同一个MetaAction对象,所以执行结果可被当前行动链获取,但virtual、executor两个列表似乎不行,需要重构执行模式,建议将行动链直接重构为LinkedHashMap,order为键 String input = getInput(result.getData()); - + // 执行时不可使用`for in`和`forEach`,因为在`Intervention`相关模块存在动态调整 } action.run(); } while (!result.isSuccess()); - //TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法) + // TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法) } finally { phaser.arriveAndDeregister(); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/ActionInterventor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/ActionInterventor.java index 9721e8f8..75cd80e1 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/ActionInterventor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/ActionInterventor.java @@ -1,5 +1,6 @@ package work.slhaf.partner.module.modules.action.interventor; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule; @@ -7,24 +8,32 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore.PhaserRecord; +import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.module.common.module.PreRunningModule; +import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import work.slhaf.partner.module.modules.action.interventor.evaluator.InterventionEvaluator; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorInput; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult.EvaluatedInterventionData; import work.slhaf.partner.module.modules.action.interventor.handler.InterventionHandler; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput; -import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.HandlerInputData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.ExecutingInterventionData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.InterventionData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.PreparedInterventionData; import work.slhaf.partner.module.modules.action.interventor.recognizer.InterventionRecognizer; import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerInput; import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerResult; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; -import java.util.*; -import java.util.stream.Collectors; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; /** * 负责识别潜在的行动干预信息,作用于正在进行或已存在的行动池中内容 @@ -57,108 +66,179 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel // 综合当前正在进行的行动链信息、用户交互历史、激活的记忆切片,尝试识别出是否存在行动干预意图 // 首先通过recognizer进行快速意图识别,识别成功则步入评估阶段,评估成功则直接作用于目标行动链 // 进行快速意图识别时必须结合近期对话与进行中行动链情况 + + // 干预意图识别 String uuid = context.getUuid(); String userId = context.getUserId(); RecognizerResult recognizerResult = interventionRecognizer - .execute(buildRecognizerInput(userId, context.getInput())); //此处的输入内容携带了所有 PhaserRecord + .execute(buildRecognizerInput(userId, context.getInput())); // 此处的输入内容携带了所有 PhaserRecord if (!recognizerResult.isOk()) { - // 设置相应prompt setupNoInterventionPrompt(uuid); return; } - // 存在则进一步评估、评估通过则并直接添加行动程序至对应行动链 - Map recognizedInterventions = recognizerResult.getInterventions(); //这里的 PhaserRecord 已包含从 ActionCore 获取到的执行状态下的行动及其Phaser实例 + + // 干预意图评估 EvaluatorResult evaluatorResult = interventionEvaluator - .execute(buildEvaluatorInput(recognizedInterventions, userId)); - List interventions = evaluatorResult.getDataList(); //这里的 EvaluatedInterventionData 中的 tendency 即为 recognizedInterventions 中的键 - if (evaluatorResult.isOk() && isActionKeysExist(interventions)) { - setupErrorInterventionPrompt(uuid); - } else if (evaluatorResult.isOk()) { + .execute(buildEvaluatorInput(recognizerResult, userId)); + List executingDataList = evaluatorResult.getExecutingDataList(); + List preparedDataList = evaluatorResult.getPreparedDataList(); + + // 意图评估结果处理 + if (evaluatorResult.isOk()) { + // 对存在‘异常ActionKey’的评估结果列表进行过滤 + invalidActionKeysFilter(executingDataList); + invalidActionKeysFilter(preparedDataList); + // 同步写入prompt,异步处理干预行为,‘异步’在 interventionHandler 中体现 - setupInterventionPrompt(uuid, interventions); - interventionHandler.execute(buildHandlerInput(interventions, recognizedInterventions)); + setupInterventionPrompt(uuid, executingDataList, preparedDataList); + interventionHandler.execute(buildHandlerInput(executingDataList, preparedDataList, recognizerResult)); } else { - // 同步写入prompt - setupInterventionIgnoredPrompt(uuid, interventions); + setupInterventionIgnoredPrompt(uuid, executingDataList, preparedDataList); } + } - private void setupErrorInterventionPrompt(String uuid) { - interventionPrompt.put(uuid, Map.of( - "[识别状态] <是否识别到干预已存在行动的意图>", "识别出,但出现了不存在的行动单元key", - "[干预行动] <将对已存在行动做出的行为>", "无行为" - )); - } + private void invalidActionKeysFilter(List interventions) { + + List toRemove = new ArrayList<>(); - private boolean isActionKeysExist(List interventions) { for (EvaluatedInterventionData intervention : interventions) { - String[] array = intervention.getActions().values().toArray(new String[0]); - if (!actionCapability.checkExists(array)) { + List interventionData = intervention.getInterventionData(); + List actions = new ArrayList<>(); + for (MetaIntervention metaData : interventionData) { + actions.addAll(metaData.getActions()); + } + // 如果存在异常行动key,则可视为该评估结果存在问题,直接忽略该结果 + if (!actionCapability.checkExists(actions.toArray(String[]::new))) { + toRemove.add(intervention); + } + + // 针对 REBUILD 类型进行特殊校验, REBUILD 类型必须满足所有 MetaIntervention 的类型均为 REBUILD + if (!checkRebuildType(interventionData)) { + toRemove.add(intervention); + } + } + + interventions.removeAll(toRemove); + } + + private boolean checkRebuildType(List interventionData) { + boolean hasRebuild = false; + for (MetaIntervention meta : interventionData) { + if (meta.getType() == InterventionType.REBUILD) { + hasRebuild = true; + } else if (hasRebuild) { + // 已经存在REBUILD类型,但又发现了非REBUILD类型,不合法 return false; } } + return true; } - private HandlerInput buildHandlerInput(List interventions, Map recognizedInterventions) { + /** + * @param executingDataList 对应评估结果中的‘执行中行动’ + * @param preparedDataList 对应评估结果中的‘待执行行动’ + * @param recognizerResult 干预识别结果,包含‘执行中’‘待执行’两类评估结果各自对应的行动数据 + * @return 处理器输入 + */ + private HandlerInput buildHandlerInput(List executingDataList, + List preparedDataList, RecognizerResult recognizerResult) { HandlerInput input = new HandlerInput(); - List inputDataList = input.getData(); - for (EvaluatedInterventionData interventionData : interventions) { - HandlerInputData inputData = new HandlerInputData(); - inputData.setTendency(interventionData.getTendency()); - inputData.setDescription(interventionData.getDescription()); - inputData.setType(interventionData.getType()); - inputData.setActions(interventionData.getActions()); - inputData.setRecord(recognizedInterventions.get(interventionData.getTendency())); - inputDataList.add(inputData); - } + Map executingInterventions = recognizerResult.getExecutingInterventions(); + Map preparedInterventions = recognizerResult.getPreparedInterventions(); + + List executing = setupInputDataList(executingDataList, executingInterventions, + ExecutingInterventionData::new); + List prepared = setupInputDataList(preparedDataList, preparedInterventions, + PreparedInterventionData::new); + + input.setExecuting(executing); + input.setPrepared(prepared); + return input; } - private void setupInterventionIgnoredPrompt(String uuid, List dataList) { - String s = dataList.stream() - .map(data -> JSONObject.of( - "[干预倾向]", data.getTendency(), - "[未采用原因]", data.getDescription()).toString()) - .collect(Collectors.joining(",", "[", "]")); - interventionPrompt.put(uuid, Map.of( - "[识别状态] <是否识别到干预已存在行动的意图>", "识别到,但都未采用", - "[忽略原因] <各个意图被忽略的原因>", s, - "[干预行动] <将对已存在行动做出的行为>", "无行为" - )); + /** + * @param HandlerInput 中 List 对应的泛型 + * @param evaluatedDataList 评估结果列表 + * @param interventionMap 干预识别结果中的 tendency:data 映射 + * @param factory 输入类型构建工厂 + * @return 处理器输入(干预列表) + */ + private List setupInputDataList(List evaluatedDataList, + Map interventionMap, Supplier factory) { + + List result = new ArrayList<>(); + + for (EvaluatedInterventionData interventionData : evaluatedDataList) { + + I data = factory.get(); + + if (data instanceof InterventionData inputData) { + inputData.setTendency(interventionData.getTendency()); + inputData.setDescription(interventionData.getDescription()); + inputData.setInterventions(interventionData.getInterventionData()); + } + + if (data instanceof ExecutingInterventionData inputData) { + inputData.setRecord((PhaserRecord) interventionMap.get(interventionData.getTendency())); + } else if (data instanceof PreparedInterventionData inputData) { + inputData.setActionData((ActionData) interventionMap.get(interventionData.getTendency())); + } + + result.add(data); + } + + return result; } - private void setupInterventionPrompt(String uuid, List dataList) { - List> contents = new ArrayList<>(); - List actions = new ArrayList<>(); - for (EvaluatedInterventionData data : dataList) { + private void setupInterventionIgnoredPrompt(String uuid, List executingDataList, List preparedDataList) { + List total = Stream.concat(executingDataList.stream(), preparedDataList.stream()).toList(); + + JSONArray reasons = new JSONArray(); + + for (EvaluatedInterventionData data : total) { + JSONObject reason = reasons.addObject(); + reason.put("[干预倾向]", data.getTendency()); + reason.put("[未采用原因]", data.getDescription()); + } + + synchronized (interventionPrompt) { + interventionPrompt.put(uuid, Map.of( + "[识别状态] <是否识别到干预已存在行动的意图>", "识别到,但都未采用", + "[忽略原因] <各个意图被忽略的原因>", reasons.toString(), + "[干预行动] <将对已存在行动做出的行为>", "无行为")); + } + } + + private void setupInterventionPrompt(String uuid, List executingDataList, + List preparedDataList) { + JSONArray contents = new JSONArray(); + List temp = Stream.concat(executingDataList.stream(), preparedDataList.stream()).toList(); + + for (EvaluatedInterventionData data : temp) { if (!data.isOk()) { continue; } String tendency = data.getTendency(); - contents.add(Map.of( - "[干预倾向]", tendency, - "[干预类型]", data.getType().toString(), - "[行动链变动情况]", getActionChainStr(data.getActions()) - )); - actions.add(tendency); + JSONObject newElement = contents.addObject(); + newElement.put("[干预倾向]", tendency); + JSONArray changes = newElement.putArray("[行动链变动情况]"); + + for (MetaIntervention intervention : data.getInterventionData()) { + JSONObject change = changes.addObject(); + change.put("[干预类型]", intervention.getType()); + change.put("[干预序号]", intervention.getOrder()); + change.putArray("[干预内容]").addAll(intervention.getActions()); + } } - - interventionPrompt.put(uuid, Map.of( - "[识别状态] <是否识别到干预已存在行动的意图>", "识别到,将采用", - "[具体内容] <各个干预意图对应的具体信息>", contents.toString(), - "[干预行动] <将对已存在行动做出的行为>", actions.toString() - )); - } - - private String getActionChainStr(LinkedHashMap actions) { - ArrayList list = new ArrayList<>(); - //虽说actionCapability那边做了异常抛出,但说实话很明显放在这里不好处理啊🤔,还是在前边统一检查一下吧 - actions.forEach((order, actionKey) -> { - list.add(order + ":" + actionCapability.loadMetaActionInfo(actionKey).getDescription()); - }); - return list.toString(); + synchronized (interventionPrompt) { + interventionPrompt.put(uuid, Map.of( + "[识别状态] <是否识别到干预已存在行动的意图>", "识别到,将采用", + "[干预内容] <将对已存在行动做出的行为>", contents.toString())); + } } private void setupNoInterventionPrompt(String uuid) { @@ -167,9 +247,10 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel "[干预行动] <将对已存在行动做出的行为>", "无行动")); } - private EvaluatorInput buildEvaluatorInput(Map recognizedInterventions, String userId) { + private EvaluatorInput buildEvaluatorInput(RecognizerResult recognizerResult, String userId) { EvaluatorInput input = new EvaluatorInput(); - input.setInterventionTendencies(recognizedInterventions); + input.setExecutingInterventions(recognizerResult.getExecutingInterventions()); + input.setPreparedInterventions(recognizerResult.getPreparedInterventions()); input.setRecentMessages(cognationCapability.getChatMessages()); input.setActivatedSlices(memoryCapability.getActivatedSlices(userId)); return input; @@ -182,6 +263,7 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel // 参考的对话列表大小或需调整 recognizerInput.setRecentMessages(cognationCapability.getChatMessages()); recognizerInput.setExecutingActions(actionCapability.listPhaserRecords()); + recognizerInput.setPreparedActions(actionCapability.listPreparedAction(userId)); return recognizerInput; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/InterventionType.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/InterventionType.java similarity index 87% rename from Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/InterventionType.java rename to Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/InterventionType.java index ba1de1cc..19164c1a 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/InterventionType.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/InterventionType.java @@ -1,4 +1,4 @@ -package work.slhaf.partner.module.modules.action.interventor.handler.entity; +package work.slhaf.partner.module.modules.action.interventor.entity; public enum InterventionType { /** diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/MetaIntervention.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/MetaIntervention.java new file mode 100644 index 00000000..0984548d --- /dev/null +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/entity/MetaIntervention.java @@ -0,0 +1,21 @@ +package work.slhaf.partner.module.modules.action.interventor.entity; + +import lombok.Data; + +import java.util.List; + +@Data +public class MetaIntervention { + /** + * 干预数据类型 + */ + private InterventionType type; + /** + * 干预数据对应的行动链序列 + */ + private int order; + /** + * 干预数据所需的行动key列表 + */ + private List actions; +} \ No newline at end of file diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/InterventionEvaluator.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/InterventionEvaluator.java index 562ee755..bccd2154 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/InterventionEvaluator.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/InterventionEvaluator.java @@ -1,14 +1,6 @@ package work.slhaf.partner.module.modules.action.interventor.evaluator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; - -import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; - import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; @@ -19,11 +11,17 @@ import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore.ExecutorType; import work.slhaf.partner.core.action.ActionCore.PhaserRecord; +import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorInput; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult; import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult.EvaluatedInterventionData; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + @Slf4j @AgentSubModule public class InterventionEvaluator extends AgentRunningSubModule @@ -32,35 +30,26 @@ public class InterventionEvaluator extends AgentRunningSubModule evaluatedDataList = result.getDataList(); - - Map interventionTendencies = data.getInterventionTendencies(); - Set tendencies = interventionTendencies.keySet(); + public EvaluatorResult execute(EvaluatorInput input) { + // 获取必须数据 ExecutorService executor = actionCapability.getExecutor(ExecutorType.VIRTUAL); + Map executingInterventions = input.getExecutingInterventions(); + Map preparedInterventions = input.getPreparedInterventions(); + CountDownLatch latch = new CountDownLatch(executingInterventions.size() + preparedInterventions.size()); + + // 创建结果容器 + EvaluatorResult result = new EvaluatorResult(); + List executingDataList = result.getExecutingDataList(); + List preparedDataList = result.getPreparedDataList(); + + // 并发评估 + evaluateIntervention(executingDataList, executingInterventions, input, executor, latch); + evaluateIntervention(preparedDataList, preparedInterventions, input, executor, latch); - CountDownLatch latch = new CountDownLatch(tendencies.size()); - tendencies.forEach(tendency -> { - executor.execute(() -> { - try { - String input = buildPrompt(data.getRecentMessages(), data.getActivatedSlices(), - interventionTendencies.get(tendency), tendency); - ChatResponse response = singleChat(input); - EvaluatedInterventionData evaluatedData = JSONObject.parseObject(response.getMessage(), - EvaluatedInterventionData.class); - synchronized (evaluatedDataList) { - evaluatedDataList.add(evaluatedData); - } - } catch (Exception e) { - log.error("干预意图评估出错: " + tendency, e); - } finally { - latch.countDown(); - } - }); - }); try { latch.await(); } catch (InterruptedException e) { @@ -70,13 +59,39 @@ public class InterventionEvaluator extends AgentRunningSubModule void evaluateIntervention(List evaluatedDataList, Map interventionMap, EvaluatorInput input, ExecutorService executor, CountDownLatch latch) { + interventionMap.forEach((tendency, data) -> { + executor.execute(() -> { + try { + ActionData actionData = switch (data) { + case PhaserRecord record -> record.actionData(); + case ActionData tempData -> tempData; + default -> null; + }; + String prompt = buildPrompt(input.getRecentMessages(), input.getActivatedSlices(), actionData, tendency); + + ChatResponse response = this.singleChat(prompt); + EvaluatedInterventionData evaluatedData = JSONObject.parseObject(response.getMessage(), + EvaluatedInterventionData.class); + synchronized (evaluatedDataList) { + evaluatedDataList.add(evaluatedData); + } + } catch (Exception e) { + log.error("干预意图评估出错: {}", tendency, e); + } finally { + latch.countDown(); + } + }); + }); + } + private String buildPrompt(List recentMessages, List activatedSlices, - PhaserRecord phaserRecord, String tendency) { + ActionData actionData, String tendency) { JSONObject json = new JSONObject(); json.put("干预倾向", tendency); json.putArray("近期对话").addAll(recentMessages); json.putArray("参考记忆").addAll(activatedSlices); - json.put("将干预的行动", JSONObject.toJSONString(phaserRecord.actionData())); + json.put("将干预的行动", JSONObject.toJSONString(actionData)); return json.toJSONString(); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorInput.java index af01c874..854f257e 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorInput.java @@ -3,6 +3,7 @@ package work.slhaf.partner.module.modules.action.interventor.evaluator.entity; import lombok.Data; import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.core.action.ActionCore.PhaserRecord; +import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; import java.util.List; @@ -10,7 +11,8 @@ import java.util.Map; @Data public class EvaluatorInput { - private Map interventionTendencies; + private Map executingInterventions; + private Map preparedInterventions; private List activatedSlices; private List recentMessages; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorResult.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorResult.java index eeda3cb6..5b927992 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorResult.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/evaluator/entity/EvaluatorResult.java @@ -1,11 +1,9 @@ package work.slhaf.partner.module.modules.action.interventor.evaluator.entity; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; - import lombok.Data; -import work.slhaf.partner.module.modules.action.interventor.handler.entity.InterventionType; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; + +import java.util.List; /** * 干预倾向评估结果,包含评估通过的倾向文本、对行动链的行为、指定操作的行动单元key、未通过的原因 @@ -16,7 +14,8 @@ public class EvaluatorResult { * 是否存在通过的干预倾向 */ private boolean ok; - private List dataList = new ArrayList<>(); + private List executingDataList; + private List preparedDataList; @Data public static class EvaluatedInterventionData { @@ -29,10 +28,6 @@ public class EvaluatorResult { * 描述信息(包括通过、失败原因) */ private String description; - private InterventionType type; - /** - * 键为执行顺序,值为行动单元对应的key - */ - private LinkedHashMap actions; + private List interventionData; } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java index ade553f0..4d8bb525 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/InterventionHandler.java @@ -1,16 +1,27 @@ package work.slhaf.partner.module.modules.action.interventor.handler; -import java.util.List; -import java.util.concurrent.ExecutorService; - import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore.ExecutorType; +import work.slhaf.partner.core.action.ActionCore.PhaserRecord; +import work.slhaf.partner.core.action.entity.ActionData; +import work.slhaf.partner.core.action.entity.ActionData.ActionStatus; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput; -import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.HandlerInputData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.ExecutingInterventionData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.InterventionData; +import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput.PreparedInterventionData; + +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Phaser; @Slf4j @AgentSubModule @@ -19,18 +30,159 @@ public class InterventionHandler extends AgentRunningSubModule { - log.debug("干预开始执行"); - List dataList = data.getData(); - for (HandlerInputData inputData : dataList) { - log.debug("干预操作: {}, 干预类型: {}",inputData.getTendency(),inputData.getType()); - - } - }); + handle(data.getExecuting(), executor); + handle(data.getPrepared(), executor); return null; } + private void handle(List executing, ExecutorService executor) { + executor.execute(() -> { + executing.forEach(interventionData -> { + // 干预逻辑一致 + // 同步操作不同 + // HandlerAction 抽取同步逻辑 + // 此处进行遍历 intervention + // 根据Intervention类型进行分发 + + List interventions = interventionData.getInterventions(); + if (interventionData instanceof ExecutingInterventionData data) { + handleInterventions(interventions, data.getRecord()); + } else if (interventionData instanceof PreparedInterventionData data) { + handleInterventions(interventions, data.getActionData()); + } + + }); + }); + } + + private void handleInterventions(List interventions, T data) { + // 加载数据 + Phaser phaser = null; + ActionData actionData = switch (data) { + case PhaserRecord record -> { + phaser = record.phaser(); + yield record.actionData(); + } + case ActionData tempData -> tempData; + default -> null; + }; + if (actionData == null) { + return; + } + + // 加锁确保同步 + synchronized (actionData) { + applyInterventions(interventions, actionData, phaser); + } + } + + private void applyInterventions(List interventions, ActionData actionData, Phaser phaser) { + boolean rebuildCleanTag = false; + + interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder)); + + for (MetaIntervention intervention : interventions) { + List actions = intervention.getActions() + .stream() + .map(actionKey -> actionCapability.loadMetaAction(actionKey)) + .toList(); + + switch (intervention.getType()) { + case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions); + case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser); + case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions); + case InterventionType.CANCEL -> handleCancel(actionData); + case InterventionType.REBUILD -> { + if (!rebuildCleanTag) { + cleanActionData(actionData); + rebuildCleanTag = true; + } + handleRebuild(actionData, intervention.getOrder(), actions); + } + } + } + + } + + /** + * 在未进入执行阶段的行动单元组新增新的行动 + */ + private void handleAppend(ActionData actionData, int order, List actions) { + if (order <= actionData.getExecutingStage()) return; + + actionData.getActionChain().put(order, actions); + } + + /** + * 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步 + */ + private void handleInsert(ActionData actionData, int order, List actions, Phaser phaser) { + if (order < actionData.getExecutingStage()) return; + + phaser.register(); + try { + LinkedHashMap> actionChain = actionData.getActionChain(); + actionChain.put(order, actions); + + if (order == actionData.getExecutingStage()) { + ExecutorService virtualExecutor = actionCapability.getExecutor(ExecutorType.VIRTUAL); + ExecutorService platformExecutor = actionCapability.getExecutor(ExecutorType.PLATFORM); + ExecutorService executor; + phaser.bulkRegister(actions.size()); + + for (MetaAction action : actions) { + executor = action.isIo() ? virtualExecutor : platformExecutor; + executor.execute(() -> { + try { + action.run(); + } finally { + phaser.arriveAndDeregister(); + } + }); + } + + } + } finally { + phaser.arriveAndDeregister(); + } + } + + private void handleDelete(ActionData actionData, int order, List actions) { + if (order <= actionData.getExecutingStage()) return; + + LinkedHashMap> actionChain = actionData.getActionChain(); + if (actionChain.containsKey(order)) { + actionChain.get(order).removeAll(actions); + if (actionChain.get(order).isEmpty()) { + actionChain.remove(order); + } + } + } + + private void handleCancel(ActionData actionData) { + actionData.setStatus(ActionStatus.FAILED); + actionData.setResult("行动取消"); + } + + private void handleRebuild(ActionData actionData, int order, List actions) { + LinkedHashMap> actionChain = actionData.getActionChain(); + actionChain.put(order, actions); + } + + private void cleanActionData(ActionData actionData) { + actionData.getActionChain().clear(); + actionData.setExecutingStage(0); + actionData.setStatus(ActionStatus.PREPARE); + actionData.getHistory().clear(); + } + } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/HandlerInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/HandlerInput.java index 21beb361..39b038ba 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/HandlerInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/handler/entity/HandlerInput.java @@ -1,23 +1,39 @@ package work.slhaf.partner.module.modules.action.interventor.handler.entity; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; - import lombok.Data; +import lombok.EqualsAndHashCode; import work.slhaf.partner.core.action.ActionCore.PhaserRecord; +import work.slhaf.partner.core.action.entity.ActionData; +import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; + +import java.util.List; @Data public class HandlerInput { - - private List data = new ArrayList<>(); + + private List executing; + private List prepared; @Data - public static class HandlerInputData{ - private String tendency; - private String description; - private InterventionType type; - private LinkedHashMap actions; + @EqualsAndHashCode(callSuper = true) + public static class ExecutingInterventionData extends InterventionData { private PhaserRecord record; } + + @Data + @EqualsAndHashCode(callSuper = true) + public static class PreparedInterventionData extends InterventionData { + private ActionData actionData; + } + + /** + * 针对一个干预倾向而言,有可能针对一个行动数据做出多种、不同类型的干预操作,即封装为 InterventionData 内部的 MetaIntervention 列表 + */ + @Data + public static abstract class InterventionData { + protected String tendency; + protected String description; + protected List interventions; + } + } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/InterventionRecognizer.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/InterventionRecognizer.java index 6cb694d9..6136d658 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/InterventionRecognizer.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/InterventionRecognizer.java @@ -28,30 +28,21 @@ public class InterventionRecognizer extends AgentRunningSubModule resultInterventionMap = recognizerResult.getInterventions(); List executingActions = input.getExecutingActions(); - CountDownLatch countDownLatch = new CountDownLatch(executingActions.size()); - for (ActionCore.PhaserRecord record : executingActions) { - executor.execute(() -> { - try { - String prompt = buildPrompt(record, input); - ChatResponse response = this.singleChat(prompt); - MetaRecognizerResult result = JSONObject.parseObject(response.getMessage(), MetaRecognizerResult.class); - if (result.isOk()) { - synchronized (resultInterventionMap) { - resultInterventionMap.put(result.getIntervention(), record); - } - } - } catch (Exception e) { - log.error("LLM干预意图提取出错", e); - } finally { - countDownLatch.countDown(); - } - }); - } + List preparedActions = input.getPreparedActions(); + CountDownLatch countDownLatch = new CountDownLatch(executingActions.size() + preparedActions.size()); + + // 创建结果容器 + RecognizerResult recognizerResult = new RecognizerResult(); + Map executingInterventions = recognizerResult.getExecutingInterventions(); + Map preparedInterventions = recognizerResult.getPreparedInterventions(); + + // 执行识别操作 + recognizeIntervention(executingInterventions, executingActions, executor, input, countDownLatch); + recognizeIntervention(preparedInterventions, preparedActions, executor, input, countDownLatch); + try { countDownLatch.await(); } catch (InterruptedException e) { @@ -60,12 +51,40 @@ public class InterventionRecognizer extends AgentRunningSubModule void recognizeIntervention(Map interventionsMap, List actions, ExecutorService executor, RecognizerInput input, CountDownLatch latch) { + for (T data : actions) { + executor.execute(() -> { + try { + String prompt = buildPrompt(data, input); + ChatResponse response = this.singleChat(prompt); + MetaRecognizerResult result = JSONObject.parseObject(response.getMessage(), MetaRecognizerResult.class); + if (result.isOk()) { + synchronized (interventionsMap) { + interventionsMap.put(result.getIntervention(), data); + } + } + } catch (Exception e) { + log.error("LLM干预意图提取出错", e); + } finally { + latch.countDown(); + } + }); + } + } + + private String buildPrompt(T data, RecognizerInput input) { + ActionData actionData = switch (data) { + case ActionCore.PhaserRecord record -> record.actionData(); + case ActionData tempData -> tempData; + default -> null; + }; + if (actionData == null) { + return null; + } JSONObject json = new JSONObject(); JSONObject actionInfo = json.putObject("行动信息"); - actionInfo.put("行动倾向", actionData.getStatus()); + actionInfo.put("行动倾向", actionData.getTendency()); actionInfo.put("行动原因", actionData.getReason()); actionInfo.put("行动描述", actionData.getDescription()); actionInfo.put("行动状态", actionData.getStatus()); diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerInput.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerInput.java index 4b7c8153..078b8613 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerInput.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerInput.java @@ -3,6 +3,7 @@ package work.slhaf.partner.module.modules.action.interventor.recognizer.entity; import lombok.Data; import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.core.action.ActionCore; +import work.slhaf.partner.core.action.entity.ActionData; import java.util.List; @@ -18,4 +19,5 @@ public class RecognizerInput { * 正在执行的行动-Phaser记录列表,在Recognizer中结合本次输入并发评估(考虑到不同行动链之间对LLM的影响) */ private List executingActions; + private List preparedActions; } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerResult.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerResult.java index db52d39b..f7af4f89 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerResult.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/interventor/recognizer/entity/RecognizerResult.java @@ -2,6 +2,7 @@ package work.slhaf.partner.module.modules.action.interventor.recognizer.entity; import lombok.Data; import work.slhaf.partner.core.action.ActionCore; +import work.slhaf.partner.core.action.entity.ActionData; import java.util.HashMap; import java.util.Map; @@ -12,10 +13,18 @@ public class RecognizerResult { private boolean ok; /** + *

将被干预的‘执行中行动’

* key: 干预倾向 *
* value: 干预倾向将作用的 phaser 记录 */ - private Map interventions = new HashMap<>(); + private Map executingInterventions = new HashMap<>(); + /** + *

将被干预的‘等待中行动’

+ * key: 干预倾向 + *
+ * value: 干预倾向将作用的行动数据 + */ + private Map preparedInterventions = new HashMap<>(); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java index 2a230e5b..871168eb 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java @@ -235,7 +235,7 @@ public class ActionPlanner extends PreRunningModule { return switch (evaluatorResult.getType()) { case PLANNING -> { ScheduledActionData actionInfo = new ScheduledActionData(); - actionInfo.setActionChain(actionChain); + actionInfo.getActionChain().putAll(actionChain); actionInfo.setScheduleContent(evaluatorResult.getScheduleContent()); actionInfo.setStatus(ActionData.ActionStatus.PREPARE); actionInfo.setUuid(UUID.randomUUID().toString()); @@ -243,7 +243,7 @@ public class ActionPlanner extends PreRunningModule { } case IMMEDIATE -> { ImmediateActionData actionInfo = new ImmediateActionData(); - actionInfo.setActionChain(actionChain); + actionInfo.getActionChain().putAll(actionChain); actionInfo.setStatus(ActionData.ActionStatus.PREPARE); actionInfo.setUuid(UUID.randomUUID().toString()); yield actionInfo;