fix(ActionScheduler): unify time check/loading under checkThenExecute and guard wheel loop with launch-hour consistency

This commit is contained in:
2026-02-09 14:57:13 +08:00
parent 227c735667
commit 9f479c5f6f

View File

@@ -112,32 +112,30 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
return return
} }
checkThenExecute {
val parseToZonedDateTime = parseToZonedDateTime( val parseToZonedDateTime = parseToZonedDateTime(
actionData.scheduleType, actionData.scheduleType,
actionData.scheduleContent, actionData.scheduleContent,
ZonedDateTime.now() it
) ?: run { ) ?: run {
logFailedStatus(actionData) logFailedStatus(actionData)
return return@checkThenExecute
} }
checkTimeAndLoad()
wheelActionsLock.withLock {
val hour = parseToZonedDateTime.hour val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData) actionsGroupByHour[hour].add(actionData)
if (recordHour == hour) { if (recordHour == hour) {
state.value = WheelState.ACTIVE state.value = WheelState.ACTIVE
} }
} }
} }
private fun launchWheel() { private fun launchWheel() {
suspend fun tickOnTrigger(tick: Int, previousTick: Int) { fun tickOnTrigger(tick: Int, previousTick: Int) {
if (tick > previousTick) { if (tick > previousTick) {
val toTrigger = linkedSetOf<ScheduledActionData>() val toTrigger = linkedSetOf<ScheduledActionData>()
wheelActionsLock.withLock {
for (i in (previousTick + 1)..tick) { for (i in (previousTick + 1)..tick) {
val bucket = wheel[i] val bucket = wheel[i]
if (bucket.isNotEmpty()) { if (bucket.isNotEmpty()) {
@@ -146,7 +144,6 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
bucket.clear() // 避免重复触发 bucket.clear() // 避免重复触发
} }
} }
}
if (toTrigger.isNotEmpty()) { if (toTrigger.isNotEmpty()) {
onTrigger(toTrigger) onTrigger(toTrigger)
@@ -154,10 +151,10 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
suspend fun CoroutineScope.wheel() { suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime) {
// 计算当前距离时内下次任务的剩余时间, 秒级推进 // 计算当前距离时内下次任务的剩余时间, 秒级推进
val now = ZonedDateTime.now() val launchingHour = launchingTime.hour
var tick = now.minute * 60 + now.second var tick = launchingTime.minute * 60 + launchingTime.second
var lastTickAdvanceTime = System.nanoTime() var lastTickAdvanceTime = System.nanoTime()
while (isActive) { while (isActive) {
// tick 推进nano -> second // tick 推进nano -> second
@@ -172,11 +169,24 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
tick = (tick + step).coerceAtMost(wheel.lastIndex) tick = (tick + step).coerceAtMost(wheel.lastIndex)
lastTickAdvanceTime = current lastTickAdvanceTime = current
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 var shouldBreak: Boolean? = null
tickOnTrigger(tick, previousTick) checkThenExecute {
if (recordHour != launchingHour) {
// recordHour 在外部受到更新,不等于启动时的 hour则需要停止
shouldBreak = true
return@checkThenExecute
}
// 推进到顶时停止循环、当前时无任务时停止循环 // 推进到顶时停止循环、当前时无任务时停止循环
if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) {
shouldBreak = true
return@checkThenExecute
}
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动
tickOnTrigger(tick, previousTick)
}
if (shouldBreak!!) {
break break
} }
@@ -189,6 +199,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
val seconds = Duration.between( val seconds = Duration.between(
currentTime, currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1) currentTime, currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1)
).toMillis() ).toMillis()
// withTimeoutOrNull 内部已处理 seconds 小于 0 的情况
withTimeoutOrNull(seconds) { withTimeoutOrNull(seconds) {
state.first { it == WheelState.ACTIVE } state.first { it == WheelState.ACTIVE }
} }
@@ -198,22 +209,28 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
timeWheelScope.launch { timeWheelScope.launch {
while (isActive) { while (isActive) {
// 判断是否该步入下一小时 // 判断是否该步入下一小时
val currentTime = checkTimeAndLoad() var shouldWait: Boolean? = null
var currentTime: ZonedDateTime? = null
checkThenExecute {
currentTime = it
shouldWait = actionsGroupByHour[recordHour].isEmpty()
}
// 如果该时无任务则等待,插入事件可提前唤醒 // 如果该时无任务则等待,插入事件可提前唤醒
if (actionsGroupByHour[recordHour].isEmpty()) { if (shouldWait!!) {
// 计算距离下一小时的时间,等待 // 计算距离下一小时的时间,等待
wait(currentTime) currentTime?.let { wait(it) }
continue continue
} }
// 唤醒进行时间轮循环 // 唤醒进行时间轮循环
wheel() wheel(currentTime!!)
} }
} }
} }
suspend fun checkTimeAndLoad(): ZonedDateTime { suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) =
wheelActionsLock.withLock {
val currentTime = ZonedDateTime.now() val currentTime = ZonedDateTime.now()
val currentDay = currentTime.dayOfMonth val currentDay = currentTime.dayOfMonth
if (currentDay != recordDay) { if (currentDay != recordDay) {
@@ -225,7 +242,8 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
recordHour = currentTime.hour recordHour = currentTime.hour
loadHourActions() loadHourActions()
} }
return currentTime
then(currentTime)
} }
fun loadActions( fun loadActions(
@@ -259,7 +277,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
suspend fun loadHourActions() { fun loadHourActions() {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData) wheel[secondsTime].add(actionData)
@@ -275,12 +293,10 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
wheelActionsLock.withLock {
loadActions(load, invalid, repair) loadActions(load, invalid, repair)
} }
}
suspend fun loadDayActions() { fun loadDayActions() {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData) actionsGroupByHour[latestExecutingTime.hour].add(actionData)
} }
@@ -295,12 +311,9 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
wheelActionsLock.withLock {
loadActions(load, invalid, repair) loadActions(load, invalid, repair)
} }
}
private fun parseToZonedDateTime( private fun parseToZonedDateTime(
scheduleType: ScheduledActionData.ScheduleType, scheduleType: ScheduledActionData.ScheduleType,
scheduleContent: String, scheduleContent: String,