diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index 0796f60f..4312bafa 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -112,39 +112,36 @@ class ActionScheduler : AgentRunningSubModule, Void>() return } - val parseToZonedDateTime = parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, - ZonedDateTime.now() - ) ?: run { - logFailedStatus(actionData) - return - } + checkThenExecute { + val parseToZonedDateTime = parseToZonedDateTime( + actionData.scheduleType, + actionData.scheduleContent, + it + ) ?: run { + logFailedStatus(actionData) + return@checkThenExecute + } - checkTimeAndLoad() - - wheelActionsLock.withLock { val hour = parseToZonedDateTime.hour actionsGroupByHour[hour].add(actionData) if (recordHour == hour) { state.value = WheelState.ACTIVE } + } } private fun launchWheel() { - suspend fun tickOnTrigger(tick: Int, previousTick: Int) { + fun tickOnTrigger(tick: Int, previousTick: Int) { if (tick > previousTick) { val toTrigger = linkedSetOf() - wheelActionsLock.withLock { - for (i in (previousTick + 1)..tick) { - val bucket = wheel[i] - if (bucket.isNotEmpty()) { - toTrigger.addAll(bucket) - actionsGroupByHour[recordHour].removeAll(bucket) - bucket.clear() // 避免重复触发 - } + for (i in (previousTick + 1)..tick) { + val bucket = wheel[i] + if (bucket.isNotEmpty()) { + toTrigger.addAll(bucket) + actionsGroupByHour[recordHour].removeAll(bucket) + bucket.clear() // 避免重复触发 } } @@ -154,10 +151,10 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - suspend fun CoroutineScope.wheel() { + suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime) { // 计算当前距离时内下次任务的剩余时间, 秒级推进 - val now = ZonedDateTime.now() - var tick = now.minute * 60 + now.second + val launchingHour = launchingTime.hour + var tick = launchingTime.minute * 60 + launchingTime.second var lastTickAdvanceTime = System.nanoTime() while (isActive) { // tick 推进(nano -> second) @@ -172,11 +169,24 @@ class ActionScheduler : AgentRunningSubModule, Void>() tick = (tick + step).coerceAtMost(wheel.lastIndex) lastTickAdvanceTime = current - // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 - tickOnTrigger(tick, previousTick) + var shouldBreak: Boolean? = null + checkThenExecute { - // 推进到顶时停止循环、当前时无任务时停止循环 - if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { + if (recordHour != launchingHour) { + // recordHour 在外部受到更新,不等于启动时的 hour,则需要停止 + shouldBreak = true + return@checkThenExecute + } + // 推进到顶时停止循环、当前时无任务时停止循环 + if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { + shouldBreak = true + return@checkThenExecute + } + + // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 + tickOnTrigger(tick, previousTick) + } + if (shouldBreak!!) { break } @@ -189,6 +199,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() val seconds = Duration.between( currentTime, currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1) ).toMillis() + // withTimeoutOrNull 内部已处理 seconds 小于 0 的情况 withTimeoutOrNull(seconds) { state.first { it == WheelState.ACTIVE } } @@ -198,35 +209,42 @@ class ActionScheduler : AgentRunningSubModule, Void>() timeWheelScope.launch { while (isActive) { // 判断是否该步入下一小时 - val currentTime = checkTimeAndLoad() + var shouldWait: Boolean? = null + var currentTime: ZonedDateTime? = null + checkThenExecute { + currentTime = it + shouldWait = actionsGroupByHour[recordHour].isEmpty() + } // 如果该时无任务则等待,插入事件可提前唤醒 - if (actionsGroupByHour[recordHour].isEmpty()) { + if (shouldWait!!) { // 计算距离下一小时的时间,等待 - wait(currentTime) + currentTime?.let { wait(it) } continue } // 唤醒进行时间轮循环 - wheel() + wheel(currentTime!!) } } } - suspend fun checkTimeAndLoad(): ZonedDateTime { - val currentTime = ZonedDateTime.now() - val currentDay = currentTime.dayOfMonth - if (currentDay != recordDay) { - recordDay = currentDay - recordHour = currentTime.hour - loadDayActions() - loadHourActions() - } else if (currentTime.hour != recordHour) { - recordHour = currentTime.hour - loadHourActions() + suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = + wheelActionsLock.withLock { + val currentTime = ZonedDateTime.now() + val currentDay = currentTime.dayOfMonth + if (currentDay != recordDay) { + recordDay = currentDay + recordHour = currentTime.hour + loadDayActions() + loadHourActions() + } else if (currentTime.hour != recordHour) { + recordHour = currentTime.hour + loadHourActions() + } + + then(currentTime) } - return currentTime - } fun loadActions( load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, @@ -259,7 +277,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - suspend fun loadHourActions() { + fun loadHourActions() { val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second wheel[secondsTime].add(actionData) @@ -275,12 +293,10 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - wheelActionsLock.withLock { - loadActions(load, invalid, repair) - } + loadActions(load, invalid, repair) } - suspend fun loadDayActions() { + fun loadDayActions() { val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> actionsGroupByHour[latestExecutingTime.hour].add(actionData) } @@ -295,10 +311,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - wheelActionsLock.withLock { - loadActions(load, invalid, repair) - } - + loadActions(load, invalid, repair) } private fun parseToZonedDateTime(