开始推进 ActionDispatcher 模块

- ActionDispatcher 划分为 ActionScheduler 和 ActionExecutor 两个子模块,分别负责处理计划任务和即时任务
- 正式确定 Action 将以 ActionChain 的形式进行执行,也采用同组并发策略,按照 order 字段在 chain 中进行排序
- 调整了 ActionInfo 等类以适应当前的元行动类
- 对于行动能力的支持,或可考虑这几种方式: Agent自生成python脚本(必须经过验证,确认可执行且无风险)、MCP调用(需适配为Partner所支持的形式)、普通插件(在指定目录动态加载)
This commit is contained in:
2025-10-22 15:15:38 +08:00
parent 6bfa941c35
commit 21b3a0e846
18 changed files with 192 additions and 79 deletions

View File

@@ -1,24 +1,24 @@
package work.slhaf.partner.core.action;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.core.action.entity.CacheAdjustData;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.ActionInfo;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import java.util.List;
@Capability(value = "action")
public interface ActionCapability {
void putPreparedAction(String uuid, MetaActionInfo metaActionInfo);
void putPreparedAction(String uuid, ActionInfo actionInfo);
List<MetaActionInfo> popPreparedAction(String userId);
List<ActionInfo> popPreparedAction(String userId);
List<MetaActionInfo> popPendingAction(String userId);
List<ActionInfo> popPendingAction(String userId);
List<MetaActionInfo> listPreparedAction(String userId);
List<ActionInfo> listPreparedAction(String userId);
List<MetaActionInfo> listPendingAction(String userId);
List<ActionInfo> listPendingAction(String userId);
void putPendingActions(String userId, MetaActionInfo metaActionInfo);
void putPendingActions(String userId, ActionInfo actionInfo);
List<String> selectTendencyCache(String input);

View File

@@ -1,14 +1,14 @@
package work.slhaf.partner.core.action;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.action.entity.ActionCacheData;
import work.slhaf.partner.core.action.entity.CacheAdjustData;
import work.slhaf.partner.core.action.entity.CacheAdjustMetaData;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.ActionInfo;
import work.slhaf.partner.core.action.entity.cache.ActionCacheData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import java.io.IOException;
import java.util.ArrayList;
@@ -21,19 +21,19 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@SuppressWarnings("FieldMayBeFinal")
@Capability(value = "action")
@CapabilityCore(value = "action")
@Slf4j
public class ActionCore extends PartnerCore<ActionCore> {
/**
* 对应本次交互即将执行或将要放置在行动池的预备任务因此将以本次交互的uuid为键其起到的作用相当于暂时的模块上下文
*/
private HashMap<String, List<MetaActionInfo>> preparedActions = new HashMap<>();
private HashMap<String, List<ActionInfo>> preparedActions = new HashMap<>();
/**
* 待确认任务以userId区分不同用户因为需要跨请求确认
*/
private HashMap<String, List<MetaActionInfo>> pendingActions = new HashMap<>();
private HashMap<String, List<ActionInfo>> pendingActions = new HashMap<>();
/**
* 语义缓存与行为倾向映射
@@ -48,44 +48,44 @@ public class ActionCore extends PartnerCore<ActionCore> {
}
@CapabilityMethod
public synchronized void putPendingActions(String userId, MetaActionInfo metaActionInfo) {
public synchronized void putPendingActions(String userId, ActionInfo actionInfo) {
pendingActions.computeIfAbsent(userId, k -> {
List<MetaActionInfo> temp = new ArrayList<>();
temp.add(metaActionInfo);
List<ActionInfo> temp = new ArrayList<>();
temp.add(actionInfo);
return temp;
});
}
@CapabilityMethod
public synchronized List<MetaActionInfo> popPendingAction(String userId) {
List<MetaActionInfo> infos = pendingActions.get(userId);
public synchronized List<ActionInfo> popPendingAction(String userId) {
List<ActionInfo> infos = pendingActions.get(userId);
pendingActions.remove(userId);
return infos;
}
@CapabilityMethod
public synchronized void putPreparedAction(String uuid, MetaActionInfo metaActionInfo) {
public synchronized void putPreparedAction(String uuid, ActionInfo actionInfo) {
preparedActions.computeIfAbsent(uuid, k -> {
List<MetaActionInfo> temp = new ArrayList<>();
temp.add(metaActionInfo);
List<ActionInfo> temp = new ArrayList<>();
temp.add(actionInfo);
return temp;
});
}
@CapabilityMethod
public synchronized List<MetaActionInfo> popPreparedAction(String userId) {
List<MetaActionInfo> infos = preparedActions.get(userId);
public synchronized List<ActionInfo> popPreparedAction(String userId) {
List<ActionInfo> infos = preparedActions.get(userId);
preparedActions.remove(userId);
return infos;
}
@CapabilityMethod
public List<MetaActionInfo> listPreparedAction(String userId) {
public List<ActionInfo> listPreparedAction(String userId) {
return preparedActions.get(userId);
}
@CapabilityMethod
public List<MetaActionInfo> listPendingAction(String userId) {
public List<ActionInfo> listPendingAction(String userId) {
return pendingActions.get(userId);
}

View File

@@ -1,11 +0,0 @@
package work.slhaf.partner.core.action.entity;
import lombok.Data;
@Data
public class ActionData {
private String key;
private String[] array;
private String reason;
private String description;
}

View File

@@ -2,11 +2,15 @@ package work.slhaf.partner.core.action.entity;
import lombok.Data;
import java.util.List;
@Data
public abstract class MetaActionInfo {
public abstract class ActionInfo {
protected String uuid;
protected String tendency;
protected ActionStatus status;
protected ActionData actionData;
protected List<MetaAction> actionChain;
protected String Result;
protected String reason;
protected String description;
}

View File

@@ -5,5 +5,5 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
public class ImmediateActionInfo extends MetaActionInfo{
public class ImmediateActionInfo extends ActionInfo {
}

View File

@@ -0,0 +1,26 @@
package work.slhaf.partner.core.action.entity;
import lombok.Data;
import org.jetbrains.annotations.NotNull;
@Data
public class MetaAction implements Comparable<MetaAction> {
//行动key
private String key;
//行动参数
private String[] params;
//行动回应
private String response;
//执行顺序,升序排列
private int order;
private boolean io;
@Override
public int compareTo(@NotNull MetaAction metaAction) {
return this.order - metaAction.order;
}
private void execute() {
}
}

View File

@@ -5,7 +5,7 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
public class ScheduledActionInfo extends MetaActionInfo {
public class ScheduledActionInfo extends ActionInfo {
private ScheduledType type;
private String scheduleContent; //如果为周期则对应cron表达式如果为一次性则对应为LocalDateTime字符串

View File

@@ -1,4 +1,4 @@
package work.slhaf.partner.core.action.entity;
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;
import work.slhaf.partner.common.vector.VectorClient;

View File

@@ -1,4 +1,4 @@
package work.slhaf.partner.core.action.entity;
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;

View File

@@ -1,4 +1,4 @@
package work.slhaf.partner.core.action.entity;
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;

View File

@@ -0,0 +1,61 @@
package work.slhaf.partner.module.modules.action.dispatcher;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.entity.ActionInfo;
import work.slhaf.partner.core.action.entity.ImmediateActionInfo;
import work.slhaf.partner.core.action.entity.ScheduledActionInfo;
import work.slhaf.partner.module.common.module.PostRunningModule;
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor;
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.util.ArrayList;
import java.util.List;
@AgentModule(name = "action_dispatcher", order = 7)
public class ActionDispatcher extends PostRunningModule {
@InjectCapability
private ActionCapability actionCapability;
@InjectModule
private ActionExecutor actionExecutor;
@InjectModule
private ActionScheduler actionScheduler;
private InteractionThreadPoolExecutor executor;
@Init
public void init() {
executor = InteractionThreadPoolExecutor.getInstance();
}
@Override
public void doExecute(PartnerRunningFlowContext context) {
//只需要处理prepared action因为pending action在用户确认后也将变为prepared action
//将PLANNING action放入时间轮中IMMEDIATE action直接进入并发执行流
//对于将触发的PLANNING action理想做法是将执行工具做成执行链的形式模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力避免绑定固定流程
executor.execute(() -> {
String userId = context.getUserId();
List<ActionInfo> preparedActions = actionCapability.popPreparedAction(userId);
//分类成PLANNING和IMMEDIATE两类
List<ScheduledActionInfo> scheduledActions = new ArrayList<>();
List<ImmediateActionInfo> immediateActions = new ArrayList<>();
for (ActionInfo preparedAction : preparedActions) {
if (preparedAction instanceof ScheduledActionInfo actionInfo) {
scheduledActions.add(actionInfo);
} else if (preparedAction instanceof ImmediateActionInfo actionInfo) {
immediateActions.add(actionInfo);
}
}
actionExecutor.execute(immediateActions);
actionScheduler.execute(scheduledActions);
});
}
}

View File

@@ -0,0 +1,22 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.entity.ImmediateActionInfo;
import java.util.List;
@AgentSubModule
public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionInfo>, Void> {
@InjectCapability
private ActionCapability actionCapability;
@Override
public Void execute(List<ImmediateActionInfo> immediateActions) {
return null;
}
}

View File

@@ -0,0 +1,16 @@
package work.slhaf.partner.module.modules.action.dispatcher.scheduler;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.core.action.entity.ScheduledActionInfo;
import java.util.List;
@AgentSubModule
public class ActionScheduler extends AgentRunningSubModule<List<ScheduledActionInfo>, Void> {
@Override
public Void execute(List<ScheduledActionInfo> data) {
return null;
}
}

View File

@@ -9,7 +9,12 @@ import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.ActionInfo;
import work.slhaf.partner.core.action.entity.ActionStatus;
import work.slhaf.partner.core.action.entity.ImmediateActionInfo;
import work.slhaf.partner.core.action.entity.ScheduledActionInfo;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.perceive.PerceiveCapability;
@@ -129,14 +134,14 @@ public class ActionPlanner extends PreRunningModule {
}
private void setupPendingActionInfo(PartnerRunningFlowContext context, ConfirmerResult result) {
//TODO 需考虑未确认任务的失效或者拒绝时机
//TODO 需考虑未确认任务的失效或者拒绝时机在action core中实现
List<String> uuids = result.getUuids();
if (uuids == null) {
return;
}
String contextUuid = context.getUuid();
List<MetaActionInfo> pendingActions = actionCapability.popPendingAction(context.getUserId());
for (MetaActionInfo actionInfo : pendingActions) {
List<ActionInfo> pendingActions = actionCapability.popPendingAction(context.getUserId());
for (ActionInfo actionInfo : pendingActions) {
if (uuids.contains(actionInfo.getUuid())) {
actionCapability.putPreparedAction(contextUuid, actionInfo);
}
@@ -146,11 +151,11 @@ public class ActionPlanner extends PreRunningModule {
private void setupActionInfo(List<EvaluatorResult> evaluatorResults, PartnerRunningFlowContext context) {
for (EvaluatorResult evaluatorResult : evaluatorResults) {
MetaActionInfo metaActionInfo = assemblyHelper.buildMetaActionInfo(evaluatorResult);
ActionInfo actionInfo = assemblyHelper.buildMetaActionInfo(evaluatorResult);
if (evaluatorResult.isNeedConfirm()) {
actionCapability.putPendingActions(context.getUserId(), metaActionInfo);
actionCapability.putPendingActions(context.getUserId(), actionInfo);
} else {
actionCapability.putPreparedAction(context.getUuid(), metaActionInfo);
actionCapability.putPreparedAction(context.getUuid(), actionInfo);
}
}
}
@@ -165,7 +170,7 @@ public class ActionPlanner extends PreRunningModule {
}
private void setupPendingActions(HashMap<String, String> map, String userId) {
List<MetaActionInfo> actionInfos = actionCapability.listPendingAction(userId);
List<ActionInfo> actionInfos = actionCapability.listPendingAction(userId);
if (actionInfos == null || actionInfos.isEmpty()) {
map.put("[待确认行动] <待确认行动信息>", "无待确认行动");
return;
@@ -176,7 +181,7 @@ public class ActionPlanner extends PreRunningModule {
}
private void setupPreparedActions(HashMap<String, String> map, String uuid) {
List<MetaActionInfo> actionInfos = actionCapability.listPreparedAction(uuid);
List<ActionInfo> actionInfos = actionCapability.listPreparedAction(uuid);
if (actionInfos == null || actionInfos.isEmpty()) {
map.put("[预备行动] <预备行动信息>", "无预备行动");
return;
@@ -186,11 +191,10 @@ public class ActionPlanner extends PreRunningModule {
}
}
private String generateActionStr(MetaActionInfo metaActionInfo) {
ActionData actionData = metaActionInfo.getActionData();
return "<行动倾向>" + " : " + metaActionInfo.getTendency() +
"<行动原因>" + " : " + actionData.getReason() +
"<工具描述>" + " : " + actionData.getDescription();
private String generateActionStr(ActionInfo actionInfo) {
return "<行动倾向>" + " : " + actionInfo.getTendency() +
"<行动原因>" + " : " + actionInfo.getReason() +
"<工具描述>" + " : " + actionInfo.getDescription();
}
@Override
@@ -225,11 +229,11 @@ public class ActionPlanner extends PreRunningModule {
return input;
}
private MetaActionInfo buildMetaActionInfo(EvaluatorResult evaluatorResult) {
private ActionInfo buildMetaActionInfo(EvaluatorResult evaluatorResult) {
return switch (evaluatorResult.getType()) {
case PLANNING -> {
ScheduledActionInfo actionInfo = new ScheduledActionInfo();
actionInfo.setActionData(evaluatorResult.getActionData());
actionInfo.setActionChain(evaluatorResult.getActionChain());
actionInfo.setScheduleContent(evaluatorResult.getScheduleContent());
actionInfo.setStatus(ActionStatus.PREPARE);
actionInfo.setUuid(UUID.randomUUID().toString());
@@ -237,7 +241,7 @@ public class ActionPlanner extends PreRunningModule {
}
case IMMEDIATE -> {
ImmediateActionInfo actionInfo = new ImmediateActionInfo();
actionInfo.setActionData(evaluatorResult.getActionData());
actionInfo.setActionChain(evaluatorResult.getActionChain());
actionInfo.setStatus(ActionStatus.PREPARE);
actionInfo.setUuid(UUID.randomUUID().toString());
yield actionInfo;
@@ -248,7 +252,7 @@ public class ActionPlanner extends PreRunningModule {
private ConfirmerInput buildConfirmerInput(PartnerRunningFlowContext context) {
ConfirmerInput confirmerInput = new ConfirmerInput();
confirmerInput.setInput(context.getInput());
List<MetaActionInfo> pendingActions = actionCapability.listPendingAction(context.getUserId());
List<ActionInfo> pendingActions = actionCapability.listPendingAction(context.getUserId());
confirmerInput.setActionInfos(pendingActions);
return confirmerInput;
}

View File

@@ -2,13 +2,13 @@ package work.slhaf.partner.module.modules.action.planner.confirmer.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.ActionInfo;
import java.util.List;
@Data
public class ConfirmerInput {
private String input;
private List<MetaActionInfo> actionInfos;
private List<ActionInfo> actionInfos;
private List<Message> recentMessages;
}

View File

@@ -1,8 +1,10 @@
package work.slhaf.partner.module.modules.action.planner.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ActionType;
import work.slhaf.partner.core.action.entity.MetaAction;
import java.util.List;
@Data
public class EvaluatorResult {
@@ -10,6 +12,6 @@ public class EvaluatorResult {
private boolean needConfirm;
private ActionType type;
private String scheduleContent;
private ActionData actionData;
private List<MetaAction> actionChain;
private String tendency;
}

View File

@@ -1,4 +0,0 @@
package work.slhaf.partner.module.modules.action.scheduler;
public class ActionDispatcher {
}

View File

@@ -1,7 +0,0 @@
package work.slhaf.partner.module.modules.action.scheduler;
/**
* 负责综合前置模块
*/
public class TaskScheduler {
}