refactor(Action): split ActionData into Action/ExecutableAction and unify scheduled action types

This commit is contained in:
2026-02-15 21:26:17 +08:00
parent 2b0682b9e0
commit 5f0165fa3a
21 changed files with 283 additions and 272 deletions

View File

@@ -3,7 +3,7 @@ package work.slhaf.partner.core.action;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.PhaserRecord;
@@ -20,15 +20,15 @@ import java.util.concurrent.Phaser;
@Capability(value = "action")
public interface ActionCapability {
void putAction(@NonNull ActionData actionData);
void putAction(@NonNull ExecutableAction executableAction);
Set<ActionData> listActions(@Nullable ActionData.ActionStatus actionStatus, @Nullable String source);
Set<ExecutableAction> listActions(@Nullable ExecutableAction.Status status, @Nullable String source);
List<ActionData> popPendingAction(String userId);
List<ExecutableAction> popPendingAction(String userId);
List<ActionData> listPendingAction(String userId);
List<ExecutableAction> listPendingAction(String userId);
void putPendingActions(String userId, ActionData actionData);
void putPendingActions(String userId, ExecutableAction executableAction);
List<String> selectTendencyCache(String input);
@@ -36,7 +36,7 @@ public interface ActionCapability {
ExecutorService getExecutor(ActionCore.ExecutorType type);
PhaserRecord putPhaserRecord(Phaser phaser, ActionData actionData);
PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction);
void removePhaserRecord(Phaser phaser);
@@ -54,6 +54,6 @@ public interface ActionCapability {
RunnerClient runnerClient();
void handleInterventions(List<MetaIntervention> interventions, ActionData data);
void handleInterventions(List<MetaIntervention> interventions, ExecutableAction data);
}

View File

@@ -8,7 +8,7 @@ import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.PhaserRecord;
@@ -37,12 +37,12 @@ public class ActionCore extends PartnerCore<ActionCore> {
/**
* 持久行动池
*/
private CopyOnWriteArraySet<ActionData> actionPool = new CopyOnWriteArraySet<>();
private CopyOnWriteArraySet<ExecutableAction> actionPool = new CopyOnWriteArraySet<>();
/**
* 待确认任务以userId区分不同用户因为需要跨请求确认
*/
private HashMap<String, List<ActionData>> pendingActions = new HashMap<>();
private HashMap<String, List<ExecutableAction>> pendingActions = new HashMap<>();
/**
* 语义缓存与行为倾向映射
@@ -71,45 +71,45 @@ public class ActionCore extends PartnerCore<ActionCore> {
private void setupShutdownHook() {
// 将执行中的行动状态置为失败
val executingActionSet = listActions(ActionData.ActionStatus.EXECUTING, null);
for (ActionData actionData : executingActionSet) {
actionData.setStatus(ActionData.ActionStatus.FAILED);
actionData.setResult("由于系统中断而失败");
val executingActionSet = listActions(ExecutableAction.Status.EXECUTING, null);
for (ExecutableAction executableAction : executingActionSet) {
executableAction.setStatus(ExecutableAction.Status.FAILED);
executableAction.setResult("由于系统中断而失败");
}
}
@CapabilityMethod
public void putAction(@NonNull ActionData actionData) {
actionPool.removeIf(data -> data.getUuid().equals(actionData.getUuid())); // 用来应对 ScheduledActionData 的重新排列
actionPool.add(actionData);
public void putAction(@NonNull ExecutableAction executableAction) {
actionPool.removeIf(data -> data.getUuid().equals(executableAction.getUuid())); // 用来应对 ScheduledActionData 的重新排列
actionPool.add(executableAction);
}
@CapabilityMethod
public Set<ActionData> listActions(@Nullable ActionData.ActionStatus actionStatus, @Nullable String source) {
public Set<ExecutableAction> listActions(@Nullable ExecutableAction.Status status, @Nullable String source) {
return actionPool.stream()
.filter(actionData -> actionStatus == null || actionData.getStatus().equals(actionStatus))
.filter(actionData -> status == null || actionData.getStatus().equals(status))
.filter(actionData -> source == null || actionData.getSource().equals(source))
.collect(Collectors.toSet());
}
@CapabilityMethod
public synchronized void putPendingActions(String userId, ActionData actionData) {
public synchronized void putPendingActions(String userId, ExecutableAction executableAction) {
pendingActions.computeIfAbsent(userId, k -> {
List<ActionData> temp = new ArrayList<>();
temp.add(actionData);
List<ExecutableAction> temp = new ArrayList<>();
temp.add(executableAction);
return temp;
});
}
@CapabilityMethod
public synchronized List<ActionData> popPendingAction(String userId) {
List<ActionData> infos = pendingActions.get(userId);
public synchronized List<ExecutableAction> popPendingAction(String userId) {
List<ExecutableAction> infos = pendingActions.get(userId);
pendingActions.remove(userId);
return infos;
}
@CapabilityMethod
public List<ActionData> listPendingAction(String userId) {
public List<ExecutableAction> listPendingAction(String userId) {
return pendingActions.get(userId);
}
@@ -180,8 +180,8 @@ public class ActionCore extends PartnerCore<ActionCore> {
}
@CapabilityMethod
public synchronized PhaserRecord putPhaserRecord(Phaser phaser, ActionData actionData) {
PhaserRecord record = new PhaserRecord(phaser, actionData);
public synchronized PhaserRecord putPhaserRecord(Phaser phaser, ExecutableAction executableAction) {
PhaserRecord record = new PhaserRecord(phaser, executableAction);
phaserRecords.add(record);
return record;
}
@@ -203,7 +203,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
@CapabilityMethod
public PhaserRecord getPhaserRecord(String tendency, String source) {
for (PhaserRecord record : phaserRecords) {
ActionData data = record.actionData();
ExecutableAction data = record.executableAction();
if (data.getTendency().equals(tendency) && data.getSource().equals(source)) {
return record;
}
@@ -255,19 +255,19 @@ public class ActionCore extends PartnerCore<ActionCore> {
}
@CapabilityMethod
public void handleInterventions(List<MetaIntervention> interventions, ActionData actionData) {
public void handleInterventions(List<MetaIntervention> interventions, ExecutableAction executableAction) {
// 加载数据
if (actionData == null) {
if (executableAction == null) {
return;
}
// 加锁确保同步
synchronized (actionData.getStatus()) {
applyInterventions(interventions, actionData);
synchronized (executableAction.getStatus()) {
applyInterventions(interventions, executableAction);
}
}
private void applyInterventions(List<MetaIntervention> interventions, ActionData actionData) {
private void applyInterventions(List<MetaIntervention> interventions, ExecutableAction executableAction) {
boolean rebuildCleanTag = false;
interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder));
@@ -279,16 +279,16 @@ public class ActionCore extends PartnerCore<ActionCore> {
.toList();
switch (intervention.getType()) {
case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions);
case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions);
case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions);
case InterventionType.CANCEL -> handleCancel(actionData);
case InterventionType.APPEND -> handleAppend(executableAction, intervention.getOrder(), actions);
case InterventionType.INSERT -> handleInsert(executableAction, intervention.getOrder(), actions);
case InterventionType.DELETE -> handleDelete(executableAction, intervention.getOrder(), actions);
case InterventionType.CANCEL -> handleCancel(executableAction);
case InterventionType.REBUILD -> {
if (!rebuildCleanTag) {
cleanActionData(actionData);
cleanActionData(executableAction);
rebuildCleanTag = true;
}
handleRebuild(actionData, intervention.getOrder(), actions);
handleRebuild(executableAction, intervention.getOrder(), actions);
}
}
}
@@ -298,28 +298,28 @@ public class ActionCore extends PartnerCore<ActionCore> {
/**
* 在未进入执行阶段的行动单元组新增新的行动
*/
private void handleAppend(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage())
private void handleAppend(ExecutableAction executableAction, int order, List<MetaAction> actions) {
if (order <= executableAction.getExecutingStage())
return;
actionData.getActionChain().put(order, actions);
executableAction.getActionChain().put(order, actions);
}
/**
* 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动
*/
private void handleInsert(ActionData actionData, int order, List<MetaAction> actions) {
if (order < actionData.getExecutingStage())
private void handleInsert(ExecutableAction executableAction, int order, List<MetaAction> actions) {
if (order < executableAction.getExecutingStage())
return;
actionData.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions);
executableAction.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions);
}
private void handleDelete(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage())
private void handleDelete(ExecutableAction executableAction, int order, List<MetaAction> actions) {
if (order <= executableAction.getExecutingStage())
return;
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
Map<Integer, List<MetaAction>> actionChain = executableAction.getActionChain();
if (actionChain.containsKey(order)) {
actionChain.get(order).removeAll(actions);
if (actionChain.get(order).isEmpty()) {
@@ -328,21 +328,21 @@ public class ActionCore extends PartnerCore<ActionCore> {
}
}
private void handleCancel(ActionData actionData) {
actionData.setStatus(ActionData.ActionStatus.FAILED);
actionData.setResult("行动取消");
private void handleCancel(ExecutableAction executableAction) {
executableAction.setStatus(ExecutableAction.Status.FAILED);
executableAction.setResult("行动取消");
}
private void handleRebuild(ActionData actionData, int order, List<MetaAction> actions) {
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
private void handleRebuild(ExecutableAction executableAction, int order, List<MetaAction> actions) {
Map<Integer, List<MetaAction>> actionChain = executableAction.getActionChain();
actionChain.put(order, actions);
}
private void cleanActionData(ActionData actionData) {
actionData.getActionChain().clear();
actionData.setExecutingStage(0);
actionData.setStatus(ActionData.ActionStatus.PREPARE);
actionData.getHistory().clear();
private void cleanActionData(ExecutableAction executableAction) {
executableAction.getActionChain().clear();
executableAction.setExecutingStage(0);
executableAction.setStatus(ExecutableAction.Status.PREPARE);
executableAction.getHistory().clear();
}
/**

View File

@@ -4,15 +4,44 @@ import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.Histo
import java.time.ZonedDateTime
import java.util.*
/**
* 行动模块传递的行动数据包含行动uuid倾向状态行动链结果发起原因行动描述等信息
*/
sealed class ActionData {
sealed class Action {
/**
* 行动ID
*/
val uuid: String = UUID.randomUUID().toString()
/**
* 行动来源
*/
abstract val source: String
/**
* 行动原因
*/
abstract val reason: String
/**
* 行动描述
*/
abstract val description: String
}
sealed interface Scheduled {
val scheduleType: ScheduleType
val scheduleContent: String
enum class ScheduleType {
CYCLE,
ONCE
}
}
/**
* 行动模块传递的行动数据包含行动uuid倾向状态行动链结果发起原因行动描述等信息
*/
sealed class ExecutableAction : Action() {
/**
* 行动倾向
*/
@@ -21,7 +50,7 @@ sealed class ActionData {
/**
* 行动状态
*/
var status: ActionStatus = ActionStatus.PREPARE
var status: Status = Status.PREPARE
/**
* 行动链
@@ -45,22 +74,7 @@ sealed class ActionData {
*/
val additionalContext: MutableMap<Int, MutableList<String>> = mutableMapOf()
/**
* 行动原因
*/
abstract val reason: String
/**
* 行动描述
*/
abstract val description: String
/**
* 行动来源
*/
abstract val source: String
enum class ActionStatus {
enum class Status {
/**
* 执行成功
*/
@@ -89,17 +103,17 @@ sealed class ActionData {
}
/**
* 计划行动数据类继承自{@link ActionData}扩展了属性{@link ScheduledActionData#type}{@link ScheduledActionData#scheduleContent}用于标识计划类型(单次还是周期性任务)和计划内容
* 计划行动数据类继承自[Action]扩展了[Scheduled]相关调度属性用于标识计划类型(单次还是周期性任务)和计划内容
*/
data class ScheduledActionData(
data class ScheduledExecutableAction(
override val tendency: String,
override val actionChain: MutableMap<Int, MutableList<MetaAction>>,
override val reason: String,
override val description: String,
override val source: String,
val scheduleType: ScheduleType,
val scheduleContent: String,
) : ActionData() {
override val scheduleType: Scheduled.ScheduleType,
override val scheduleContent: String
) : ExecutableAction(), Scheduled {
val scheduleHistories = ArrayList<ScheduleHistory>()
@@ -116,12 +130,7 @@ data class ScheduledActionData(
}
}
status = ActionStatus.PREPARE
}
enum class ScheduleType {
CYCLE,
ONCE
status = Status.PREPARE
}
data class ScheduleHistory(
@@ -134,10 +143,10 @@ data class ScheduledActionData(
/**
* 即时行动数据类
*/
data class ImmediateActionData(
data class ImmediateExecutableAction(
override val tendency: String,
override val actionChain: MutableMap<Int, MutableList<MetaAction>>,
override val reason: String,
override val description: String,
override val source: String,
) : ActionData()
) : ExecutableAction()

View File

@@ -1,13 +1,13 @@
package work.slhaf.partner.core.action.entity;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.entity.ExecutableAction.Status;
import java.util.concurrent.Phaser;
public record PhaserRecord(Phaser phaser, ActionData actionData) {
public record PhaserRecord(Phaser phaser, ExecutableAction executableAction) {
public void fail() {
actionData.setStatus(ActionStatus.FAILED);
executableAction.setStatus(Status.FAILED);
}
/**
@@ -15,8 +15,8 @@ public record PhaserRecord(Phaser phaser, ActionData actionData) {
* 同时循环检查进行阻塞
*/
public void interrupt() {
actionData.setStatus(ActionStatus.INTERRUPTED);
while (actionData().getStatus() == ActionStatus.INTERRUPTED) {
executableAction.setStatus(Status.INTERRUPTED);
while (executableAction().getStatus() == Status.INTERRUPTED) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
@@ -28,6 +28,6 @@ public record PhaserRecord(Phaser phaser, ActionData actionData) {
* 将状态重新设置为 EXECUTING ,恢复 interrupt 阻塞状态
*/
public void complete() {
actionData().setStatus(ActionStatus.EXECUTING);
executableAction().setStatus(Status.EXECUTING);
}
}

View File

@@ -7,9 +7,9 @@ import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ImmediateActionData;
import work.slhaf.partner.core.action.entity.ScheduledActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.action.entity.ImmediateExecutableAction;
import work.slhaf.partner.core.action.entity.ScheduledExecutableAction;
import work.slhaf.partner.module.common.module.PostRunningModule;
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput;
@@ -46,14 +46,14 @@ public class ActionDispatcher extends PostRunningModule {
// action理想做法是将执行工具做成执行链的形式模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力避免绑定固定流程
executor.execute(() -> {
String userId = context.getUserId();
val preparedActions = actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId);
val preparedActions = actionCapability.listActions(ExecutableAction.Status.PREPARE, userId);
// 分类成PLANNING和IMMEDIATE两类
Set<ScheduledActionData> scheduledActions = new HashSet<>();
Set<ImmediateActionData> immediateActions = new HashSet<>();
for (ActionData preparedAction : preparedActions) {
if (preparedAction instanceof ScheduledActionData actionInfo) {
Set<ScheduledExecutableAction> scheduledActions = new HashSet<>();
Set<ImmediateExecutableAction> immediateActions = new HashSet<>();
for (ExecutableAction preparedAction : preparedActions) {
if (preparedAction instanceof ScheduledExecutableAction actionInfo) {
scheduledActions.add(actionInfo);
} else if (preparedAction instanceof ImmediateActionData actionInfo) {
} else if (preparedAction instanceof ImmediateExecutableAction actionInfo) {
immediateActions.add(actionInfo);
}
}

View File

@@ -10,7 +10,7 @@ import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunn
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.entity.ExecutableAction.Status;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
@@ -69,22 +69,22 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
public Void execute(ActionExecutorInput input) {
val actions = input.getActions();
// 异步执行所有行动
for (ActionData actionData : actions) {
for (ExecutableAction executableAction : actions) {
platformExecutor.execute(() -> {
val source = actionData.getSource();
if (actionData.getStatus() != ActionStatus.PREPARE) {
val source = executableAction.getSource();
if (executableAction.getStatus() != Status.PREPARE) {
return;
}
val actionChain = actionData.getActionChain();
val actionChain = executableAction.getActionChain();
if (actionChain.isEmpty()) {
actionData.setStatus(ActionStatus.FAILED);
actionData.setResult("行动链为空");
executableAction.setStatus(Status.FAILED);
executableAction.setResult("行动链为空");
return;
}
// 注册执行中行动
val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData);
actionData.setStatus(ActionStatus.EXECUTING);
val phaserRecord = actionCapability.putPhaserRecord(phaser, executableAction);
executableAction.setStatus(Status.EXECUTING);
// 开始执行
val stageCursor = new Object() {
@@ -120,13 +120,13 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
actionData.setExecutingStage(orderList.get(stageCount));
executableAction.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
val metaActions = actionChain.get(actionData.getExecutingStage());
val metaActions = actionChain.get(executableAction.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, source);
phaser.awaitAdvance(listeningRecord.phase());
@@ -144,9 +144,9 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
try {
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
// 如果后续运行 corrector 触发频率较高,可考虑增加重试机制
val correctorInput = assemblyHelper.buildCorrectorInput(actionData, source);
val correctorInput = assemblyHelper.buildCorrectorInput(executableAction, source);
val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), executableAction);
} catch (Exception ignored) {
}
@@ -157,13 +157,13 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
// 结束
actionCapability.removePhaserRecord(phaser);
if (actionData.getStatus() != ActionStatus.FAILED) {
if (executableAction.getStatus() != Status.FAILED) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (actionData instanceof ScheduledActionData scheduledActionData) {
if (executableAction instanceof ScheduledExecutableAction scheduledActionData) {
scheduledActionData.recordAndReset();
actionScheduler.execute(Set.of(scheduledActionData));
} else {
actionData.setStatus(ActionStatus.SUCCESS);
executableAction.setStatus(Status.SUCCESS);
}
// TODO 执行过后需要回写至任务上下文recentCompletedTask同时触发自对话信号进行确认并记录以及是否通知用户触发与否需要机制进行匹配在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等)
@@ -232,7 +232,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
try {
val result = metaAction.getResult();
do {
val actionData = phaserRecord.actionData();
val actionData = phaserRecord.executableAction();
val executingStage = actionData.getExecutingStage();
val historyActionResults = actionData.getHistory().get(executingStage);
val additionalContext = actionData.getAdditionalContext().get(executingStage);
@@ -307,14 +307,14 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
return input;
}
private CorrectorInput buildCorrectorInput(ActionData actionData, String source) {
private CorrectorInput buildCorrectorInput(ExecutableAction executableAction, String source) {
return CorrectorInput.builder()
.tendency(actionData.getTendency())
.source(actionData.getSource())
.reason(actionData.getReason())
.description(actionData.getDescription())
.history(actionData.getHistory().get(actionData.getExecutingStage()))
.status(actionData.getStatus())
.tendency(executableAction.getTendency())
.source(executableAction.getSource())
.reason(executableAction.getReason())
.description(executableAction.getDescription())
.history(executableAction.getHistory().get(executableAction.getExecutingStage()))
.status(executableAction.getStatus())
.recentMessages(cognationCapability.getChatMessages())
.activatedSlices(memoryCapability.getActivatedSlices(source))
.build();

View File

@@ -1,5 +1,5 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity
import work.slhaf.partner.core.action.entity.ActionData
import work.slhaf.partner.core.action.entity.ExecutableAction
data class ActionExecutorInput(val actions: Set<ActionData>)
data class ActionExecutorInput(val actions: Set<ExecutableAction>)

View File

@@ -3,7 +3,7 @@ package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Builder;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
@@ -17,7 +17,7 @@ public class CorrectorInput {
private String description;
private List<HistoryAction> history;
private ActionData.ActionStatus status;
private ExecutableAction.Status status;
private List<Message> recentMessages;
private List<EvaluatedSlice> activatedSlices;

View File

@@ -17,8 +17,9 @@ import work.slhaf.partner.api.agent.factory.module.annotation.Init
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.entity.ActionData
import work.slhaf.partner.core.action.entity.ScheduledActionData
import work.slhaf.partner.core.action.entity.ExecutableAction
import work.slhaf.partner.core.action.entity.Scheduled
import work.slhaf.partner.core.action.entity.ScheduledExecutableAction
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import java.io.Closeable
@@ -29,7 +30,7 @@ import java.util.stream.Collectors
import kotlin.jvm.optionals.getOrNull
@AgentSubModule
class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>() {
class ActionScheduler : AgentRunningSubModule<Set<ScheduledExecutableAction>, Void>() {
@InjectCapability
private lateinit var actionCapability: ActionCapability
@@ -48,15 +49,15 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
@Init
fun init() {
val listScheduledActions: () -> Set<ScheduledActionData> = {
val listScheduledActions: () -> Set<ScheduledExecutableAction> = {
actionCapability.listActions(null, null)
.stream()
.filter { it is ScheduledActionData }
.map { it as ScheduledActionData }
.filter { it is ScheduledExecutableAction }
.map { it as ScheduledExecutableAction }
.collect(Collectors.toSet())
}
val onTrigger: (Set<ScheduledActionData>) -> Unit = { actionExecutor.execute(ActionExecutorInput(it)) }
val onTrigger: (Set<ScheduledExecutableAction>) -> Unit = { actionExecutor.execute(ActionExecutorInput(it)) }
timeWheel = TimeWheel(listScheduledActions, onTrigger)
@@ -71,7 +72,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
// TODO 如果要将 TimeWheel 作为 Agent 内部的循环周期,那么不依赖 Action 链路的内容,将不适合参与到 ActionExecutor因此需要将 ActionData 的触发类型进行分类SILENT TRIGGER仅限更新 ActionData 内部状态,通过属性 copy 完成不开放过多权限防止序列化失败、EXECUTOR、AGENT TURN。考虑将时间轮下放至 ActionCapability作为底层行动语义的一部分
override fun execute(scheduledActionDataSet: Set<ScheduledActionData>?): Void? {
override fun execute(scheduledActionDataSet: Set<ScheduledExecutableAction>?): Void? {
schedulerScope.launch {
scheduledActionDataSet?.run {
for (scheduledActionData in scheduledActionDataSet) {
@@ -85,12 +86,12 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
private class TimeWheel(
val listScheduledActions: () -> Set<ScheduledActionData>,
val onTrigger: (toTrigger: Set<ScheduledActionData>) -> Unit
val listScheduledActions: () -> Set<ScheduledExecutableAction>,
val onTrigger: (toTrigger: Set<ScheduledExecutableAction>) -> Unit
) : Closeable {
private val actionsGroupByHour = Array<MutableSet<ScheduledActionData>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<ScheduledActionData>>(60 * 60) { mutableSetOf() }
private val actionsGroupByHour = Array<MutableSet<ScheduledExecutableAction>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<ScheduledExecutableAction>>(60 * 60) { mutableSetOf() }
private var recordHour: Int = -1
private var recordDay: Int = -1
private val state = MutableStateFlow(WheelState.SLEEPING)
@@ -109,8 +110,8 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
launchWheel()
}
suspend fun schedule(actionData: ScheduledActionData) {
if (actionData.status != ActionData.ActionStatus.PREPARE) {
suspend fun schedule(actionData: ScheduledExecutableAction) {
if (actionData.status != ExecutableAction.Status.PREPARE) {
return
}
@@ -141,9 +142,9 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
private fun launchWheel() {
fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<ScheduledActionData>? {
fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<ScheduledExecutableAction>? {
if (tick > previousTick) {
val toTrigger = mutableSetOf<ScheduledActionData>()
val toTrigger = mutableSetOf<ScheduledExecutableAction>()
for (i in previousTick..tick) {
val bucket = wheel[i]
if (bucket.isNotEmpty()) {
@@ -178,7 +179,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
nextTickNanos += step.toLong() * 1_000_000_000L
var shouldBreak = false
var toTrigger: Set<ScheduledActionData>? = null
var toTrigger: Set<ScheduledExecutableAction>? = null
checkThenExecute(false) {
if (it.hour != launchingHour) {
@@ -265,9 +266,9 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) =
wheelActionsLock.withLock {
fun loadActions(
source: Set<ScheduledActionData>,
source: Set<ScheduledExecutableAction>,
now: ZonedDateTime,
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledExecutableAction) -> Unit,
repair: () -> Unit
) {
val runLoading = {
@@ -291,7 +292,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
fun loadHourActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val load: (ZonedDateTime, ScheduledExecutableAction) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
log.debug("Action loaded to hour: {}", actionData)
@@ -307,7 +308,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
val load: (ZonedDateTime, ScheduledExecutableAction) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
log.debug("Action loaded to day: {}", actionData)
}
@@ -347,12 +348,12 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
private fun parseToZonedDateTime(
scheduleType: ScheduledActionData.ScheduleType,
scheduleType: Scheduled.ScheduleType,
scheduleContent: String,
now: ZonedDateTime
): ZonedDateTime? {
return when (scheduleType) {
ScheduledActionData.ScheduleType.CYCLE
Scheduled.ScheduleType.CYCLE
-> {
val cron = try {
cronParser.parse(scheduleContent).validate()
@@ -363,7 +364,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
executionTime.nextExecution(now).getOrNull()
}
ScheduledActionData.ScheduleType.ONCE -> {
Scheduled.ScheduleType.ONCE -> {
val executionTime = try {
ZonedDateTime.parse(scheduleContent)
} catch (_: Exception) {
@@ -379,7 +380,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
private fun logFailedStatus(actionData: ScheduledActionData) {
private fun logFailedStatus(actionData: ScheduledExecutableAction) {
log.warn(
"行动未加载uuid: {}, source: {}, tendency: {}, scheduleContent: {}",
actionData.uuid,

View File

@@ -9,7 +9,7 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
@@ -96,7 +96,7 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel
}
private void handleInterventions(List<EvaluatedInterventionData> interventionDataList, Map<String, ActionData> interventionDataMap) {
private void handleInterventions(List<EvaluatedInterventionData> interventionDataList, Map<String, ExecutableAction> interventionDataMap) {
val executor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
executor.execute(() -> {
for (EvaluatedInterventionData interventionData : interventionDataList) {
@@ -175,8 +175,8 @@ public class ActionInterventor extends PreRunningModule implements ActivateModel
recognizerInput.setUserDialogMapStr(memoryCapability.getUserDialogMapStr(userId));
// 参考的对话列表大小或需调整
recognizerInput.setRecentMessages(cognationCapability.getChatMessages());
recognizerInput.setExecutingActions(actionCapability.listPhaserRecords().stream().map(PhaserRecord::actionData).toList());
recognizerInput.setPreparedActions(actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId).stream().toList());
recognizerInput.setExecutingActions(actionCapability.listPhaserRecords().stream().map(PhaserRecord::executableAction).toList());
recognizerInput.setPreparedActions(actionCapability.listActions(ExecutableAction.Status.PREPARE, userId).stream().toList());
return recognizerInput;
}

View File

@@ -10,7 +10,7 @@ import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorInput;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult;
@@ -36,8 +36,8 @@ public class InterventionEvaluator extends AgentRunningSubModule<EvaluatorInput,
public EvaluatorResult execute(EvaluatorInput input) {
// 获取必须数据
ExecutorService executor = actionCapability.getExecutor(ExecutorType.VIRTUAL);
Map<String, ActionData> executingInterventions = input.getExecutingInterventions();
Map<String, ActionData> preparedInterventions = input.getPreparedInterventions();
Map<String, ExecutableAction> executingInterventions = input.getExecutingInterventions();
Map<String, ExecutableAction> preparedInterventions = input.getPreparedInterventions();
CountDownLatch latch = new CountDownLatch(executingInterventions.size() + preparedInterventions.size());
// 创建结果容器
@@ -58,7 +58,7 @@ public class InterventionEvaluator extends AgentRunningSubModule<EvaluatorInput,
return result;
}
private void evaluateIntervention(List<EvaluatedInterventionData> evaluatedDataList, Map<String, ActionData> interventionMap, EvaluatorInput input, ExecutorService executor, CountDownLatch latch) {
private void evaluateIntervention(List<EvaluatedInterventionData> evaluatedDataList, Map<String, ExecutableAction> interventionMap, EvaluatorInput input, ExecutorService executor, CountDownLatch latch) {
interventionMap.forEach((tendency, actionData) -> executor.execute(() -> {
try {
String prompt = buildPrompt(input.getRecentMessages(), input.getActivatedSlices(), actionData, tendency);
@@ -78,12 +78,12 @@ public class InterventionEvaluator extends AgentRunningSubModule<EvaluatorInput,
}
private String buildPrompt(List<Message> recentMessages, List<EvaluatedSlice> activatedSlices,
ActionData actionData, String tendency) {
ExecutableAction executableAction, String tendency) {
JSONObject json = new JSONObject();
json.put("干预倾向", tendency);
json.putArray("近期对话").addAll(recentMessages);
json.putArray("参考记忆").addAll(activatedSlices);
json.put("将干预的行动", JSONObject.toJSONString(actionData));
json.put("将干预的行动", JSONObject.toJSONString(executableAction));
return json.toJSONString();
}

View File

@@ -2,7 +2,7 @@ package work.slhaf.partner.module.modules.action.interventor.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
@@ -10,8 +10,8 @@ import java.util.Map;
@Data
public class EvaluatorInput {
private Map<String, ActionData> executingInterventions;
private Map<String, ActionData> preparedInterventions;
private Map<String, ExecutableAction> executingInterventions;
private Map<String, ExecutableAction> preparedInterventions;
private List<EvaluatedSlice> activatedSlices;
private List<Message> recentMessages;
}

View File

@@ -9,7 +9,7 @@ import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunn
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.MetaRecognizerResult;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerInput;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerResult;
@@ -30,14 +30,14 @@ public class InterventionRecognizer extends AgentRunningSubModule<RecognizerInpu
public RecognizerResult execute(RecognizerInput input) {
// 获取必须数据
ExecutorService executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
List<ActionData> executingActions = input.getExecutingActions();
List<ActionData> preparedActions = input.getPreparedActions();
List<ExecutableAction> executingActions = input.getExecutingActions();
List<ExecutableAction> preparedActions = input.getPreparedActions();
CountDownLatch countDownLatch = new CountDownLatch(executingActions.size() + preparedActions.size());
// 创建结果容器
RecognizerResult recognizerResult = new RecognizerResult();
Map<String, ActionData> executingInterventions = recognizerResult.getExecutingInterventions();
Map<String, ActionData> preparedInterventions = recognizerResult.getPreparedInterventions();
Map<String, ExecutableAction> executingInterventions = recognizerResult.getExecutingInterventions();
Map<String, ExecutableAction> preparedInterventions = recognizerResult.getPreparedInterventions();
// 执行识别操作
recognizeIntervention(executingInterventions, executingActions, executor, input, countDownLatch);
@@ -51,8 +51,8 @@ public class InterventionRecognizer extends AgentRunningSubModule<RecognizerInpu
return recognizerResult;
}
private void recognizeIntervention(Map<String, ActionData> interventionsMap, List<ActionData> actions, ExecutorService executor, RecognizerInput input, CountDownLatch latch) {
for (ActionData data : actions) {
private void recognizeIntervention(Map<String, ExecutableAction> interventionsMap, List<ExecutableAction> actions, ExecutorService executor, RecognizerInput input, CountDownLatch latch) {
for (ExecutableAction data : actions) {
executor.execute(() -> {
try {
String prompt = buildPrompt(data, input);
@@ -72,15 +72,15 @@ public class InterventionRecognizer extends AgentRunningSubModule<RecognizerInpu
}
}
private String buildPrompt(ActionData actionData, RecognizerInput input) {
private String buildPrompt(ExecutableAction executableAction, RecognizerInput input) {
JSONObject json = new JSONObject();
JSONObject actionInfo = json.putObject("行动信息");
actionInfo.put("行动倾向", actionData.getTendency());
actionInfo.put("行动原因", actionData.getReason());
actionInfo.put("行动描述", actionData.getDescription());
actionInfo.put("行动状态", actionData.getStatus());
actionInfo.put("行动来源", actionData.getSource());
actionInfo.put("行动倾向", executableAction.getTendency());
actionInfo.put("行动原因", executableAction.getReason());
actionInfo.put("行动描述", executableAction.getDescription());
actionInfo.put("行动状态", executableAction.getStatus());
actionInfo.put("行动来源", executableAction.getSource());
JSONObject interactionInfo = json.putObject("交互信息");
interactionInfo.put("用户输入", input.getInput());

View File

@@ -2,7 +2,7 @@ package work.slhaf.partner.module.modules.action.interventor.recognizer.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import java.util.List;
@@ -17,6 +17,6 @@ public class RecognizerInput {
/**
* 正在执行的行动-Phaser记录列表在Recognizer中结合本次输入并发评估(考虑到不同行动链之间对LLM的影响)
*/
private List<ActionData> executingActions;
private List<ActionData> preparedActions;
private List<ExecutableAction> executingActions;
private List<ExecutableAction> preparedActions;
}

View File

@@ -1,7 +1,7 @@
package work.slhaf.partner.module.modules.action.interventor.recognizer.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import java.util.HashMap;
import java.util.Map;
@@ -17,7 +17,7 @@ public class RecognizerResult {
* <br/>
* value: 干预倾向将作用的行动数据
*/
private Map<String, ActionData> executingInterventions = new HashMap<>();
private Map<String, ExecutableAction> executingInterventions = new HashMap<>();
/**
* <h4>将被干预的‘等待中行动’</h4>
@@ -25,5 +25,5 @@ public class RecognizerResult {
* <br/>
* value: 干预倾向将作用的行动数据
*/
private Map<String, ActionData> preparedInterventions = new HashMap<>();
private Map<String, ExecutableAction> preparedInterventions = new HashMap<>();
}

View File

@@ -143,21 +143,21 @@ public class ActionPlanner extends PreRunningModule {
if (uuids == null) {
return;
}
List<ActionData> pendingActions = actionCapability.popPendingAction(context.getUserId());
for (ActionData actionData : pendingActions) {
if (uuids.contains(actionData.getUuid())) {
actionCapability.putAction(actionData);
List<ExecutableAction> pendingActions = actionCapability.popPendingAction(context.getUserId());
for (ExecutableAction executableAction : pendingActions) {
if (uuids.contains(executableAction.getUuid())) {
actionCapability.putAction(executableAction);
}
}
}
private void putActionData(List<EvaluatorResult> evaluatorResults, PartnerRunningFlowContext context) {
for (EvaluatorResult evaluatorResult : evaluatorResults) {
ActionData actionData = assemblyHelper.buildActionData(evaluatorResult, context.getUserId());
ExecutableAction executableAction = assemblyHelper.buildActionData(evaluatorResult, context.getUserId());
if (evaluatorResult.isNeedConfirm()) {
actionCapability.putPendingActions(context.getUserId(), actionData);
actionCapability.putPendingActions(context.getUserId(), executableAction);
} else {
actionCapability.putAction(actionData);
actionCapability.putAction(executableAction);
}
}
}
@@ -172,18 +172,18 @@ public class ActionPlanner extends PreRunningModule {
}
private void setupPendingActions(HashMap<String, String> map, String userId) {
List<ActionData> actionData = actionCapability.listPendingAction(userId);
if (actionData == null || actionData.isEmpty()) {
List<ExecutableAction> executableActionData = actionCapability.listPendingAction(userId);
if (executableActionData == null || executableActionData.isEmpty()) {
map.put("[待确认行动] <等待用户确认的行动信息>", "无待确认行动");
return;
}
for (int i = 0; i < actionData.size(); i++) {
map.put("[待确认行动 " + (i + 1) + " ] <等待用户确认的行动信息>", generateActionStr(actionData.get(i)));
for (int i = 0; i < executableActionData.size(); i++) {
map.put("[待确认行动 " + (i + 1) + " ] <等待用户确认的行动信息>", generateActionStr(executableActionData.get(i)));
}
}
private void setupPreparedActions(HashMap<String, String> map, String userId) {
val preparedActions = actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId).stream().toList();
val preparedActions = actionCapability.listActions(ExecutableAction.Status.PREPARE, userId).stream().toList();
if (preparedActions.isEmpty()) {
map.put("[预备行动] <预备执行或放入计划池的行动信息>", "无预备行动");
return;
@@ -193,10 +193,10 @@ public class ActionPlanner extends PreRunningModule {
}
}
private String generateActionStr(ActionData actionData) {
return "<行动倾向>" + " : " + actionData.getTendency() +
"<行动原因>" + " : " + actionData.getReason() +
"<工具描述>" + " : " + actionData.getDescription();
private String generateActionStr(ExecutableAction executableAction) {
return "<行动倾向>" + " : " + executableAction.getTendency() +
"<行动原因>" + " : " + executableAction.getReason() +
"<工具描述>" + " : " + executableAction.getDescription();
}
@Override
@@ -231,10 +231,10 @@ public class ActionPlanner extends PreRunningModule {
return input;
}
private ActionData buildActionData(EvaluatorResult evaluatorResult, String userId) {
private ExecutableAction buildActionData(EvaluatorResult evaluatorResult, String userId) {
Map<Integer, List<MetaAction>> actionChain = getActionChain(evaluatorResult);
return switch (evaluatorResult.getType()) {
case PLANNING -> new ScheduledActionData(
case PLANNING -> new ScheduledExecutableAction(
evaluatorResult.getTendency(),
actionChain,
evaluatorResult.getReason(),
@@ -243,7 +243,7 @@ public class ActionPlanner extends PreRunningModule {
evaluatorResult.getScheduleType(),
evaluatorResult.getScheduleContent()
);
case IMMEDIATE -> new ImmediateActionData(
case IMMEDIATE -> new ImmediateExecutableAction(
evaluatorResult.getTendency(),
actionChain,
evaluatorResult.getReason(),
@@ -332,8 +332,8 @@ public class ActionPlanner extends PreRunningModule {
private ConfirmerInput buildConfirmerInput(PartnerRunningFlowContext context) {
ConfirmerInput confirmerInput = new ConfirmerInput();
confirmerInput.setInput(context.getInput());
List<ActionData> pendingActions = actionCapability.listPendingAction(context.getUserId());
confirmerInput.setActionData(pendingActions);
List<ExecutableAction> pendingActions = actionCapability.listPendingAction(context.getUserId());
confirmerInput.setExecutableActionData(pendingActions);
return confirmerInput;
}
}

View File

@@ -11,7 +11,7 @@ import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult;
@@ -30,23 +30,23 @@ public class ActionConfirmer extends AgentRunningSubModule<ConfirmerInput, Confi
@Override
public ConfirmerResult execute(ConfirmerInput data) {
List<ActionData> actionDataList = data.getActionData();
List<ExecutableAction> executableActionList = data.getExecutableActionData();
ExecutorService executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
CountDownLatch latch = new CountDownLatch(actionDataList.size());
CountDownLatch latch = new CountDownLatch(executableActionList.size());
ConfirmerResult result = new ConfirmerResult();
List<String> uuids = result.getUuids();
for (ActionData actionData : actionDataList) {
for (ExecutableAction executableAction : executableActionList) {
executor.execute(() -> {
try {
String prompt = buildPrompt(actionData, data.getInput(), data.getRecentMessages());
String prompt = buildPrompt(executableAction, data.getInput(), data.getRecentMessages());
ChatResponse response = this.singleChat(prompt);
JSONObject tempResult = JSONObject.parseObject(extractJson(response.getMessage()));
if (tempResult.getBoolean("confirmed")) {
actionData.setStatus(ActionData.ActionStatus.PREPARE);
executableAction.setStatus(ExecutableAction.Status.PREPARE);
synchronized (uuids) {
uuids.add(actionData.getUuid());
uuids.add(executableAction.getUuid());
}
}
} finally {
@@ -62,7 +62,7 @@ public class ActionConfirmer extends AgentRunningSubModule<ConfirmerInput, Confi
return result;
}
private String buildPrompt(ActionData data, String input, List<Message> recentMessages) {
private String buildPrompt(ExecutableAction data, String input, List<Message> recentMessages) {
JSONObject prompt = new JSONObject();
prompt.put("[用户输入]", input);

View File

@@ -2,13 +2,13 @@ package work.slhaf.partner.module.modules.action.planner.confirmer.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ExecutableAction;
import java.util.List;
@Data
public class ConfirmerInput {
private String input;
private List<ActionData> actionData;
private List<ExecutableAction> executableActionData;
private List<Message> recentMessages;
}

View File

@@ -1,7 +1,7 @@
package work.slhaf.partner.module.modules.action.planner.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.ScheduledActionData;
import work.slhaf.partner.core.action.entity.ScheduledExecutableAction;
import java.util.List;
import java.util.Map;
@@ -12,7 +12,7 @@ public class EvaluatorResult {
private boolean needConfirm;
private ActionType type;
private String scheduleContent;
private ScheduledActionData.ScheduleType scheduleType;
private ScheduledExecutableAction.ScheduleType scheduleType;
private Map<Integer, List<String>> primaryActionChain;
private String tendency;
private String reason;

View File

@@ -72,7 +72,7 @@ class ActionExecutorTest {
void setUp() {
lenient().when(cognationCapability.getChatMessages()).thenReturn(Collections.emptyList());
lenient().when(memoryCapability.getActivatedSlices(anyString())).thenReturn(Collections.emptyList());
lenient().when(actionCapability.putPhaserRecord(any(Phaser.class), any(ActionData.class)))
lenient().when(actionCapability.putPhaserRecord(any(Phaser.class), any(ExecutableAction.class)))
.thenAnswer(inv -> new PhaserRecord(inv.getArgument(0), inv.getArgument(1)));
lenient().when(actionCapability.loadMetaActionInfo(anyString())).thenAnswer(inv -> {
MetaActionInfo info = new MetaActionInfo();
@@ -92,7 +92,7 @@ class ActionExecutorTest {
ExecutorService directExecutor = new DirectExecutorService();
stubExecutors(directExecutor, directExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
@@ -109,7 +109,7 @@ class ActionExecutorTest {
verify(runnerClient, times(1)).submit(any(MetaAction.class));
verify(actionCapability, times(1)).removePhaserRecord(any(Phaser.class));
assertEquals(ActionData.ActionStatus.SUCCESS, actionData.getStatus());
assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus());
assertEquals(1, actionData.getHistory().get(0).size());
}
@@ -119,14 +119,14 @@ class ActionExecutorTest {
ExecutorService directExecutor = new DirectExecutorService();
stubExecutors(directExecutor, directExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
actionData.setStatus(ActionData.ActionStatus.EXECUTING);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
actionData.setStatus(ExecutableAction.Status.EXECUTING);
ActionExecutorInput input = buildInput("u1", actionData);
actionExecutor.init();
actionExecutor.execute(input);
verify(actionCapability, never()).putPhaserRecord(any(Phaser.class), any(ActionData.class));
verify(actionCapability, never()).putPhaserRecord(any(Phaser.class), any(ExecutableAction.class));
verify(runnerClient, never()).submit(any(MetaAction.class));
}
@@ -140,7 +140,7 @@ class ActionExecutorTest {
Map<Integer, List<MetaAction>> chain = new HashMap<>();
chain.put(0, List.of(buildMetaAction("a1", false)));
chain.put(1, List.of(buildMetaAction("a2", false)));
ImmediateActionData actionData = buildActionData(chain);
ImmediateExecutableAction actionData = buildActionData(chain);
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
@@ -159,7 +159,7 @@ class ActionExecutorTest {
verify(runnerClient, timeout(5000).times(2)).submit(any(MetaAction.class));
verify(actionCorrector, timeout(5000).times(2)).execute(any());
assertEquals(2, actionData.getHistory().size());
assertEquals(ActionData.ActionStatus.SUCCESS, actionData.getStatus());
assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus());
}
// 场景5B4.2。目的:验证 IO 行动使用虚拟线程池。
@@ -169,7 +169,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(true));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(true));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
@@ -195,7 +195,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
@@ -233,7 +233,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
@@ -264,7 +264,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
@@ -291,13 +291,13 @@ class ActionExecutorTest {
ExecutorService resumeExecutor = Executors.newSingleThreadExecutor();
resumeExecutor.execute(() -> {
while (actionData.getStatus() != ActionData.ActionStatus.INTERRUPTED) {
while (actionData.getStatus() != ExecutableAction.Status.INTERRUPTED) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
actionData.setStatus(ActionData.ActionStatus.EXECUTING);
actionData.setStatus(ExecutableAction.Status.EXECUTING);
});
actionExecutor.init();
@@ -318,7 +318,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(singleStageChain(false));
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult ok = new ExtractorResult();
@@ -351,7 +351,7 @@ class ActionExecutorTest {
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateActionData actionData = buildActionData(new HashMap<>());
ImmediateExecutableAction actionData = buildActionData(new HashMap<>());
ActionExecutorInput input = buildInput("u1", actionData);
actionExecutor.init();
@@ -369,12 +369,12 @@ class ActionExecutorTest {
when(actionCapability.runnerClient()).thenReturn(runnerClient);
}
private ActionExecutorInput buildInput(String userId, ImmediateActionData actionData) {
private ActionExecutorInput buildInput(String userId, ImmediateExecutableAction actionData) {
return new ActionExecutorInput(Set.of(actionData));
}
private ImmediateActionData buildActionData(Map<Integer, List<MetaAction>> actionChain) {
val immediateActionData = new ImmediateActionData(
private ImmediateExecutableAction buildActionData(Map<Integer, List<MetaAction>> actionChain) {
val immediateActionData = new ImmediateExecutableAction(
"tendency",
actionChain,
"reason",

View File

@@ -14,8 +14,9 @@ import org.mockito.Mockito.verify
import org.mockito.junit.jupiter.MockitoExtension
import org.slf4j.LoggerFactory
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.entity.ActionData
import work.slhaf.partner.core.action.entity.ScheduledActionData
import work.slhaf.partner.core.action.entity.ExecutableAction
import work.slhaf.partner.core.action.entity.Scheduled
import work.slhaf.partner.core.action.entity.ScheduledExecutableAction
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler
import java.time.ZonedDateTime
@@ -60,14 +61,14 @@ class ActionSchedulerTest {
@Test
fun `running test`() {
fun buildAction(time: ZonedDateTime): ScheduledActionData {
return ScheduledActionData(
fun buildAction(time: ZonedDateTime): ScheduledExecutableAction {
return ScheduledExecutableAction(
"tendency",
mutableMapOf(),
"reason",
"description",
"source",
ScheduledActionData.ScheduleType.ONCE,
Scheduled.ScheduleType.ONCE,
time.toString()
)
}
@@ -78,7 +79,7 @@ class ActionSchedulerTest {
buildAction(now.truncatedTo(ChronoUnit.HOURS).plusHours(1).minusSeconds(1)),
)
Mockito.`when`(actionCapability.listActions(null, null))
.thenReturn(actions as Set<ActionData>)
.thenReturn(actions as Set<ExecutableAction>)
Mockito.`when`(actionExecutor.execute(any()))
.thenAnswer {
val input = it.arguments[0] as ActionExecutorInput
@@ -107,7 +108,7 @@ class ActionSchedulerTest {
val result = actionScheduler.execute(null)
assertEquals(null, result)
verify(actionCapability, Mockito.never()).putAction(any(ActionData::class.java))
verify(actionCapability, Mockito.never()).putAction(any(ExecutableAction::class.java))
}
@Test
@@ -115,7 +116,7 @@ class ActionSchedulerTest {
// 场景编号2路径B2 → B2.3;目的:验证正常入轮与副作用
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE,
type = Scheduled.ScheduleType.ONCE,
ZonedDateTime.now().plusHours(1).toString()
)
@@ -132,7 +133,7 @@ class ActionSchedulerTest {
// 场景编号3路径B2 → B2.1;目的:验证忽略非 PREPARE 状态
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE
type = Scheduled.ScheduleType.ONCE
)
actionScheduler.execute(setOf(action))
@@ -147,7 +148,7 @@ class ActionSchedulerTest {
// 场景编号4路径B2 → B2.2;目的:验证解析失败被跳过
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE
type = Scheduled.ScheduleType.ONCE
)
actionScheduler.execute(setOf(action))
@@ -161,7 +162,7 @@ class ActionSchedulerTest {
// 场景编号5路径B2 → B2.2;目的:验证 cron 解析失败被跳过
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.CYCLE,
type = Scheduled.ScheduleType.CYCLE,
scheduleContentOverride = "invalid-cron"
)
@@ -176,7 +177,7 @@ class ActionSchedulerTest {
// 场景编号6路径B2 异常中断;目的:验证异常传播
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE
type = Scheduled.ScheduleType.ONCE
)
Mockito.doThrow(RuntimeException("boom"))
.`when`(actionCapability)
@@ -192,7 +193,7 @@ class ActionSchedulerTest {
// 场景编号7路径B2.3;目的:验证同小时调度触发 ACTIVE
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE,
type = Scheduled.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
@@ -211,16 +212,16 @@ class ActionSchedulerTest {
// 场景编号15路径B2 + B2.1/B2.2/B2.3;目的:验证混合输入行为
initTimeWheelWithPrimaryActions(emptySet())
val ok = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE,
type = Scheduled.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
val nonPrepare = buildScheduledAction(
type = ScheduledActionData.ScheduleType.ONCE,
type = Scheduled.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
nonPrepare.status = ActionData.ActionStatus.FAILED
nonPrepare.status = ExecutableAction.Status.FAILED
val invalid = buildScheduledAction(
type = ScheduledActionData.ScheduleType.CYCLE,
type = Scheduled.ScheduleType.CYCLE,
scheduleContentOverride = "invalid-cron"
)
@@ -235,18 +236,18 @@ class ActionSchedulerTest {
assertFalse(allScheduled.contains(invalid))
}
private fun initTimeWheelWithPrimaryActions(actions: Set<ScheduledActionData>) {
private fun initTimeWheelWithPrimaryActions(actions: Set<ScheduledExecutableAction>) {
@Suppress("UNCHECKED_CAST")
Mockito.`when`(actionCapability.listActions(null, null))
.thenReturn(actions as Set<ActionData>)
.thenReturn(actions as Set<ExecutableAction>)
actionScheduler.init()
}
private fun buildScheduledAction(
type: ScheduledActionData.ScheduleType,
type: Scheduled.ScheduleType,
scheduleContentOverride: String? = null
): ScheduledActionData {
val action = ScheduledActionData(
): ScheduledExecutableAction {
val action = ScheduledExecutableAction(
"test",
mutableMapOf(),
"reason",
@@ -258,7 +259,7 @@ class ActionSchedulerTest {
return action
}
private fun ScheduledActionData.scheduleContentHour(): Int {
private fun ScheduledExecutableAction.scheduleContentHour(): Int {
return ZonedDateTime.parse(this.scheduleContent).hour
}
@@ -269,14 +270,14 @@ class ActionSchedulerTest {
}
@Suppress("UNCHECKED_CAST")
private fun actionsGroupByHour(timeWheel: Any): Array<MutableSet<ScheduledActionData>> {
private fun actionsGroupByHour(timeWheel: Any): Array<MutableSet<ScheduledExecutableAction>> {
val field = timeWheel.javaClass.getDeclaredField("actionsGroupByHour")
field.isAccessible = true
return field.get(timeWheel) as Array<MutableSet<ScheduledActionData>>
return field.get(timeWheel) as Array<MutableSet<ScheduledExecutableAction>>
}
private fun allScheduledActions(timeWheel: Any): Set<ScheduledActionData> {
val result = linkedSetOf<ScheduledActionData>()
private fun allScheduledActions(timeWheel: Any): Set<ScheduledExecutableAction> {
val result = linkedSetOf<ScheduledExecutableAction>()
for (bucket in actionsGroupByHour(timeWheel)) {
result.addAll(bucket)
}