diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java index a22a1758..cd9a8d00 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCapability.java @@ -3,10 +3,7 @@ package work.slhaf.partner.core.action; import lombok.NonNull; import org.jetbrains.annotations.Nullable; import work.slhaf.partner.api.agent.factory.capability.annotation.Capability; -import work.slhaf.partner.core.action.entity.ExecutableAction; -import work.slhaf.partner.core.action.entity.MetaAction; -import work.slhaf.partner.core.action.entity.MetaActionInfo; -import work.slhaf.partner.core.action.entity.PhaserRecord; +import work.slhaf.partner.core.action.entity.*; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention; @@ -24,11 +21,19 @@ public interface ActionCapability { Set listActions(@Nullable ExecutableAction.Status status, @Nullable String source); - List popPendingAction(String userId); + PendingActionRecord createPendingAction(String userId, ExecutableAction executableAction, long ttlMillis, long reminderBeforeMillis); - List listPendingAction(String userId); + List listActivePendingActions(String userId); - void putPendingActions(String userId, ExecutableAction executableAction); + PendingActionRecord resolvePendingDecision(String userId, String pendingId, PendingActionRecord.Decision decision, String reason); + + boolean markPendingReminded(String pendingId); + + PendingActionRecord expirePendingIfWaiting(String pendingId); + + void bindPendingLifecycleActions(String pendingId, StateAction reminderAction, StateAction expireAction); + + void cancelPendingLifecycleActions(String pendingId); List selectTendencyCache(String input); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 28fff56c..b9f0e817 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -8,10 +8,7 @@ import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; import work.slhaf.partner.common.vector.VectorClient; import work.slhaf.partner.core.PartnerCore; -import work.slhaf.partner.core.action.entity.ExecutableAction; -import work.slhaf.partner.core.action.entity.MetaAction; -import work.slhaf.partner.core.action.entity.MetaActionInfo; -import work.slhaf.partner.core.action.entity.PhaserRecord; +import work.slhaf.partner.core.action.entity.*; import work.slhaf.partner.core.action.entity.cache.ActionCacheData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustData; import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData; @@ -51,7 +48,8 @@ public class ActionCore extends PartnerCore { /** * 待确认任务,以userId区分不同用户,因为需要跨请求确认 */ - private HashMap> pendingActions = new HashMap<>(); + private final HashMap> pendingActions = new HashMap<>(); + private final HashMap pendingLifecycleActions = new HashMap<>(); /** * 语义缓存与行为倾向映射 */ @@ -88,24 +86,129 @@ public class ActionCore extends PartnerCore { } @CapabilityMethod - public synchronized void putPendingActions(String userId, ExecutableAction executableAction) { - pendingActions.computeIfAbsent(userId, k -> { - List temp = new ArrayList<>(); - temp.add(executableAction); - return temp; - }); + public synchronized PendingActionRecord createPendingAction(String userId, ExecutableAction executableAction, + long ttlMillis, long reminderBeforeMillis) { + long now = System.currentTimeMillis(); + long safeTtl = Math.max(ttlMillis, 1000L); + long safeReminderBefore = Math.max(0L, Math.min(reminderBeforeMillis, safeTtl - 1000L)); + + PendingActionRecord record = new PendingActionRecord(); + record.setUserId(userId); + record.setExecutableAction(executableAction); + record.setCreatedAt(now); + record.setExpireAt(now + safeTtl); + record.setRemindAt(record.getExpireAt() - safeReminderBefore); + record.setStatus(PendingActionRecord.Status.WAITING_CONFIRM); + + pendingActions.computeIfAbsent(userId, k -> new ArrayList<>()).add(record); + return record; } @CapabilityMethod - public synchronized List popPendingAction(String userId) { - List infos = pendingActions.get(userId); - pendingActions.remove(userId); - return infos; + public synchronized List listActivePendingActions(String userId) { + List records = pendingActions.get(userId); + if (records == null || records.isEmpty()) { + return List.of(); + } + return records.stream() + .filter(record -> record.getStatus() == PendingActionRecord.Status.WAITING_CONFIRM + || record.getStatus() == PendingActionRecord.Status.REMINDER_SENT) + .toList(); } @CapabilityMethod - public List listPendingAction(String userId) { - return pendingActions.get(userId); + public synchronized PendingActionRecord resolvePendingDecision(String userId, String pendingId, + PendingActionRecord.Decision decision, String reason) { + PendingActionRecord record = findPendingByUserAndId(userId, pendingId); + if (record == null) { + return null; + } + PendingActionRecord.Status status = record.getStatus(); + boolean active = status == PendingActionRecord.Status.WAITING_CONFIRM + || status == PendingActionRecord.Status.REMINDER_SENT; + if (!active) { + return record; + } + + if (decision == PendingActionRecord.Decision.CONFIRM) { + record.setStatus(PendingActionRecord.Status.CONFIRMED); + record.setDecisionAt(System.currentTimeMillis()); + record.setDecisionReason(reason); + cancelPendingLifecycleActions(pendingId); + return record; + } + + if (decision == PendingActionRecord.Decision.REJECT) { + record.setStatus(PendingActionRecord.Status.REJECTED); + record.setDecisionAt(System.currentTimeMillis()); + record.setDecisionReason(reason); + ExecutableAction executableAction = record.getExecutableAction(); + executableAction.setStatus(ExecutableAction.Status.FAILED); + executableAction.setResult("行动被拒绝"); + cancelPendingLifecycleActions(pendingId); + return record; + } + + record.setDecisionReason(reason); + return record; + } + + @CapabilityMethod + public synchronized boolean markPendingReminded(String pendingId) { + PendingActionRecord record = findPendingById(pendingId); + if (record == null) { + return false; + } + if (record.getStatus() != PendingActionRecord.Status.WAITING_CONFIRM) { + return false; + } + record.setStatus(PendingActionRecord.Status.REMINDER_SENT); + record.setReminded(true); + return true; + } + + @CapabilityMethod + public synchronized PendingActionRecord expirePendingIfWaiting(String pendingId) { + PendingActionRecord record = findPendingById(pendingId); + if (record == null) { + return null; + } + PendingActionRecord.Status status = record.getStatus(); + if (status != PendingActionRecord.Status.WAITING_CONFIRM + && status != PendingActionRecord.Status.REMINDER_SENT) { + return null; + } + record.setStatus(PendingActionRecord.Status.EXPIRED); + record.setDecisionAt(System.currentTimeMillis()); + record.setDecisionReason("等待确认超时"); + cancelPendingLifecycleActions(pendingId); + return record; + } + + @CapabilityMethod + public synchronized void bindPendingLifecycleActions(String pendingId, StateAction reminderAction, StateAction expireAction) { + PendingLifecycleActions old = pendingLifecycleActions.put( + pendingId, new PendingLifecycleActions(reminderAction, expireAction) + ); + if (old != null) { + old.reminderAction.setEnabled(false); + old.expireAction.setEnabled(false); + } + PendingActionRecord record = findPendingById(pendingId); + if (record != null) { + record.setReminderActionId(reminderAction.getUuid()); + record.setExpireActionId(expireAction.getUuid()); + } + } + + @CapabilityMethod + public synchronized void cancelPendingLifecycleActions(String pendingId) { + PendingLifecycleActions actions = pendingLifecycleActions.remove(pendingId); + if (actions == null) { + return; + } + actions.reminderAction.setEnabled(false); + actions.expireAction.setEnabled(false); } /** @@ -434,6 +537,33 @@ public class ActionCore extends PartnerCore { return "action-core"; } + private PendingActionRecord findPendingByUserAndId(String userId, String pendingId) { + List records = pendingActions.get(userId); + if (records == null) { + return null; + } + for (PendingActionRecord record : records) { + if (record.getPendingId().equals(pendingId)) { + return record; + } + } + return null; + } + + private PendingActionRecord findPendingById(String pendingId) { + for (List records : pendingActions.values()) { + for (PendingActionRecord record : records) { + if (record.getPendingId().equals(pendingId)) { + return record; + } + } + } + return null; + } + + private record PendingLifecycleActions(StateAction reminderAction, StateAction expireAction) { + } + public enum ExecutorType { VIRTUAL, PLATFORM } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PendingActionRecord.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PendingActionRecord.java new file mode 100644 index 00000000..680f02c9 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/PendingActionRecord.java @@ -0,0 +1,39 @@ +package work.slhaf.partner.core.action.entity; + +import lombok.Data; + +import java.util.UUID; + +@Data +public class PendingActionRecord { + + private final String pendingId = UUID.randomUUID().toString(); + private String userId; + private ExecutableAction executableAction; + private Status status = Status.WAITING_CONFIRM; + + private long createdAt; + private long remindAt; + private long expireAt; + + private boolean reminded; + private long decisionAt; + private String decisionReason; + + private String reminderActionId; + private String expireActionId; + + public enum Status { + WAITING_CONFIRM, + REMINDER_SENT, + CONFIRMED, + REJECTED, + EXPIRED + } + + public enum Decision { + CONFIRM, + REJECT, + HOLD + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java index 85c60d92..97afc1d5 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java @@ -1,5 +1,6 @@ package work.slhaf.partner.module.modules.action.planner; +import kotlin.Unit; import lombok.val; import org.jetbrains.annotations.NotNull; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; @@ -20,6 +21,7 @@ import work.slhaf.partner.module.modules.action.executor.ActionExecutor; import work.slhaf.partner.module.modules.action.planner.confirmer.ActionConfirmer; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult; +import work.slhaf.partner.module.modules.action.planner.confirmer.entity.PendingDecisionItem; import work.slhaf.partner.module.modules.action.planner.evaluator.ActionEvaluator; import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorInput; import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorResult; @@ -29,6 +31,9 @@ import work.slhaf.partner.module.modules.action.planner.extractor.entity.Extract import work.slhaf.partner.module.modules.action.scheduler.ActionScheduler; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -39,6 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { + private static final long PENDING_TTL_MILLIS = 30 * 60 * 1000L; + private static final long PENDING_REMINDER_ADVANCE_MILLIS = 5 * 60 * 1000L; + private final ActionAssemblyHelper assemblyHelper = new ActionAssemblyHelper(); @InjectCapability @@ -139,22 +147,23 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { } private void setupConfirmedActionInfo(PartnerRunningFlowContext context, ConfirmerResult result) { - // TODO 需考虑未确认任务的失效或者拒绝时机,在action core中实现 - List uuids = result.getUuids(); - if (uuids == null) { + List decisions = result.getDecisions(); + if (decisions == null || decisions.isEmpty()) { return; } - List pendingActions = actionCapability.popPendingAction(context.getSource()); - for (ExecutableAction executableAction : pendingActions) { - // put action into ActionCore to record - if (uuids.contains(executableAction.getUuid())) { - actionCapability.putAction(executableAction); + for (PendingDecisionItem decisionItem : decisions) { + PendingActionRecord pendingAction = actionCapability.resolvePendingDecision( + context.getSource(), + decisionItem.getPendingId(), + decisionItem.getDecision(), + decisionItem.getReason() + ); + if (pendingAction == null) { + continue; } - // execute or schedule it immediately - switch (executableAction) { - case SchedulableExecutableAction action -> actionScheduler.schedule(action); - case ImmediateExecutableAction action -> actionExecutor.execute(action); - default -> log.error("unknown executable action type: {}", executableAction.getClass().getSimpleName()); + if (decisionItem.getDecision() == PendingActionRecord.Decision.CONFIRM + && pendingAction.getStatus() == PendingActionRecord.Status.CONFIRMED) { + executeOrSchedule(pendingAction.getExecutableAction()); } } } @@ -163,13 +172,87 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { for (EvaluatorResult evaluatorResult : evaluatorResults) { ExecutableAction executableAction = assemblyHelper.buildActionData(evaluatorResult, context.getSource()); if (evaluatorResult.isNeedConfirm()) { - actionCapability.putPendingActions(context.getSource(), executableAction); + PendingActionRecord pendingAction = actionCapability.createPendingAction( + context.getSource(), + executableAction, + PENDING_TTL_MILLIS, + PENDING_REMINDER_ADVANCE_MILLIS + ); + schedulePendingLifecycleActions(pendingAction); } else { actionCapability.putAction(executableAction); } } } + private void schedulePendingLifecycleActions(PendingActionRecord pendingAction) { + StateAction reminderAction = buildPendingReminderAction(pendingAction); + StateAction expireAction = buildPendingExpireAction(pendingAction); + actionCapability.bindPendingLifecycleActions(pendingAction.getPendingId(), reminderAction, expireAction); + actionScheduler.schedule(reminderAction); + actionScheduler.schedule(expireAction); + } + + private StateAction buildPendingReminderAction(PendingActionRecord pendingAction) { + return new StateAction( + pendingAction.getUserId(), + "pending-action-reminder:" + pendingAction.getPendingId(), + "待确认行动提醒", + Schedulable.ScheduleType.ONCE, + asScheduleContent(pendingAction.getRemindAt()), + new StateAction.Trigger.Call(() -> { + handlePendingReminder(pendingAction.getPendingId()); + return Unit.INSTANCE; + }) + ); + } + + private StateAction buildPendingExpireAction(PendingActionRecord pendingAction) { + return new StateAction( + pendingAction.getUserId(), + "pending-action-expire:" + pendingAction.getPendingId(), + "待确认行动失效", + Schedulable.ScheduleType.ONCE, + asScheduleContent(pendingAction.getExpireAt()), + new StateAction.Trigger.Call(() -> { + handlePendingExpire(pendingAction.getPendingId()); + return Unit.INSTANCE; + }) + ); + } + + private void handlePendingReminder(String pendingId) { + boolean marked = actionCapability.markPendingReminded(pendingId); + if (!marked) { + return; + } + try { + // TODO target 指定行为待补充; 主动回复链路待补充 + cognationCapability.initiateTurn("系统提醒:存在待确认行动即将过期,请确认是否继续执行。pendingId=" + pendingId); + } catch (Exception e) { + log.warn("触发待确认行动提醒失败, pendingId: {}", pendingId, e); + } + } + + private void handlePendingExpire(String pendingId) { + actionCapability.expirePendingIfWaiting(pendingId); + } + + private String asScheduleContent(long targetTimeMillis) { + long now = System.currentTimeMillis(); + long safeTarget = Math.max(targetTimeMillis, now + 1000L); + return ZonedDateTime.ofInstant(Instant.ofEpochMilli(safeTarget), ZoneId.systemDefault()).toString(); + } + + private void executeOrSchedule(ExecutableAction executableAction) { + actionCapability.putAction(executableAction); + switch (executableAction) { + case SchedulableExecutableAction action -> actionScheduler.schedule(action); + case ImmediateExecutableAction action -> actionExecutor.execute(action); + default -> log.error("unknown executable action type: {}", executableAction.getClass().getSimpleName()); + } + } + @Override protected Map getPromptDataMap(PartnerRunningFlowContext context) { HashMap map = new HashMap<>(); @@ -180,13 +263,16 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { } private void setupPendingActions(HashMap map, String userId) { - List executableActionData = actionCapability.listPendingAction(userId); - if (executableActionData == null || executableActionData.isEmpty()) { + List pendingActions = actionCapability.listActivePendingActions(userId); + if (pendingActions.isEmpty()) { map.put("[待确认行动] <等待用户确认的行动信息>", "无待确认行动"); return; } - for (int i = 0; i < executableActionData.size(); i++) { - map.put("[待确认行动 " + (i + 1) + " ] <等待用户确认的行动信息>", generateActionStr(executableActionData.get(i))); + for (int i = 0; i < pendingActions.size(); i++) { + map.put( + "[待确认行动 " + (i + 1) + " ] <等待用户确认的行动信息>", + generateActionStr(pendingActions.get(i).getExecutableAction()) + ); } } @@ -344,8 +430,9 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { private ConfirmerInput buildConfirmerInput(PartnerRunningFlowContext context) { ConfirmerInput confirmerInput = new ConfirmerInput(); confirmerInput.setInput(context.getInput()); - List pendingActions = actionCapability.listPendingAction(context.getSource()); - confirmerInput.setExecutableActionData(pendingActions); + List pendingActions = actionCapability.listActivePendingActions(context.getSource()); + confirmerInput.setRecentMessages(cognationCapability.snapshotChatMessages()); + confirmerInput.setPendingActions(pendingActions); return confirmerInput; } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/ActionConfirmer.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/ActionConfirmer.java index 2cbc5abb..11a21248 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/ActionConfirmer.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/ActionConfirmer.java @@ -10,8 +10,10 @@ import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.action.entity.ExecutableAction; +import work.slhaf.partner.core.action.entity.PendingActionRecord; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput; import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult; +import work.slhaf.partner.module.modules.action.planner.confirmer.entity.PendingDecisionItem; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -25,22 +27,33 @@ public class ActionConfirmer extends AbstractAgentModule.Sub executableActionList = data.getExecutableActionData(); + List pendingActions = data.getPendingActions(); + if (pendingActions == null || pendingActions.isEmpty()) { + return new ConfirmerResult(); + } ExecutorService executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL); - CountDownLatch latch = new CountDownLatch(executableActionList.size()); + CountDownLatch latch = new CountDownLatch(pendingActions.size()); ConfirmerResult result = new ConfirmerResult(); - List uuids = result.getUuids(); - for (ExecutableAction executableAction : executableActionList) { + List decisions = result.getDecisions(); + for (PendingActionRecord pendingAction : pendingActions) { executor.execute(() -> { try { + ExecutableAction executableAction = pendingAction.getExecutableAction(); String prompt = buildPrompt(executableAction, data.getInput(), data.getRecentMessages()); ChatResponse response = this.singleChat(prompt); JSONObject tempResult = JSONObject.parseObject(extractJson(response.getMessage())); - if (tempResult.getBoolean("confirmed")) { - executableAction.setStatus(ExecutableAction.Status.PREPARE); - synchronized (uuids) { - uuids.add(executableAction.getUuid()); - } + PendingActionRecord.Decision decision = parseDecision(tempResult); + String reason = tempResult == null ? null : tempResult.getString("reason"); + synchronized (decisions) { + decisions.add(new PendingDecisionItem(pendingAction.getPendingId(), decision, reason)); + } + } catch (Exception e) { + synchronized (decisions) { + decisions.add(new PendingDecisionItem( + pendingAction.getPendingId(), + PendingActionRecord.Decision.HOLD, + "确认解析失败: " + e.getLocalizedMessage() + )); } } finally { latch.countDown(); @@ -55,6 +68,30 @@ public class ActionConfirmer extends AbstractAgentModule.Sub recentMessages) { JSONObject prompt = new JSONObject(); prompt.put("[用户输入]", input); @@ -63,8 +100,14 @@ public class ActionConfirmer extends AbstractAgentModule.Sub executableActionData; + private List pendingActions; private List recentMessages; } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/ConfirmerResult.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/ConfirmerResult.java index cf61639d..c25abf84 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/ConfirmerResult.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/ConfirmerResult.java @@ -7,5 +7,5 @@ import java.util.List; @Data public class ConfirmerResult { - private List uuids = new ArrayList<>(); + private List decisions = new ArrayList<>(); } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/PendingDecisionItem.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/PendingDecisionItem.java new file mode 100644 index 00000000..1bfbaafa --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/confirmer/entity/PendingDecisionItem.java @@ -0,0 +1,15 @@ +package work.slhaf.partner.module.modules.action.planner.confirmer.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import work.slhaf.partner.core.action.entity.PendingActionRecord; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PendingDecisionItem { + private String pendingId; + private PendingActionRecord.Decision decision; + private String reason; +}