diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index 4312bafa..7562b35f 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -124,7 +124,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() val hour = parseToZonedDateTime.hour actionsGroupByHour[hour].add(actionData) - if (recordHour == hour) { + if (it.hour == hour) { state.value = WheelState.ACTIVE } @@ -133,22 +133,20 @@ class ActionScheduler : AgentRunningSubModule, Void>() private fun launchWheel() { - fun tickOnTrigger(tick: Int, previousTick: Int) { + fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set? { if (tick > previousTick) { - val toTrigger = linkedSetOf() + val toTrigger = mutableSetOf() for (i in (previousTick + 1)..tick) { val bucket = wheel[i] if (bucket.isNotEmpty()) { toTrigger.addAll(bucket) - actionsGroupByHour[recordHour].removeAll(bucket) + actionsGroupByHour[triggerHour].removeAll(bucket) bucket.clear() // 避免重复触发 } } - - if (toTrigger.isNotEmpty()) { - onTrigger(toTrigger) - } + return toTrigger } + return null } suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime) { @@ -170,22 +168,27 @@ class ActionScheduler : AgentRunningSubModule, Void>() lastTickAdvanceTime = current var shouldBreak: Boolean? = null + var toTrigger: Set? = null checkThenExecute { - if (recordHour != launchingHour) { - // recordHour 在外部受到更新,不等于启动时的 hour,则需要停止 + if (it.hour != launchingHour) { + // recordHout 已更新,此时后执行无意义,不等于启动时的 hour,则需要停止 shouldBreak = true return@checkThenExecute } + + toTrigger = collectToTrigger(tick, previousTick, it.hour) + // 推进到顶时停止循环、当前时无任务时停止循环 - if (tick >= wheel.lastIndex || actionsGroupByHour[recordHour].isEmpty()) { + if (tick >= wheel.lastIndex || actionsGroupByHour[it.hour].isEmpty()) { + state.value = WheelState.SLEEPING shouldBreak = true return@checkThenExecute } // 取当前 tick、推进过程中经过的 tick 对应任务,异步启动 - tickOnTrigger(tick, previousTick) } + toTrigger?.let { onTrigger(it) } if (shouldBreak!!) { break } @@ -203,7 +206,6 @@ class ActionScheduler : AgentRunningSubModule, Void>() withTimeoutOrNull(seconds) { state.first { it == WheelState.ACTIVE } } - state.value = WheelState.SLEEPING } timeWheelScope.launch { @@ -213,7 +215,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() var currentTime: ZonedDateTime? = null checkThenExecute { currentTime = it - shouldWait = actionsGroupByHour[recordHour].isEmpty() + shouldWait = actionsGroupByHour[it.hour].isEmpty() } // 如果该时无任务则等待,插入事件可提前唤醒 @@ -229,89 +231,74 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = - wheelActionsLock.withLock { - val currentTime = ZonedDateTime.now() - val currentDay = currentTime.dayOfMonth - if (currentDay != recordDay) { - recordDay = currentDay - recordHour = currentTime.hour - loadDayActions() - loadHourActions() - } else if (currentTime.hour != recordHour) { - recordHour = currentTime.hour - loadHourActions() + suspend fun checkThenExecute(then: (currentTime: ZonedDateTime) -> Unit) = wheelActionsLock.withLock { + fun loadActions( + now: ZonedDateTime, + load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, + repair: () -> Unit + ) { + val runLoading = { + for (actionData in listScheduledActions()) { + val latestExecutingTime = + parseToZonedDateTime( + actionData.scheduleType, + actionData.scheduleContent, + now + ) ?: run { + logFailedStatus(actionData) + continue + } + + load(latestExecutingTime, actionData) + } } - then(currentTime) - } - - fun loadActions( - load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit, - invalid: (before: ZonedDateTime, after: ZonedDateTime) -> Boolean, - repair: () -> Unit - ) { - val runLoading = { - val now = ZonedDateTime.now() - for (actionData in listScheduledActions()) { - val latestExecutingTime = - parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, - now - ) ?: run { - logFailedStatus(actionData) - continue - } - - load(latestExecutingTime, actionData) - } - } - - val before = ZonedDateTime.now() - runLoading() - val after = ZonedDateTime.now() - if (invalid(before, after)) { repair() runLoading() } - } - fun loadHourActions() { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> - val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second - wheel[secondsTime].add(actionData) - } - - val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> - before.hour != after.hour - } - - val repair: () -> Unit = { - for (set in wheel) { - set.clear() + fun loadHourActions(currentTime: ZonedDateTime) { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> + val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second + wheel[secondsTime].add(actionData) } - } - loadActions(load, invalid, repair) - } - - fun loadDayActions() { - val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> - actionsGroupByHour[latestExecutingTime.hour].add(actionData) - } - - val invalid: (ZonedDateTime, ZonedDateTime) -> Boolean = { before, after -> - before.dayOfMonth != after.dayOfMonth - } - - val repair: () -> Unit = { - for (set in actionsGroupByHour) { - set.clear() + val repair: () -> Unit = { + for (set in wheel) { + set.clear() + } } + + loadActions(currentTime, load, repair) } - loadActions(load, invalid, repair) + fun loadDayActions(currentTime: ZonedDateTime) { + val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData -> + actionsGroupByHour[latestExecutingTime.hour].add(actionData) + } + + val repair: () -> Unit = { + for (set in actionsGroupByHour) { + set.clear() + } + } + + loadActions(currentTime, load, repair) + } + + val currentTime = ZonedDateTime.now() + val currentDay = currentTime.dayOfMonth + if (currentDay != recordDay) { + recordDay = currentDay + recordHour = currentTime.hour + loadDayActions(currentTime) + loadHourActions(currentTime) + } else if (currentTime.hour != recordHour) { + recordHour = currentTime.hour + loadHourActions(currentTime) + } + + then(currentTime) } private fun parseToZonedDateTime(