推进 ActionExecutor 相关的动态插拔式行动调度机制

- 移除先前构想的 SpecializedPartnerInputData 及相关类,无论是自反思、向用户求助还是用户主动干预,都应当通过语义识别来作用于对应行动事件,使用固定行动id的机制不足以支撑这种机制
- 在 ActionCore 中新增执行中行动的 phaser 管理逻辑
- 新增几个异常类,适用于行动数据加载的异常情况
- 新增 ActionIdentifier 负责行动干预意图的识别
-
This commit is contained in:
2025-10-28 23:23:16 +08:00
parent e35e18f3b7
commit f9c3cacfea
17 changed files with 177 additions and 82 deletions

View File

@@ -1,12 +1,15 @@
package work.slhaf.partner.core.action; package work.slhaf.partner.core.action;
import lombok.NonNull;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability; import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
@Capability(value = "action") @Capability(value = "action")
public interface ActionCapability { public interface ActionCapability {
@@ -27,4 +30,13 @@ public interface ActionCapability {
ExecutorService getExecutor(ActionCore.ExecutorType type); ExecutorService getExecutor(ActionCore.ExecutorType type);
Set<String> getExistedMetaActions(); Set<String> getExistedMetaActions();
void putPhaserRecord(Phaser phaser, ActionData actionData);
void removePhaserRecord(Phaser phaser);
ActionCore.PhaserRecord getPhaserRecord(String tendency, String source);
MetaAction loadMetaAction(@NonNull String actionKey);
} }

View File

@@ -1,20 +1,26 @@
package work.slhaf.partner.core.action; package work.slhaf.partner.core.action;
import cn.hutool.core.bean.BeanUtil;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.common.vector.VectorClient; import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.PartnerCore; import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.cache.ActionCacheData; import work.slhaf.partner.core.action.entity.cache.ActionCacheData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.action.exception.ActionDataNotFoundException;
import work.slhaf.partner.core.action.exception.MetaActionNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -44,7 +50,11 @@ public class ActionCore extends PartnerCore<ActionCore> {
private final ExecutorService platformExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final ExecutorService platformExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
/**
* 已存在的行动程序,键为目录名,值为从目录加载的行动程序元信息
*/
private final LinkedHashMap<String, MetaActionInfo> existedMetaActions = new LinkedHashMap<>(); private final LinkedHashMap<String, MetaActionInfo> existedMetaActions = new LinkedHashMap<>();
private final List<PhaserRecord> phaserRecords = new ArrayList<>();
public ActionCore() throws IOException, ClassNotFoundException { public ActionCore() throws IOException, ClassNotFoundException {
new ActionWatchService(existedMetaActions, virtualExecutor).launch(); new ActionWatchService(existedMetaActions, virtualExecutor).launch();
@@ -67,7 +77,6 @@ public class ActionCore extends PartnerCore<ActionCore> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@CapabilityMethod @CapabilityMethod
public synchronized void putPendingActions(String userId, ActionData actionData) { public synchronized void putPendingActions(String userId, ActionData actionData) {
pendingActions.computeIfAbsent(userId, k -> { pendingActions.computeIfAbsent(userId, k -> {
@@ -171,6 +180,48 @@ public class ActionCore extends PartnerCore<ActionCore> {
return existedMetaActions.keySet(); return existedMetaActions.keySet();
} }
@CapabilityMethod
public synchronized void putPhaserRecord(Phaser phaser, ActionData actionData) {
phaserRecords.add(new PhaserRecord(phaser, actionData));
}
@CapabilityMethod
public synchronized void removePhaserRecord(Phaser phaser) {
PhaserRecord remove = null;
for (PhaserRecord record : phaserRecords) {
if (record.phaser.equals(phaser)) {
remove = record;
}
}
if (remove != null) {
phaserRecords.remove(remove);
}
}
@CapabilityMethod
public PhaserRecord getPhaserRecord(String tendency, String source) {
for (PhaserRecord record : phaserRecords) {
ActionData data = record.actionData;
if (data.getTendency().equals(tendency) && data.getSource().equals(source)) {
return record;
}
}
throw new ActionDataNotFoundException("未找到对应的 Phaser 记录: tendency=" + tendency + ", source=" + source);
}
@CapabilityMethod
public MetaAction loadMetaAction(@NonNull String actionKey) {
for (MetaActionInfo actionInfo : existedMetaActions.values()) {
if (actionInfo.getKey().equals(actionKey)) {
MetaAction metaAction = new MetaAction();
BeanUtil.copyProperties(actionInfo, metaAction);
return metaAction;
}
}
throw new MetaActionNotFoundException("未找到对应的行动程序信息" + actionKey);
}
/** /**
* 命中缓存且评估通过时 * 命中缓存且评估通过时
* *
@@ -268,4 +319,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
public enum ExecutorType { public enum ExecutorType {
VIRTUAL, PLATFORM VIRTUAL, PLATFORM
} }
public record PhaserRecord(Phaser phaser, ActionData actionData) {
}
} }

View File

@@ -21,9 +21,9 @@ import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
@Slf4j @Slf4j
class ActionWatchService { class ActionWatchService {
private HashMap<Path, WatchKey> registeredPaths = new HashMap<>(); private final HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
private LinkedHashMap<String, MetaActionInfo> existedMetaActions; private final LinkedHashMap<String, MetaActionInfo> existedMetaActions;
private ExecutorService virtualExecutor; private final ExecutorService virtualExecutor;
public ActionWatchService(LinkedHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService virtualExecutor) { public ActionWatchService(LinkedHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService virtualExecutor) {
this.existedMetaActions = existedMetaActions; this.existedMetaActions = existedMetaActions;
@@ -114,7 +114,7 @@ class ActionWatchService {
private boolean checkComplete(Path thisDir) { private boolean checkComplete(Path thisDir) {
File[] files = thisDir.toFile().listFiles(); File[] files = thisDir.toFile().listFiles();
if (files == null) { if (files == null) {
log.error("[{}]当前目录无法访问: [{}]", thisDir); log.error("当前目录无法访问: [{}]", thisDir);
return false; return false;
} }
boolean existedAction = false; boolean existedAction = false;

View File

@@ -17,6 +17,7 @@ public abstract class ActionData {
protected String result; protected String result;
protected String reason; protected String reason;
protected String description; protected String description;
protected String source;
public enum ActionStatus { public enum ActionStatus {
SUCCESS, FAILED, EXECUTING, WAITING, PREPARE SUCCESS, FAILED, EXECUTING, WAITING, PREPARE

View File

@@ -26,7 +26,7 @@ public class MetaAction implements Comparable<MetaAction>, Runnable {
/** /**
* 行动结果,包括执行状态和相应内容(执行结果或者错误信息) * 行动结果,包括执行状态和相应内容(执行结果或者错误信息)
*/ */
private Result result = new Result(); private final Result result = new Result();
/** /**
* 执行顺序,升序排列 * 执行顺序,升序排列
*/ */
@@ -49,7 +49,6 @@ public class MetaAction implements Comparable<MetaAction>, Runnable {
public void run() { public void run() {
File action = loadFromFile(); File action = loadFromFile();
if (!action.exists()) { if (!action.exists()) {
result = new Result();
result.setSuccess(false); result.setSuccess(false);
result.setData("Action file not found: " + action.getAbsolutePath()); result.setData("Action file not found: " + action.getAbsolutePath());
} }
@@ -60,7 +59,6 @@ public class MetaAction implements Comparable<MetaAction>, Runnable {
case SCRIPT -> executeScript(action); case SCRIPT -> executeScript(action);
} }
} catch (Exception e) { } catch (Exception e) {
result = new Result();
result.setSuccess(false); result.setSuccess(false);
result.setData(e.getMessage()); result.setData(e.getMessage());
} }
@@ -89,7 +87,7 @@ public class MetaAction implements Comparable<MetaAction>, Runnable {
@Data @Data
public static class Result { public static class Result {
private boolean success = true; private boolean success = true;
private String data; private String data = null;
} }
} }

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionDataNotFoundException extends AgentRuntimeException {
public ActionDataNotFoundException(String message) {
super(message);
}
public ActionDataNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class MetaActionNotFoundException extends AgentRuntimeException {
public MetaActionNotFoundException(String message) {
super(message);
}
public MetaActionNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -488,7 +488,7 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
return targetParentNode; return targetParentNode;
} }
public void updateCacheCounter(List<String> topicPath) { private void updateCacheCounter(List<String> topicPath) {
ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter; ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter;
if (memoryNodeCacheCounter.containsKey(topicPath)) { if (memoryNodeCacheCounter.containsKey(topicPath)) {
Integer tempCount = memoryNodeCacheCounter.get(topicPath); Integer tempCount = memoryNodeCacheCounter.get(topicPath);

View File

@@ -12,7 +12,9 @@ import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ImmediateActionData; import work.slhaf.partner.core.action.entity.ImmediateActionData;
import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaAction;
import java.util.*; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
@@ -26,8 +28,6 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
private ExecutorService virtualExecutor; private ExecutorService virtualExecutor;
private ExecutorService platformExecutor; private ExecutorService platformExecutor;
private HashMap<String, PhaserActionChain> phaserRecorder = new HashMap<>();
@Init @Init
public void init() { public void init() {
virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL); virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
@@ -48,6 +48,9 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
LinkedHashMap<Integer, List<MetaAction>> actionChain = actionData.getActionChain(); LinkedHashMap<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
List<MetaAction> virtual = new ArrayList<>(); List<MetaAction> virtual = new ArrayList<>();
List<MetaAction> platform = new ArrayList<>(); List<MetaAction> platform = new ArrayList<>();
Phaser phaser = new Phaser();
phaser.register();
actionCapability.putPhaserRecord(phaser, actionData);
actionChain.forEach((k, v) -> { actionChain.forEach((k, v) -> {
for (MetaAction metaAction : v) { for (MetaAction metaAction : v) {
// 根据io类型放入合适的列表 // 根据io类型放入合适的列表
@@ -57,27 +60,27 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
platform.add(metaAction); platform.add(metaAction);
} }
} }
runGroupAction(virtual, platform, actionChain); runGroupAction(virtual, platform, actionChain, phaser);
virtual.clear();
platform.clear();
phaser.arriveAndAwaitAdvance();
}); });
actionCapability.removePhaserRecord(phaser);
}); });
} }
// 使用phaser来承担同组的动态任务新增 // 使用phaser来承担同组的动态任务新增
private void runGroupAction(List<MetaAction> virtual, List<MetaAction> platform, LinkedHashMap<Integer, List<MetaAction>> actionChain) { private void runGroupAction(List<MetaAction> virtual, List<MetaAction> platform, LinkedHashMap<Integer, List<MetaAction>> actionChain, Phaser phaser) {
Phaser phaser = new Phaser(); runGroupAction(virtual, virtualExecutor, phaser);
phaser.register(); runGroupAction(platform, platformExecutor, phaser);
String groupId = UUID.randomUUID().toString();
phaserRecorder.put(groupId, new PhaserActionChain(phaser, actionChain));
runGroupAction(virtual, virtualExecutor, actionChain, phaser);
runGroupAction(platform, platformExecutor, actionChain, phaser);
phaserRecorder.remove(groupId);
phaser.arriveAndAwaitAdvance();
} }
private void runGroupAction(List<MetaAction> actions, ExecutorService executor, LinkedHashMap<Integer, List<MetaAction>> actionChain, Phaser phaser) { private void runGroupAction(List<MetaAction> actions, ExecutorService executor, Phaser phaser) {
for (MetaAction action : actions) { for (MetaAction action : actions) {
phaser.register(); phaser.register();
executor.execute(() -> { executor.execute(() -> {
try {
MetaAction.Result result = action.getResult(); MetaAction.Result result = action.getResult();
do { do {
// 该循环对应LLM的调整参数后重试 // 该循环对应LLM的调整参数后重试
@@ -92,7 +95,9 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
action.run(); action.run();
} while (!result.isSuccess()); } while (!result.isSuccess());
//TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法) //TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法)
} finally {
phaser.arriveAndDeregister(); phaser.arriveAndDeregister();
}
}); });
} }
} }
@@ -112,6 +117,4 @@ public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionDa
return false; return false;
} }
private record PhaserActionChain(Phaser phaser, LinkedHashMap<Integer, List<MetaAction>> actionChain) {
}
} }

View File

@@ -0,0 +1,40 @@
package work.slhaf.partner.module.modules.action.identifier;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.module.common.module.PreRunningModule;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.util.HashMap;
/**
* 负责识别潜在的行动干预信息,作用于正在进行或已存在的行动池中内容
*/
@AgentModule(name = "action_identifier", order = 2)
public class ActionIdentifier extends PreRunningModule implements ActivateModel {
@Override
protected void doExecute(PartnerRunningFlowContext context) {
}
@Override
public String modelKey() {
return "";
}
@Override
public boolean withBasicPrompt() {
return false;
}
@Override
protected HashMap<String, String> getPromptDataMap(PartnerRunningFlowContext context) {
return null;
}
@Override
protected String moduleName() {
return "";
}
}

View File

@@ -3,7 +3,6 @@ package work.slhaf.partner.runtime.interaction;
import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter;
import work.slhaf.partner.runtime.interaction.data.PartnerInputData; import work.slhaf.partner.runtime.interaction.data.PartnerInputData;
import work.slhaf.partner.runtime.interaction.data.PartnerOutputData; import work.slhaf.partner.runtime.interaction.data.PartnerOutputData;
import work.slhaf.partner.runtime.interaction.data.SpecializedPartnerInputData;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
public class PartnerInteractionAdapter extends AgentInteractionAdapter<PartnerInputData, PartnerOutputData, PartnerRunningFlowContext> { public class PartnerInteractionAdapter extends AgentInteractionAdapter<PartnerInputData, PartnerOutputData, PartnerRunningFlowContext> {
@@ -34,10 +33,6 @@ public class PartnerInteractionAdapter extends AgentInteractionAdapter<PartnerIn
context.setSingle(inputData.isSingle()); context.setSingle(inputData.isSingle());
context.setPlatform(inputData.getPlatform()); context.setPlatform(inputData.getPlatform());
context.setInput(inputData.getContent()); context.setInput(inputData.getContent());
context.setType(inputData.getInputType());
if (inputData instanceof SpecializedPartnerInputData specializedData) {
context.setPayload(specializedData.getPayload());
}
return context; return context;
} }
} }

