fix(ActionScheduler): use checkThenExecute current hour consistently and trigger wheel tasks outside lock

This commit is contained in:
2026-02-09 15:56:12 +08:00
parent 9f479c5f6f
commit 650f9b27a1

View File

@@ -124,7 +124,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
val hour = parseToZonedDateTime.hour val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData) actionsGroupByHour[hour].add(actionData)
if (recordHour == hour) { if (it.hour == hour) {
state.value = WheelState.ACTIVE state.value = WheelState.ACTIVE
} }
@@ -133,22 +133,20 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
private fun launchWheel() { private fun launchWheel() {
fun tickOnTrigger(tick: Int, previousTick: Int) { fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<ScheduledActionData>? {
if (tick > previousTick) { if (tick > previousTick) {
val toTrigger = linkedSetOf<ScheduledActionData>() val toTrigger = mutableSetOf<ScheduledActionData>()
for (i in (previousTick + 1)..tick) { for (i in (previousTick + 1)..tick) {
val bucket = wheel[i] val bucket = wheel[i]
if (bucket.isNotEmpty()) { if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket) toTrigger.addAll(bucket)
actionsGroupByHour[recordHour].removeAll(bucket) actionsGroupByHour[triggerHour].removeAll(bucket)
bucket.clear() // 避免重复触发 bucket.clear() // 避免重复触发
} }
} }
return toTrigger
if (toTrigger.isNotEmpty()) {
onTrigger(toTrigger)
}
} }
return null
} }
suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime) { suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime) {
@@ -170,22 +168,27 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
lastTickAdvanceTime = current lastTickAdvanceTime = current
var shouldBreak: Boolean? = null var shouldBreak: Boolean? = null
var toTrigger: Set<ScheduledActionData>? = null
checkThenExecute { checkThenExecute {
if (recordHour != launchingHour) { if (it.hour != launchingHour) {
// recordHour 在外部受到更新,不等于启动时的 hour则需要停止 // recordHout 已更新,此时后执行无意义,不等于启动时的 hour则需要停止
shouldBreak = true shouldBreak = true
return@checkThenExecute return@checkThenExecute
} }
toTrigger = collectToTrigger(tick, previousTick, it.hour)
// 推进到顶时停止循环、当前时无任务时停止循环 // 推进到顶时停止循环、当前时无任务时停止循环
if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { if (tick >= wheel.lastIndex || actionsGroupByHour[it.hour].isEmpty()) {
state.value = WheelState.SLEEPING
shouldBreak = true shouldBreak = true
return@checkThenExecute return@checkThenExecute
} }
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动
tickOnTrigger(tick, previousTick)
} }
toTrigger?.let { onTrigger(it) }
if (shouldBreak!!) { if (shouldBreak!!) {
break break
} }
@@ -203,7 +206,6 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
withTimeoutOrNull(seconds) { withTimeoutOrNull(seconds) {
state.first { it == WheelState.ACTIVE } state.first { it == WheelState.ACTIVE }
} }
state.value = WheelState.SLEEPING
} }
timeWheelScope.launch { timeWheelScope.launch {
@@ -213,7 +215,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
var currentTime: ZonedDateTime? = null var currentTime: ZonedDateTime? = null
checkThenExecute { checkThenExecute {
currentTime = it currentTime = it
shouldWait = actionsGroupByHour[recordHour].isEmpty() shouldWait = actionsGroupByHour[it.hour].isEmpty()
} }
// 如果该时无任务则等待,插入事件可提前唤醒 // 如果该时无任务则等待,插入事件可提前唤醒
@@ -229,89 +231,74 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = wheelActionsLock.withLock {
wheelActionsLock.withLock { fun loadActions(
val currentTime = ZonedDateTime.now() now: ZonedDateTime,
val currentDay = currentTime.dayOfMonth load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
if (currentDay != recordDay) { repair: () -> Unit
recordDay = currentDay ) {
recordHour = currentTime.hour val runLoading = {
loadDayActions() for (actionData in listScheduledActions()) {
loadHourActions() val latestExecutingTime =
} else if (currentTime.hour != recordHour) { parseToZonedDateTime(
recordHour = currentTime.hour actionData.scheduleType,
loadHourActions() actionData.scheduleContent,
now
) ?: run {
logFailedStatus(actionData)
continue
}
load(latestExecutingTime, actionData)
}
} }
then(currentTime)
}
fun loadActions(
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
invalid: (before: ZonedDateTime, after: ZonedDateTime) -> Boolean,
repair: () -> Unit
) {
val runLoading = {
val now = ZonedDateTime.now()
for (actionData in listScheduledActions()) {
val latestExecutingTime =
parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
now
) ?: run {
logFailedStatus(actionData)
continue
}
load(latestExecutingTime, actionData)
}
}
val before = ZonedDateTime.now()
runLoading()
val after = ZonedDateTime.now()
if (invalid(before, after)) {
repair() repair()
runLoading() runLoading()
} }
}
fun loadHourActions() { fun loadHourActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData) wheel[secondsTime].add(actionData)
}
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after ->
before.hour != after.hour
}
val repair: () -> Unit = {
for (set in wheel) {
set.clear()
} }
}
loadActions(load, invalid, repair) val repair: () -> Unit = {
} for (set in wheel) {
set.clear()
fun loadDayActions() { }
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
}
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after ->
before.dayOfMonth != after.dayOfMonth
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
} }
loadActions(currentTime, load, repair)
} }
loadActions(load, invalid, repair) fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
loadActions(currentTime, load, repair)
}
val currentTime = ZonedDateTime.now()
val currentDay = currentTime.dayOfMonth
if (currentDay != recordDay) {
recordDay = currentDay
recordHour = currentTime.hour
loadDayActions(currentTime)
loadHourActions(currentTime)
} else if (currentTime.hour != recordHour) {
recordHour = currentTime.hour
loadHourActions(currentTime)
}
then(currentTime)
} }
private fun parseToZonedDateTime( private fun parseToZonedDateTime(