fix(ActionScheduler): stabilize wheel tick pacing and run trigger scan before hour/day refresh

This commit is contained in:
2026-02-09 21:04:48 +08:00
parent e2bd9eb0af
commit 69d9f04f11

View File

@@ -158,57 +158,59 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) { suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) {
log.debug("Wheel launched at {}, current nanoTime: {}", launchingTime, primaryTickAdvanceTime)
// 计算当前距离时内下次任务的剩余时间, 秒级推进
val launchingHour = launchingTime.hour val launchingHour = launchingTime.hour
var tick = launchingTime.minute * 60 + launchingTime.second var tick = launchingTime.minute * 60 + launchingTime.second
var lastTickAdvanceTime = primaryTickAdvanceTime
// 让节拍器从“启动时刻的下一秒”开始(避免立即 step=0
var nextTickNanos = primaryTickAdvanceTime + 1_000_000_000L
while (isActive) { while (isActive) {
// tick 推进nano -> second // 1) 计算落后多少秒:至少 1正常推进也可能 >1追赶
val current = System.nanoTime() val now0 = System.nanoTime()
val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt() val lagNanos = now0 - nextTickNanos
val step = if (lagNanos < 0) 1 else (lagNanos / 1_000_000_000L).toInt() + 1
val previousTick = tick val previousTick = tick
tick = (tick + step).coerceAtMost(wheel.lastIndex) tick = (tick + step).coerceAtMost(wheel.lastIndex)
lastTickAdvanceTime = current
// 2) 推进节拍器:按“理论秒”前进 step 次
nextTickNanos += step.toLong() * 1_000_000_000L
var shouldBreak = false var shouldBreak = false
var toTrigger: Set<ScheduledActionData>? = null var toTrigger: Set<ScheduledActionData>? = null
checkThenExecute {
checkThenExecute(false) {
if (it.hour != launchingHour) { if (it.hour != launchingHour) {
// recordHout 已更新,此时后执行无意义,不等于启动时的 hour则需要停止
shouldBreak = true shouldBreak = true
toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour)
return@checkThenExecute return@checkThenExecute
} }
toTrigger = collectToTrigger(tick, previousTick, it.hour) toTrigger = collectToTrigger(tick, previousTick, launchingHour)
// 推进到顶时停止循环、当前时无任务时停止循环 if (tick >= wheel.lastIndex || actionsGroupByHour[launchingHour].isEmpty()) {
if (tick >= wheel.lastIndex || actionsGroupByHour[it.hour].isEmpty()) {
state.value = WheelState.SLEEPING state.value = WheelState.SLEEPING
shouldBreak = true shouldBreak = true
return@checkThenExecute return@checkThenExecute
} }
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动
} }
toTrigger?.let {
if (it.isEmpty()) return@let
toTrigger?.takeIf { it.isNotEmpty() }?.let {
onTrigger(it) onTrigger(it)
log.debug("Executing action at hour {} tick {}", launchingHour, tick) log.debug("Executing action at hour {} tick {}", launchingHour, tick)
} }
// 休眠一秒
delay(1000)
if (shouldBreak) { if (shouldBreak) {
log.debug("Wheel stopped at tick {}", tick) log.debug("Wheel stopped at tick {}", tick)
break break
} }
// 3) 精确睡到下一次理论 tick用最新 nanoTime
val now1 = System.nanoTime()
val sleepNanos = nextTickNanos - now1
if (sleepNanos > 0) {
delay(sleepNanos / 1_000_000L) // 毫秒级 delay 足够;剩余 nanos 不必忙等
}
} }
} }
@@ -253,80 +255,90 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = wheelActionsLock.withLock { suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) =
fun loadActions( wheelActionsLock.withLock {
source: Set<ScheduledActionData>, fun loadActions(
now: ZonedDateTime, source: Set<ScheduledActionData>,
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, now: ZonedDateTime,
repair: () -> Unit load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
) { repair: () -> Unit
val runLoading = { ) {
for (actionData in source) { val runLoading = {
val nextExecutingTime = for (actionData in source) {
parseToZonedDateTime( val nextExecutingTime =
actionData.scheduleType, parseToZonedDateTime(
actionData.scheduleContent, actionData.scheduleType,
now actionData.scheduleContent,
) ?: run { now
logFailedStatus(actionData) ) ?: run {
continue logFailedStatus(actionData)
} continue
}
load(nextExecutingTime, actionData) load(nextExecutingTime, actionData)
}
}
repair()
runLoading()
}
fun loadHourActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
log.debug("Action loaded to hour: {}", actionData)
}
val repair: () -> Unit = {
for (set in wheel) {
set.clear()
}
}
loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair)
}
fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
log.debug("Action loaded to day: {}", actionData)
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
loadActions(listScheduledActions(), currentTime, load, repair)
}
fun refreshIfNeeded(now: ZonedDateTime) {
val d = now.dayOfMonth
val h = now.hour
if (d != recordDay) {
recordDay = d
recordHour = h
loadDayActions(now)
loadHourActions(now)
} else if (h != recordHour) {
recordHour = h
loadHourActions(now)
} }
} }
repair() val now = ZonedDateTime.now()
runLoading()
}
fun loadHourActions(currentTime: ZonedDateTime) { if (finallyToExecute) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> refreshIfNeeded(now)
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second then(now)
wheel[secondsTime].add(actionData) } else {
log.debug("Action loaded to hour: {}", actionData) then(now)
refreshIfNeeded(now)
} }
val repair: () -> Unit = {
for (set in wheel) {
set.clear()
}
}
loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair)
} }
fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
log.debug("Action loaded to day: {}", actionData)
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
loadActions(listScheduledActions(), currentTime, load, repair)
}
val currentTime = ZonedDateTime.now()
val currentDay = currentTime.dayOfMonth
val currentHour = currentTime.hour
if (currentDay != recordDay) {
recordDay = currentDay
recordHour = currentHour
loadDayActions(currentTime)
loadHourActions(currentTime)
} else if (currentHour != recordHour) {
recordHour = currentHour
loadHourActions(currentTime)
}
then(currentTime)
}
private fun parseToZonedDateTime( private fun parseToZonedDateTime(
scheduleType: ScheduledActionData.ScheduleType, scheduleType: ScheduledActionData.ScheduleType,
scheduleContent: String, scheduleContent: String,