fix(ActionScheduler): reload day/hour action buckets on time changes via checkTimeAndLoad, and reorganize functions

This commit is contained in:
2026-02-09 00:03:21 +08:00
parent 882ec43f2b
commit b05b665960

View File

@@ -22,6 +22,7 @@ import work.slhaf.partner.core.action.entity.ScheduledActionData
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import java.io.Closeable import java.io.Closeable
import java.time.Duration
import java.time.ZonedDateTime import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import java.util.stream.Collectors import java.util.stream.Collectors
@@ -85,7 +86,8 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
private val actionsGroupByHour = Array<MutableSet<ScheduledActionData>>(24) { mutableSetOf() } private val actionsGroupByHour = Array<MutableSet<ScheduledActionData>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<ScheduledActionData>>(60 * 60) { mutableSetOf() } private val wheel = Array<MutableSet<ScheduledActionData>>(60 * 60) { mutableSetOf() }
private var currentHour: Int = 0 private var recordHour: Int = -1
private var recordDay: Int = -1
private val state = MutableStateFlow(WheelState.SLEEPING) private val state = MutableStateFlow(WheelState.SLEEPING)
private val wheelActionsLock = Mutex() private val wheelActionsLock = Mutex()
@@ -98,9 +100,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
* 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程 * 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程
*/ */
init { init {
// 加载 primaryActions 进 actionsGroupByHour // 启动时间轮
loadDayActions()
// 依据当前时间移动至合适的 hour 并启动时间轮
launchWheel() launchWheel()
} }
@@ -109,19 +109,21 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
return return
} }
wheelActionsLock.withLock { val parseToZonedDateTime = parseToZonedDateTime(
val parseToZonedDateTime = parseToZonedDateTime( actionData.scheduleType,
actionData.scheduleType, actionData.scheduleContent,
actionData.scheduleContent, ZonedDateTime.now()
ZonedDateTime.now() ) ?: run {
) ?: run { logFailedStatus(actionData)
logFailedStatus(actionData) return
return }
}
checkTimeAndLoad()
wheelActionsLock.withLock {
val hour = parseToZonedDateTime.hour val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData) actionsGroupByHour[hour].add(actionData)
if (currentHour == hour) { if (recordHour == hour) {
state.value = WheelState.ACTIVE state.value = WheelState.ACTIVE
} }
} }
@@ -137,7 +139,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
val bucket = wheel[i] val bucket = wheel[i]
if (bucket.isNotEmpty()) { if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket) toTrigger.addAll(bucket)
actionsGroupByHour[currentHour].removeAll(bucket) actionsGroupByHour[recordHour].removeAll(bucket)
bucket.clear() // 避免重复触发 bucket.clear() // 避免重复触发
} }
} }
@@ -149,104 +151,85 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
} }
} }
suspend fun CoroutineScope.wheel() {
// 计算当前距离时内下次任务的剩余时间, 秒级推进
val now = ZonedDateTime.now()
var tick = now.minute * 60 + now.second
var lastTickAdvanceTime = System.nanoTime()
while (isActive) {
// tick 推进nano -> second
val current = System.nanoTime()
val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt()
if (step <= 0) {
delay(50) // 避免空转
continue
}
val previousTick = tick
tick = (tick + step).coerceAtMost(wheel.lastIndex)
lastTickAdvanceTime = current
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动
tickOnTrigger(tick, previousTick)
// 推进到顶时停止循环、当前时无任务时停止循环
if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) {
break
}
// 休眠一秒
delay(1000)
}
}
suspend fun wait(currentTime: ZonedDateTime) {
val seconds = Duration.between(
currentTime, currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1)
).toMillis()
withTimeoutOrNull(seconds) {
state.first { it == WheelState.ACTIVE }
}
state.value = WheelState.SLEEPING
}
timeWheelScope.launch { timeWheelScope.launch {
while (isActive) { while (isActive) {
// 判断是否该步入下一小时 // 判断是否该步入下一小时
val actionsLoadingTime = loadHourActions() val currentTime = checkTimeAndLoad()
currentHour = actionsLoadingTime.hour
// 如果该时无任务则等待,插入事件可提前唤醒 // 如果该时无任务则等待,插入事件可提前唤醒
if (actionsGroupByHour[currentHour].isEmpty()) { if (actionsGroupByHour[recordHour].isEmpty()) {
// 计算距离下一小时的时间,等待 // 计算距离下一小时的时间,等待
val seconds = java.time.Duration.between( wait(currentTime)
actionsLoadingTime, actionsLoadingTime.truncatedTo(ChronoUnit.HOURS).plusHours(1)
).toMillis()
withTimeoutOrNull(seconds) {
state.first { it == WheelState.ACTIVE }
}
state.value = WheelState.SLEEPING
continue continue
} }
// 唤醒进行时间轮循环 // 唤醒进行时间轮循环
// 计算当前距离时内下次任务的剩余时间, 秒级推进 wheel()
val now = ZonedDateTime.now()
var tick = now.minute * 60 + now.second
var lastTickAdvanceTime = System.nanoTime()
while (isActive) {
// tick 推进nano -> second
val current = System.nanoTime()
val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt()
if (step <= 0) {
delay(50) // 避免空转
continue
}
val previousTick = tick
tick = (tick + step).coerceAtMost(wheel.lastIndex)
lastTickAdvanceTime = current
// 取当前 tick、推进过程中经过的 tick 对应任务,异步启动
tickOnTrigger(tick, previousTick)
// 推进到顶时停止循环、当前时无任务时停止循环
if (tick >= wheel.lastIndex || actionsGroupByHour[currentHour].isEmpty()) {
break
}
// 休眠一秒
delay(1000)
}
} }
} }
} }
private suspend fun loadHourActions(): ZonedDateTime { suspend fun checkTimeAndLoad(): ZonedDateTime {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> val currentTime = ZonedDateTime.now()
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second val currentDay = currentTime.dayOfMonth
wheel[secondsTime].add(actionData) if (currentDay != recordDay) {
} recordDay = currentDay
recordHour = currentTime.hour
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> loadDayActions()
before.hour != after.hour loadHourActions()
} } else if (currentTime.hour != recordHour) {
recordHour = currentTime.hour
val repair: () -> Unit = { loadHourActions()
for (set in wheel) {
set.clear()
}
}
wheelActionsLock.withLock {
return loadActions(load, invalid, repair)
} }
return currentTime
} }
private fun loadDayActions() { fun loadActions(
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
}
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after ->
before.dayOfYear != after.dayOfYear
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
loadActions(load, invalid, repair)
}
private fun loadActions(
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
invalid: (before: ZonedDateTime, after: ZonedDateTime) -> Boolean, invalid: (before: ZonedDateTime, after: ZonedDateTime) -> Boolean,
repair: () -> Unit repair: () -> Unit
): ZonedDateTime { ) {
val runLoading = { val runLoading = {
val now = ZonedDateTime.now() val now = ZonedDateTime.now()
for (actionData in primaryActions) { for (actionData in primaryActions) {
@@ -271,7 +254,47 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
repair() repair()
runLoading() runLoading()
} }
return after }
suspend fun loadHourActions() {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
}
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after ->
before.hour != after.hour
}
val repair: () -> Unit = {
for (set in wheel) {
set.clear()
}
}
wheelActionsLock.withLock {
loadActions(load, invalid, repair)
}
}
suspend fun loadDayActions() {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
}
val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after ->
before.dayOfMonth != after.dayOfMonth
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
wheelActionsLock.withLock {
loadActions(load, invalid, repair)
}
} }