diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt index ff7c0703..84549b0d 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/entity/Action.kt @@ -3,6 +3,8 @@ package work.slhaf.partner.core.action.entity import work.slhaf.partner.module.modules.action.executor.entity.HistoryAction import java.time.ZonedDateTime import java.util.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes sealed class Action { /** @@ -63,6 +65,7 @@ sealed interface Schedulable { val scheduleType: ScheduleType val scheduleContent: String val uuid: String + val timeout: Duration var enabled: Boolean enum class ScheduleType { @@ -114,7 +117,8 @@ data class SchedulableExecutableAction( override val description: String, override val source: String, override val scheduleType: Schedulable.ScheduleType, - override val scheduleContent: String + override val scheduleContent: String, + override val timeout: Duration = 10.minutes ) : ExecutableAction(), Schedulable { override var enabled = true @@ -132,8 +136,6 @@ data class SchedulableExecutableAction( action.result.reset() } } - - status = Status.PREPARE } data class ScheduleHistory( @@ -166,6 +168,7 @@ data class StateAction( override val scheduleContent: String, override var enabled: Boolean = true, + override val timeout: Duration = 5.minutes, val trigger: Trigger ) : Action(), Schedulable { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java index 0926ee37..f2dba35c 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/executor/ActionExecutor.java @@ -61,8 +61,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { case StateAction stateAction -> handleStateAction(stateAction); default -> handleUnknownAction(action); } + if (action.getStatus() == Action.Status.FAILED) { + return; + } + action.setStatus(Action.Status.SUCCESS); } catch (Exception e) { - log.warn("Action execute failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage()); + log.warn("Unexpected action execution failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage()); action.setStatus(Action.Status.FAILED); } }; @@ -155,16 +159,11 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } while (stageCursor.next()); // 结束 actionCapability.removePhaserRecord(phaser); - if (executableAction.getStatus() != Action.Status.FAILED) { - // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 - if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { - scheduledActionData.recordAndReset(); - actionScheduler.schedule(scheduledActionData); - } else { - executableAction.setStatus(Action.Status.SUCCESS); - } - // TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等) + // 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果 + if (executableAction instanceof SchedulableExecutableAction scheduledActionData) { + scheduledActionData.recordAndReset(); } + // TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等) } private MetaActionsListeningRecord executeAndListening(List metaActions, PhaserRecord phaserRecord, String source) { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt index d78d88c6..08e35b27 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/modules/action/scheduler/ActionScheduler.kt @@ -26,6 +26,7 @@ import java.time.ZonedDateTime import java.time.temporal.ChronoUnit import java.util.stream.Collectors import kotlin.jvm.optionals.getOrNull +import kotlin.time.Duration.Companion.milliseconds class ActionScheduler : AbstractAgentModule.Standalone() { @InjectCapability @@ -55,7 +56,13 @@ class ActionScheduler : AbstractAgentModule.Standalone() { schedulableSet.filterIsInstance() .forEach { actionExecutor.execute(it) } } - timeWheel = TimeWheel(listScheduledActions, onTrigger) + val doneCondition: (Schedulable) -> Boolean = { schedulable -> + if (schedulable is Action) { + schedulable.status == Action.Status.FAILED || schedulable.status == Action.Status.SUCCESS + } + true + } + timeWheel = TimeWheel(listScheduledActions, onTrigger, doneCondition) } loadScheduledActions() setupShutdownHook() @@ -83,7 +90,8 @@ class ActionScheduler : AbstractAgentModule.Standalone() { private class TimeWheel( val listSource: () -> Set, - val onTrigger: (toTrigger: Set) -> Unit + val onTrigger: (toTrigger: Set) -> Unit, + val doneCondition: (schedulable: Schedulable) -> Boolean ) : Closeable { private val schedulableGroupByHour = Array>(24) { mutableSetOf() } private val wheel = Array>(60 * 60) { mutableSetOf() } @@ -146,6 +154,29 @@ class ActionScheduler : AbstractAgentModule.Standalone() { return null } + fun handleToTrigger(toTrigger: Set) { + timeWheelScope.launch { + onTrigger(toTrigger) + } + for (schedulable in toTrigger) timeWheelScope.launch { + if (schedulable.scheduleType == Schedulable.ScheduleType.ONCE) { + return@launch + } + + withTimeoutOrNull(schedulable.timeout) { + while (!doneCondition(schedulable)) { + delay(50.milliseconds) + } + } + + if (!schedulable.enabled) { + return@launch + } + + this@TimeWheel.schedule(schedulable) + } + } + suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) { val launchingHour = launchingTime.hour var tick = launchingTime.minute * 60 + launchingTime.second @@ -183,11 +214,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() { } WheelStepResult(toTrigger, shouldBreak) } - stepResult.toTrigger?.let { trigger -> - timeWheelScope.launch { - onTrigger(trigger) - } - } + stepResult.toTrigger?.let { handleToTrigger(it) } if (stepResult.shouldBreak) { log.debug("Wheel stopped at tick {}", tick) break