View File

@@ -13,7 +13,8 @@ import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway;
import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter;
import work.slhaf.partner.common.config.PartnerAgentConfigManager; import work.slhaf.partner.common.config.PartnerAgentConfigManager;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.runtime.interaction.data.*; import work.slhaf.partner.runtime.interaction.data.PartnerInputData;
import work.slhaf.partner.runtime.interaction.data.PartnerOutputData;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@@ -140,12 +141,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<Pa
@Override @Override
public void onMessage(WebSocket webSocket, String s) { public void onMessage(WebSocket webSocket, String s) {
JSONObject parsedObject = JSONObject.parseObject(s); PartnerInputData inputData = JSONObject.parseObject(s, PartnerInputData.class);
PartnerInputType inputType = parsedObject.getObject(SpecializedPayloadConstant.TYPE, PartnerInputType.class);
PartnerInputData inputData = switch (inputType) {
case NORMAL -> parsedObject.to(PartnerInputData.class);
case SYSTEM, ASSIST_REQUEST, REFLECTION -> parsedObject.to(SpecializedPartnerInputData.class);
};
userSessions.put(inputData.getUserInfo(), webSocket); // 注册连接 userSessions.put(inputData.getUserInfo(), webSocket); // 注册连接
receive(inputData); receive(inputData);
} }

