From 11ea1045f483bae1e3a29e05844825f56aa1af2a Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 18 Feb 2026 15:20:52 +0800 Subject: [PATCH] refactor(Action): generalize ActionScheduler to `Schedulable` and add StateAction trigger execution path --- .../partner/core/action/entity/Action.kt | 1 + .../dispatcher/scheduler/ActionScheduler.kt | 206 ++++++++++-------- 2 files changed, 115 insertions(+), 92 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/Action.kt b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/Action.kt index 46f5437a..79698de2 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/Action.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/entity/Action.kt @@ -31,6 +31,7 @@ sealed interface Schedulable { val scheduleType: ScheduleType val scheduleContent: String + val uuid: String enum class ScheduleType { CYCLE, diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index 767015fa..deb50222 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -17,9 +17,10 @@ import work.slhaf.partner.api.agent.factory.module.annotation.Init import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule import work.slhaf.partner.core.action.ActionCapability -import work.slhaf.partner.core.action.entity.ExecutableAction +import work.slhaf.partner.core.action.ActionCore import work.slhaf.partner.core.action.entity.Schedulable import work.slhaf.partner.core.action.entity.SchedulableExecutableAction +import work.slhaf.partner.core.action.entity.StateAction import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput import java.io.Closeable @@ -30,7 +31,7 @@ import java.util.stream.Collectors import kotlin.jvm.optionals.getOrNull @AgentSubModule -class ActionScheduler : AgentRunningSubModule, Void>() { +class ActionScheduler : AgentRunningSubModule, Void>() { @InjectCapability private lateinit var actionCapability: ActionCapability @@ -49,17 +50,34 @@ class ActionScheduler : AgentRunningSubModule, @Init fun init() { - val listScheduledActions: () -> Set = { - actionCapability.listActions(null, null) - .stream() - .filter { it is SchedulableExecutableAction } - .map { it as SchedulableExecutableAction } - .collect(Collectors.toSet()) + fun loadScheduledActions() { + val listScheduledActions: () -> Set = { + actionCapability.listActions(null, null) + .stream() + .filter { it is SchedulableExecutableAction } + .map { it as SchedulableExecutableAction } + .collect(Collectors.toSet()) + } + + // TODO 3. 重构 trigger 内容,在替换为 Set 后,需要进行类型判定,确认是自行执行,还是交给 actionExecutor + val onTrigger: (Set) -> Unit = { schedulableSet -> + val executableActions = mutableSetOf() + val stateActions = mutableSetOf() + for (schedulable in schedulableSet) { + when (schedulable) { + is SchedulableExecutableAction -> executableActions.add(schedulable) + is StateAction -> stateActions.add(schedulable) + } + } + actionExecutor.execute(ActionExecutorInput(executableActions)) + actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL) + .execute { stateActions.forEach { it.trigger.onTrigger() } } + } + + timeWheel = TimeWheel(listScheduledActions, onTrigger) } - val onTrigger: (Set) -> Unit = { actionExecutor.execute(ActionExecutorInput(it)) } - - timeWheel = TimeWheel(listScheduledActions, onTrigger) + loadScheduledActions() setupShutdownHook() } @@ -71,27 +89,30 @@ class ActionScheduler : AgentRunningSubModule, }) } - // TODO 如果要将 TimeWheel 作为 Agent 内部的循环周期,那么不依赖 Action 链路的内容,将不适合参与到 ActionExecutor,因此需要将 ActionData 的触发类型进行分类:SILENT TRIGGER(仅限更新 ActionData 内部状态,通过属性 copy 完成,不开放过多权限,防止序列化失败)、EXECUTOR、AGENT TURN。考虑将时间轮下放至 ActionCapability,作为底层行动语义的一部分 - override fun execute(scheduledActionDataSet: Set?): Void? { + override fun execute(schedulableSet: Set?): Void? { + // TODO 1. 将输入参数重构为 Set,在 for 循环中依据计划字段放入时间轮 schedulerScope.launch { - scheduledActionDataSet?.run { - for (scheduledActionData in scheduledActionDataSet) { - log.debug("New action to schedule: {}", scheduledActionData) - actionCapability.putAction(scheduledActionData) - timeWheel.schedule(scheduledActionData) + schedulableSet?.run { + for (schedulableData in schedulableSet) { + log.debug("New data to schedule: {}", schedulableData) + timeWheel.schedule(schedulableData) + if (schedulableData is SchedulableExecutableAction) { + actionCapability.putAction(schedulableData) + } } } } return null } + // TODO 2. 重构为 Set private class TimeWheel( - val listScheduledActions: () -> Set, - val onTrigger: (toTrigger: Set) -> Unit + val listSource: () -> Set, + val onTrigger: (toTrigger: Set) -> Unit ) : Closeable { - private val actionsGroupByHour = Array>(24) { mutableSetOf() } - private val wheel = Array>(60 * 60) { mutableSetOf() } + private val schedulableGroupByHour = Array>(24) { mutableSetOf() } + private val wheel = Array>(60 * 60) { mutableSetOf() } private var recordHour: Int = -1 private var recordDay: Int = -1 private val state = MutableStateFlow(WheelState.SLEEPING) @@ -102,37 +123,30 @@ class ActionScheduler : AgentRunningSubModule, private val cronDefinition: CronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ) private val cronParser: CronParser = CronParser(cronDefinition) - /** - * 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程 - */ init { // 启动时间轮 - launchWheel() + wheel() } - suspend fun schedule(actionData: SchedulableExecutableAction) { - if (actionData.status != ExecutableAction.Status.PREPARE) { - return - } - + suspend fun schedule(schedulableData: Schedulable) { checkThenExecute { val parseToZonedDateTime = parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, + schedulableData.scheduleType, + schedulableData.scheduleContent, it ) ?: run { - logFailedStatus(actionData) + logFailedStatus(schedulableData) return@checkThenExecute } log.debug("Action next execution time: {}", parseToZonedDateTime) val hour = parseToZonedDateTime.hour - actionsGroupByHour[hour].add(actionData) + schedulableGroupByHour[hour].add(schedulableData) log.debug("Action scheduled at {}", hour) if (it.hour == hour) { val wheelOffset = parseToZonedDateTime.minute * 60 + parseToZonedDateTime.second - wheel[wheelOffset].add(actionData) + wheel[wheelOffset].add(schedulableData) state.value = WheelState.ACTIVE log.debug("Action scheduled at wheel offset {}", wheelOffset) } @@ -140,17 +154,22 @@ class ActionScheduler : AgentRunningSubModule, } } - private fun launchWheel() { + private fun wheel() { - fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set? { + data class WheelStepResult( + val toTrigger: Set?, + val shouldBreak: Boolean + ) + + fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set? { if (tick > previousTick) { - val toTrigger = mutableSetOf() + val toTrigger = mutableSetOf() for (i in previousTick..tick) { val bucket = wheel[i] if (bucket.isNotEmpty()) { toTrigger.addAll(bucket) val bucketUuids = bucket.asSequence().map { it.uuid }.toHashSet() - actionsGroupByHour[triggerHour].removeIf { it.uuid in bucketUuids } + schedulableGroupByHour[triggerHour].removeIf { it.uuid in bucketUuids } bucket.clear() // 避免重复触发 } } @@ -178,37 +197,42 @@ class ActionScheduler : AgentRunningSubModule, // 2) 推进节拍器:按“理论秒”前进 step 次 nextTickNanos += step.toLong() * 1_000_000_000L - var shouldBreak = false - var toTrigger: Set? = null + val stepResult = run { + var shouldBreak = false + var toTrigger: Set? = null + + checkThenExecute(false) { + if (it.hour != launchingHour) { + shouldBreak = true + toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour) + log.debug( + "Hour changed, previousTick: {}, tick: {}, toTriggerSize: {}", + previousTick, + tick, + toTrigger?.size + ) + return@checkThenExecute + } + + toTrigger = collectToTrigger(tick, previousTick, launchingHour) + + if (tick >= wheel.lastIndex || schedulableGroupByHour[launchingHour].isEmpty()) { + state.value = WheelState.SLEEPING + shouldBreak = true + } - checkThenExecute(false) { - if (it.hour != launchingHour) { - shouldBreak = true - toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour) - log.debug( - "Hour changed, previousTick: {}, tick: {}, toTriggerSize: {}", - previousTick, - tick, - toTrigger?.size - ) - return@checkThenExecute } - toTrigger = collectToTrigger(tick, previousTick, launchingHour) + WheelStepResult(toTrigger, shouldBreak) + } - if (tick >= wheel.lastIndex || actionsGroupByHour[launchingHour].isEmpty()) { - state.value = WheelState.SLEEPING - shouldBreak = true - return@checkThenExecute + stepResult.toTrigger?.let { trigger -> + timeWheelScope.launch { + onTrigger(trigger) } } - toTrigger?.takeIf { it.isNotEmpty() }?.let { - onTrigger(it) - log.debug("Executing action at hour {} tick {}", launchingHour, tick) - } - - if (shouldBreak) { + if (stepResult.shouldBreak) { log.debug("Wheel stopped at tick {}", tick) break } @@ -243,7 +267,7 @@ class ActionScheduler : AgentRunningSubModule, var primaryTickAdvanceTime: Long? = null checkThenExecute { currentTime = it - shouldWait = actionsGroupByHour[it.hour].isEmpty() + shouldWait = schedulableGroupByHour[it.hour].isEmpty() // 由于 wheel 的启动时间可能存在延迟,而时内推进由 nanoTime 保证不会漏发, // 正常的时序结束又由 tick 是否触顶、当前时是否存在额外任务触发, // 而启动时无触发保障,此时一并初始化 tick 推进时间,足以应对 check 与 wheel 间的这段时间间隔 @@ -266,24 +290,24 @@ class ActionScheduler : AgentRunningSubModule, suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) = wheelActionsLock.withLock { fun loadActions( - source: Set, + source: Set, now: ZonedDateTime, - load: (latestExecutingTime: ZonedDateTime, actionData: SchedulableExecutableAction) -> Unit, + load: (latestExecutingTime: ZonedDateTime, schedulableData: Schedulable) -> Unit, repair: () -> Unit ) { val runLoading = { - for (actionData in source) { + for (schedulableData in source) { val nextExecutingTime = parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, + schedulableData.scheduleType, + schedulableData.scheduleContent, now ) ?: run { - logFailedStatus(actionData) + logFailedStatus(schedulableData) continue } - load(nextExecutingTime, actionData) + load(nextExecutingTime, schedulableData) } } @@ -292,12 +316,12 @@ class ActionScheduler : AgentRunningSubModule, } fun loadHourActions(currentTime: ZonedDateTime) { - val load: (ZonedDateTime, SchedulableExecutableAction) -> Unit = - { latestExecutionTime, actionData -> - val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second - wheel[secondsTime].add(actionData) - log.debug("Action loaded to hour: {}", actionData) - } + val load: (ZonedDateTime, Schedulable) -> Unit = + { latestExecutionTime, schedulableData -> + val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second + wheel[secondsTime].add(schedulableData) + log.debug("Action loaded to hour: {}", schedulableData) + } val repair: () -> Unit = { for (set in wheel) { @@ -305,23 +329,23 @@ class ActionScheduler : AgentRunningSubModule, } } - loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair) + loadActions(schedulableGroupByHour[currentTime.hour], currentTime, load, repair) } fun loadDayActions(currentTime: ZonedDateTime) { - val load: (ZonedDateTime, SchedulableExecutableAction) -> Unit = - { latestExecutingTime, actionData -> - actionsGroupByHour[latestExecutingTime.hour].add(actionData) - log.debug("Action loaded to day: {}", actionData) - } + val load: (ZonedDateTime, Schedulable) -> Unit = + { latestExecutingTime, schedulableData -> + schedulableGroupByHour[latestExecutingTime.hour].add(schedulableData) + log.debug("Action loaded to day: {}", schedulableData) + } val repair: () -> Unit = { - for (set in actionsGroupByHour) { + for (set in schedulableGroupByHour) { set.clear() } } - loadActions(listScheduledActions(), currentTime, load, repair) + loadActions(listSource(), currentTime, load, repair) } fun refreshIfNeeded(now: ZonedDateTime) { @@ -382,13 +406,11 @@ class ActionScheduler : AgentRunningSubModule, } - private fun logFailedStatus(actionData: SchedulableExecutableAction) { + private fun logFailedStatus(scheduleData: Schedulable) { log.warn( - "行动未加载,uuid: {}, source: {}, tendency: {}, scheduleContent: {}", - actionData.uuid, - actionData.source, - actionData.tendency, - actionData.scheduleContent, + "行动未加载,scheduleType: {}, scheduleContent: {}", + scheduleData.scheduleType, + scheduleData.scheduleContent, ) }