From 69d9f04f11b9661ef57f8e96b6f122025032db4d Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 9 Feb 2026 21:04:48 +0800 Subject: [PATCH] fix(ActionScheduler): stabilize wheel tick pacing and run trigger scan before hour/day refresh --- .../dispatcher/scheduler/ActionScheduler.kt | 182 ++++++++++-------- 1 file changed, 97 insertions(+), 85 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index bc3f584b..e8472fbd 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -158,57 +158,59 @@ class ActionScheduler : AgentRunningSubModule, Void>() } suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) { - log.debug("Wheel launched at {}, current nanoTime: {}", launchingTime, primaryTickAdvanceTime) - // 计算当前距离时内下次任务的剩余时间, 秒级推进 val launchingHour = launchingTime.hour var tick = launchingTime.minute * 60 + launchingTime.second - var lastTickAdvanceTime = primaryTickAdvanceTime + + // 让节拍器从“启动时刻的下一秒”开始(避免立即 step=0) + var nextTickNanos = primaryTickAdvanceTime + 1_000_000_000L while (isActive) { - // tick 推进(nano -> second) - val current = System.nanoTime() - val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt() + // 1) 计算落后多少秒:至少 1(正常推进),也可能 >1(追赶) + val now0 = System.nanoTime() + val lagNanos = now0 - nextTickNanos + val step = if (lagNanos < 0) 1 else (lagNanos / 1_000_000_000L).toInt() + 1 val previousTick = tick tick = (tick + step).coerceAtMost(wheel.lastIndex) - lastTickAdvanceTime = current + + // 2) 推进节拍器:按“理论秒”前进 step 次 + nextTickNanos += step.toLong() * 1_000_000_000L var shouldBreak = false var toTrigger: Set? = null - checkThenExecute { + checkThenExecute(false) { if (it.hour != launchingHour) { - // recordHout 已更新,此时后执行无意义,不等于启动时的 hour,则需要停止 shouldBreak = true + toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour) return@checkThenExecute } - toTrigger = collectToTrigger(tick, previousTick, it.hour) + toTrigger = collectToTrigger(tick, previousTick, launchingHour) - // 推进到顶时停止循环、当前时无任务时停止循环 - if (tick >= wheel.lastIndex || actionsGroupByHour[it.hour].isEmpty()) { + if (tick >= wheel.lastIndex || actionsGroupByHour[launchingHour].isEmpty()) { state.value = WheelState.SLEEPING shouldBreak = true return@checkThenExecute } - - // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 } - toTrigger?.let { - if (it.isEmpty()) return@let + toTrigger?.takeIf { it.isNotEmpty() }?.let { onTrigger(it) log.debug("Executing action at hour {} tick {}", launchingHour, tick) } - // 休眠一秒 - delay(1000) - if (shouldBreak) { log.debug("Wheel stopped at tick {}", tick) break } + // 3) 精确睡到下一次理论 tick(用最新 nanoTime) + val now1 = System.nanoTime() + val sleepNanos = nextTickNanos - now1 + if (sleepNanos > 0) { + delay(sleepNanos / 1_000_000L) // 毫秒级 delay 足够;剩余 nanos 不必忙等 + } } } @@ -253,80 +255,90 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = wheelActionsLock.withLock { - fun loadActions( - source: Set, - now: ZonedDateTime, - load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, - repair: () -> Unit - ) { - val runLoading = { - for (actionData in source) { - val nextExecutingTime = - parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, - now - ) ?: run { - logFailedStatus(actionData) - continue - } + suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) = + wheelActionsLock.withLock { + fun loadActions( + source: Set, + now: ZonedDateTime, + load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, + repair: () -> Unit + ) { + val runLoading = { + for (actionData in source) { + val nextExecutingTime = + parseToZonedDateTime( + actionData.scheduleType, + actionData.scheduleContent, + now + ) ?: run { + logFailedStatus(actionData) + continue + } - load(nextExecutingTime, actionData) + load(nextExecutingTime, actionData) + } + } + + repair() + runLoading() + } + + fun loadHourActions(currentTime: ZonedDateTime) { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> + val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second + wheel[secondsTime].add(actionData) + log.debug("Action loaded to hour: {}", actionData) + } + + val repair: () -> Unit = { + for (set in wheel) { + set.clear() + } + } + + loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair) + } + + fun loadDayActions(currentTime: ZonedDateTime) { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> + actionsGroupByHour[latestExecutingTime.hour].add(actionData) + log.debug("Action loaded to day: {}", actionData) + } + + val repair: () -> Unit = { + for (set in actionsGroupByHour) { + set.clear() + } + } + + loadActions(listScheduledActions(), currentTime, load, repair) + } + + fun refreshIfNeeded(now: ZonedDateTime) { + val d = now.dayOfMonth + val h = now.hour + if (d != recordDay) { + recordDay = d + recordHour = h + loadDayActions(now) + loadHourActions(now) + } else if (h != recordHour) { + recordHour = h + loadHourActions(now) } } - repair() - runLoading() - } + val now = ZonedDateTime.now() - fun loadHourActions(currentTime: ZonedDateTime) { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> - val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second - wheel[secondsTime].add(actionData) - log.debug("Action loaded to hour: {}", actionData) + if (finallyToExecute) { + refreshIfNeeded(now) + then(now) + } else { + then(now) + refreshIfNeeded(now) } - - val repair: () -> Unit = { - for (set in wheel) { - set.clear() - } - } - - loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair) } - fun loadDayActions(currentTime: ZonedDateTime) { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> - actionsGroupByHour[latestExecutingTime.hour].add(actionData) - log.debug("Action loaded to day: {}", actionData) - } - - val repair: () -> Unit = { - for (set in actionsGroupByHour) { - set.clear() - } - } - - loadActions(listScheduledActions(), currentTime, load, repair) - } - - val currentTime = ZonedDateTime.now() - val currentDay = currentTime.dayOfMonth - val currentHour = currentTime.hour - if (currentDay != recordDay) { - recordDay = currentDay - recordHour = currentHour - loadDayActions(currentTime) - loadHourActions(currentTime) - } else if (currentHour != recordHour) { - recordHour = currentHour - loadHourActions(currentTime) - } - - then(currentTime) - } - private fun parseToZonedDateTime( scheduleType: ScheduledActionData.ScheduleType, scheduleContent: String,