View File

@@ -10,5 +10,4 @@ public class PartnerInputData extends AgentInputData {
protected String userNickName; protected String userNickName;
protected String platform; protected String platform;
protected boolean single; protected boolean single;
protected PartnerInputType inputType;
} }

View File

@@ -1,5 +0,0 @@
package work.slhaf.partner.runtime.interaction.data;
public enum PartnerInputType {
NORMAL, REFLECTION, ASSIST_REQUEST, SYSTEM
}

View File

@@ -1,12 +0,0 @@
package work.slhaf.partner.runtime.interaction.data;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
@EqualsAndHashCode(callSuper = true)
@Data
public class SpecializedPartnerInputData extends PartnerInputData {
protected Map<String, String> payload;
}

View File

@@ -1,7 +0,0 @@
package work.slhaf.partner.runtime.interaction.data;
public class SpecializedPayloadConstant {
public static final String TASK_ID = "taskId";
public static final String ACTION_ID = "actionId";
public static final String TYPE = "inputType";
}

View File

@@ -5,7 +5,6 @@ import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext; import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext;
import work.slhaf.partner.module.common.entity.AppendPromptData; import work.slhaf.partner.module.common.entity.AppendPromptData;
import work.slhaf.partner.runtime.interaction.data.PartnerInputType;
import work.slhaf.partner.runtime.interaction.data.context.subcontext.CoreContext; import work.slhaf.partner.runtime.interaction.data.context.subcontext.CoreContext;
import work.slhaf.partner.runtime.interaction.data.context.subcontext.ModuleContext; import work.slhaf.partner.runtime.interaction.data.context.subcontext.ModuleContext;
@@ -13,7 +12,6 @@ import java.io.Serial;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@@ -32,9 +30,6 @@ public class PartnerRunningFlowContext extends RunningFlowContext {
protected LocalDateTime dateTime; protected LocalDateTime dateTime;
protected boolean single; protected boolean single;
protected PartnerInputType type;
protected Map<String, String> payload;
protected String input; protected String input;
protected CoreContext coreContext = new CoreContext(); protected CoreContext coreContext = new CoreContext();