diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index 06d76d12..ee02d6cb 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -22,6 +22,7 @@ import work.slhaf.partner.core.action.entity.ScheduledActionData import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput import java.io.Closeable +import java.time.Duration import java.time.ZonedDateTime import java.time.temporal.ChronoUnit import java.util.stream.Collectors @@ -85,7 +86,8 @@ class ActionScheduler : AgentRunningSubModule, Void>() private val actionsGroupByHour = Array>(24) { mutableSetOf() } private val wheel = Array>(60 * 60) { mutableSetOf() } - private var currentHour: Int = 0 + private var recordHour: Int = -1 + private var recordDay: Int = -1 private val state = MutableStateFlow(WheelState.SLEEPING) private val wheelActionsLock = Mutex() @@ -98,9 +100,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() * 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程 */ init { - // 加载 primaryActions 进 actionsGroupByHour - loadDayActions() - // 依据当前时间移动至合适的 hour 并启动时间轮 + // 启动时间轮 launchWheel() } @@ -109,19 +109,21 @@ class ActionScheduler : AgentRunningSubModule, Void>() return } - wheelActionsLock.withLock { - val parseToZonedDateTime = parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, - ZonedDateTime.now() - ) ?: run { - logFailedStatus(actionData) - return - } + val parseToZonedDateTime = parseToZonedDateTime( + actionData.scheduleType, + actionData.scheduleContent, + ZonedDateTime.now() + ) ?: run { + logFailedStatus(actionData) + return + } + checkTimeAndLoad() + + wheelActionsLock.withLock { val hour = parseToZonedDateTime.hour actionsGroupByHour[hour].add(actionData) - if (currentHour == hour) { + if (recordHour == hour) { state.value = WheelState.ACTIVE } } @@ -137,7 +139,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() val bucket = wheel[i] if (bucket.isNotEmpty()) { toTrigger.addAll(bucket) - actionsGroupByHour[currentHour].removeAll(bucket) + actionsGroupByHour[recordHour].removeAll(bucket) bucket.clear() // 避免重复触发 } } @@ -149,104 +151,85 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } + suspend fun CoroutineScope.wheel() { + // 计算当前距离时内下次任务的剩余时间, 秒级推进 + val now = ZonedDateTime.now() + var tick = now.minute * 60 + now.second + var lastTickAdvanceTime = System.nanoTime() + while (isActive) { + // tick 推进(nano -> second) + val current = System.nanoTime() + val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt() + if (step <= 0) { + delay(50) // 避免空转 + continue + } + + val previousTick = tick + tick = (tick + step).coerceAtMost(wheel.lastIndex) + lastTickAdvanceTime = current + + // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 + tickOnTrigger(tick, previousTick) + + // 推进到顶时停止循环、当前时无任务时停止循环 + if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { + break + } + + // 休眠一秒 + delay(1000) + } + } + + suspend fun wait(currentTime: ZonedDateTime) { + val seconds = Duration.between( + currentTime, currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1) + ).toMillis() + withTimeoutOrNull(seconds) { + state.first { it == WheelState.ACTIVE } + } + state.value = WheelState.SLEEPING + } + timeWheelScope.launch { while (isActive) { // 判断是否该步入下一小时 - val actionsLoadingTime = loadHourActions() - currentHour = actionsLoadingTime.hour + val currentTime = checkTimeAndLoad() // 如果该时无任务则等待,插入事件可提前唤醒 - if (actionsGroupByHour[currentHour].isEmpty()) { + if (actionsGroupByHour[recordHour].isEmpty()) { // 计算距离下一小时的时间,等待 - val seconds = java.time.Duration.between( - actionsLoadingTime, actionsLoadingTime.truncatedTo(ChronoUnit.HOURS).plusHours(1) - ).toMillis() - withTimeoutOrNull(seconds) { - state.first { it == WheelState.ACTIVE } - } - state.value = WheelState.SLEEPING + wait(currentTime) continue } // 唤醒进行时间轮循环 - // 计算当前距离时内下次任务的剩余时间, 秒级推进 - val now = ZonedDateTime.now() - var tick = now.minute * 60 + now.second - var lastTickAdvanceTime = System.nanoTime() - while (isActive) { - // tick 推进(nano -> second) - val current = System.nanoTime() - val step = ((current - lastTickAdvanceTime) / 1_000_000_000L).toInt() - if (step <= 0) { - delay(50) // 避免空转 - continue - } - - val previousTick = tick - tick = (tick + step).coerceAtMost(wheel.lastIndex) - lastTickAdvanceTime = current - - // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 - tickOnTrigger(tick, previousTick) - - // 推进到顶时停止循环、当前时无任务时停止循环 - if (tick >= wheel.lastIndex || actionsGroupByHour[currentHour].isEmpty()) { - break - } - - // 休眠一秒 - delay(1000) - } + wheel() } - } } - private suspend fun loadHourActions(): ZonedDateTime { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> - val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second - wheel[secondsTime].add(actionData) - } - - val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> - before.hour != after.hour - } - - val repair: () -> Unit = { - for (set in wheel) { - set.clear() - } - } - - wheelActionsLock.withLock { - return loadActions(load, invalid, repair) + suspend fun checkTimeAndLoad(): ZonedDateTime { + val currentTime = ZonedDateTime.now() + val currentDay = currentTime.dayOfMonth + if (currentDay != recordDay) { + recordDay = currentDay + recordHour = currentTime.hour + loadDayActions() + loadHourActions() + } else if (currentTime.hour != recordHour) { + recordHour = currentTime.hour + loadHourActions() } + return currentTime } - private fun loadDayActions() { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> - actionsGroupByHour[latestExecutingTime.hour].add(actionData) - } - - val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> - before.dayOfYear != after.dayOfYear - } - - val repair: () -> Unit = { - for (set in actionsGroupByHour) { - set.clear() - } - } - - loadActions(load, invalid, repair) - - } - - private fun loadActions( + fun loadActions( load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, invalid: (before: ZonedDateTime, after: ZonedDateTime) -> Boolean, repair: () -> Unit - ): ZonedDateTime { + ) { val runLoading = { val now = ZonedDateTime.now() for (actionData in primaryActions) { @@ -271,7 +254,47 @@ class ActionScheduler : AgentRunningSubModule, Void>() repair() runLoading() } - return after + } + + suspend fun loadHourActions() { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> + val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second + wheel[secondsTime].add(actionData) + } + + val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> + before.hour != after.hour + } + + val repair: () -> Unit = { + for (set in wheel) { + set.clear() + } + } + + wheelActionsLock.withLock { + loadActions(load, invalid, repair) + } + } + + suspend fun loadDayActions() { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> + actionsGroupByHour[latestExecutingTime.hour].add(actionData) + } + + val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> + before.dayOfMonth != after.dayOfMonth + } + + val repair: () -> Unit = { + for (set in actionsGroupByHour) { + set.clear() + } + } + + wheelActionsLock.withLock { + loadActions(load, invalid, repair) + } }