diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java index 79444127..e436d96e 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java @@ -57,6 +57,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } catch (TimeoutException e) { future.cancel(true); action.setStatus(Action.Status.FAILED); + if (action instanceof ExecutableAction executableAction) { + ensureExecutableResult(executableAction, true, "行动执行超时"); + } log.warn("Action timeout, uuid: {}", action.getUuid()); } catch (Exception ignored) { } @@ -71,6 +74,13 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { case StateAction stateAction -> handleStateAction(stateAction); default -> handleUnknownAction(action); } + if (action instanceof ExecutableAction executableAction) { + if (action.getStatus() == Action.Status.FAILED) { + ensureExecutableResult(executableAction, true, null); + return; + } + ensureExecutableResult(executableAction, false, null); + } if (action.getStatus() == Action.Status.FAILED) { return; } @@ -78,6 +88,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } catch (Exception e) { log.warn("Unexpected action execution failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage()); action.setStatus(Action.Status.FAILED); + if (action instanceof ExecutableAction executableAction) { + ensureExecutableResult(executableAction, true, e.getLocalizedMessage()); + } } }; } @@ -173,7 +186,6 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { scheduledActionData.recordAndReset(); } - // TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等) } private MetaActionsListeningRecord executeAndListening(List metaActions, PhaserRecord phaserRecord, String source) { @@ -270,6 +282,69 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { }; } + private void ensureExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) { + if (hasExecutableResult(executableAction)) { + return; + } + executableAction.setResult(resolveExecutableResult(executableAction, failed, failureReason)); + } + + private String resolveExecutableResult(ExecutableAction executableAction, boolean failed, String failureReason) { + String extracted = extractLastMetaActionResult(executableAction); + if (extracted != null && !extracted.isBlank()) { + return extracted; + } + if (!failed) { + return "行动执行成功"; + } + if (failureReason != null && !failureReason.isBlank()) { + return "行动执行失败: " + failureReason; + } + return "行动执行失败"; + } + + private String extractLastMetaActionResult(ExecutableAction executableAction) { + if (!executableAction.getHistory().isEmpty()) { + Integer lastStage = executableAction.getHistory().keySet().stream() + .max(Integer::compareTo) + .orElse(null); + if (lastStage != null) { + List historyActions = executableAction.getHistory().get(lastStage); + if (historyActions != null && !historyActions.isEmpty()) { + String result = historyActions.getLast().result(); + if (result != null && !result.isBlank()) { + return result; + } + } + } + } + + if (!executableAction.getActionChain().isEmpty()) { + Integer lastStage = executableAction.getActionChain().keySet().stream() + .max(Integer::compareTo) + .orElse(null); + if (lastStage != null) { + List metaActions = executableAction.getActionChain().get(lastStage); + if (metaActions != null && !metaActions.isEmpty()) { + String result = metaActions.getLast().getResult().getData(); + if (result != null && !result.isBlank()) { + return result; + } + } + } + } + return null; + } + + private boolean hasExecutableResult(ExecutableAction executableAction) { + try { + String result = executableAction.getResult(); + return result != null && !result.isBlank(); + } catch (Exception e) { + return false; + } + } + private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) { } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java index 97afc1d5..4958a32f 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/planner/ActionPlanner.java @@ -46,6 +46,7 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { private static final long PENDING_TTL_MILLIS = 30 * 60 * 1000L; private static final long PENDING_REMINDER_ADVANCE_MILLIS = 5 * 60 * 1000L; + private static final String IMMEDIATE_WATCHER_CRON = "0/5 * * * * ?"; private final ActionAssemblyHelper assemblyHelper = new ActionAssemblyHelper(); @@ -180,7 +181,7 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { ); schedulePendingLifecycleActions(pendingAction); } else { - actionCapability.putAction(executableAction); + executeOrSchedule(executableAction); } } } @@ -245,14 +246,61 @@ public class ActionPlanner extends PreRunningAbstractAgentModuleAbstract { } private void executeOrSchedule(ExecutableAction executableAction) { - actionCapability.putAction(executableAction); switch (executableAction) { + case ImmediateExecutableAction action -> executeImmediateWithWatcher(action); case SchedulableExecutableAction action -> actionScheduler.schedule(action); - case ImmediateExecutableAction action -> actionExecutor.execute(action); default -> log.error("unknown executable action type: {}", executableAction.getClass().getSimpleName()); } } + private void executeImmediateWithWatcher(ImmediateExecutableAction action) { + actionCapability.putAction(action); + actionExecutor.execute(action); + + AtomicBoolean notified = new AtomicBoolean(false); + final StateAction[] watcherRef = new StateAction[1]; + StateAction watcher = new StateAction( + action.getSource(), + "immediate-action-watcher:" + action.getUuid(), + "轮询即时行动执行结果", + Schedulable.ScheduleType.CYCLE, + IMMEDIATE_WATCHER_CRON, + new StateAction.Trigger.Call(() -> { + Action.Status status = action.getStatus(); + if (status != Action.Status.SUCCESS && status != Action.Status.FAILED) { + return Unit.INSTANCE; + } + if (watcherRef[0] != null) { + watcherRef[0].setEnabled(false); + } + if (!notified.compareAndSet(false, true)) { + return Unit.INSTANCE; + } + watcherSelfTalk(action); + return Unit.INSTANCE; + }) + ); + watcherRef[0] = watcher; + actionScheduler.schedule(watcher); + } + + private void watcherSelfTalk(ImmediateExecutableAction action) { + String result = action.getResult(); + String structuredSignal = String.format( + "{event=immediate_action_finished,actionUuid=%s,tendency=%s,status=%s,source=%s,result=%s}", + action.getUuid(), + action.getTendency(), + action.getStatus(), + action.getSource(), + result == null ? "" : result //将会在 ActionExecutor + ); + try { + cognationCapability.initiateTurn(structuredSignal); + } catch (Exception e) { + log.warn("触发 immediate 行动完成自对话失败, actionUuid: {}", action.getUuid(), e); + } + } + @Override protected Map getPromptDataMap(PartnerRunningFlowContext context) { HashMap map = new HashMap<>();