diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt index dd71b924..06d76d12 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/action/dispatcher/scheduler/ActionScheduler.kt @@ -8,6 +8,8 @@ import com.cronutils.parser.CronParser import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.first +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.slf4j.LoggerFactory import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule @@ -19,6 +21,7 @@ import work.slhaf.partner.core.action.entity.ActionData import work.slhaf.partner.core.action.entity.ScheduledActionData import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput +import java.io.Closeable import java.time.ZonedDateTime import java.time.temporal.ChronoUnit import java.util.stream.Collectors @@ -35,6 +38,9 @@ class ActionScheduler : AgentRunningSubModule, Void>() private lateinit var timeWheel: TimeWheel + private val schedulerScope = + CoroutineScope(Dispatchers.Default + SupervisorJob() + CoroutineName("ActionScheduler")) + companion object { private val log = LoggerFactory.getLogger(ActionScheduler::class.java) } @@ -49,13 +55,24 @@ class ActionScheduler : AgentRunningSubModule, Void>() timeWheel = TimeWheel(actions) { actionDataSet -> actionExecutor.execute(ActionExecutorInput(actionDataSet)) } + + setupShutdownHook() + } + + private fun setupShutdownHook() { + Runtime.getRuntime().addShutdownHook(Thread { + timeWheel.close() + schedulerScope.cancel() + }) } override fun execute(scheduledActionDataSet: Set?): Void? { - scheduledActionDataSet?.run { - for (scheduledActionData in scheduledActionDataSet) { - actionCapability.putAction(scheduledActionData) - timeWheel.schedule(scheduledActionData) + schedulerScope.launch { + scheduledActionDataSet?.run { + for (scheduledActionData in scheduledActionDataSet) { + actionCapability.putAction(scheduledActionData) + timeWheel.schedule(scheduledActionData) + } } } return null @@ -64,13 +81,16 @@ class ActionScheduler : AgentRunningSubModule, Void>() private class TimeWheel( val primaryActions: Set, val onTrigger: (Set) -> Unit - ) { + ) : Closeable { private val actionsGroupByHour = Array>(24) { mutableSetOf() } private val wheel = Array>(60 * 60) { mutableSetOf() } private var currentHour: Int = 0 private val state = MutableStateFlow(WheelState.SLEEPING) + private val wheelActionsLock = Mutex() + private val timeWheelScope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TimeWheel")) + private val cronDefinition: CronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ) private val cronParser: CronParser = CronParser(cronDefinition) @@ -84,38 +104,42 @@ class ActionScheduler : AgentRunningSubModule, Void>() launchWheel() } - fun schedule(actionData: ScheduledActionData) { + suspend fun schedule(actionData: ScheduledActionData) { if (actionData.status != ActionData.ActionStatus.PREPARE) { return } - val parseToZonedDateTime = parseToZonedDateTime( - actionData.scheduleType, - actionData.scheduleContent, - ZonedDateTime.now() - ) ?: run { - logFailedStatus(actionData) - return - } + wheelActionsLock.withLock { + val parseToZonedDateTime = parseToZonedDateTime( + actionData.scheduleType, + actionData.scheduleContent, + ZonedDateTime.now() + ) ?: run { + logFailedStatus(actionData) + return + } - val hour = parseToZonedDateTime.hour - actionsGroupByHour[hour].add(actionData) - if (currentHour == hour) { - state.value = WheelState.ACTIVE + val hour = parseToZonedDateTime.hour + actionsGroupByHour[hour].add(actionData) + if (currentHour == hour) { + state.value = WheelState.ACTIVE + } } } private fun launchWheel() { - val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TimeWheel")) - fun tickOnTrigger(tick: Int, previousTick: Int) { + + suspend fun tickOnTrigger(tick: Int, previousTick: Int) { if (tick > previousTick) { val toTrigger = linkedSetOf() - for (i in (previousTick + 1)..tick) { - val bucket = wheel[i] - if (bucket.isNotEmpty()) { - toTrigger.addAll(bucket) - actionsGroupByHour[currentHour].removeAll(bucket) - bucket.clear() // 避免重复触发 + wheelActionsLock.withLock { + for (i in (previousTick + 1)..tick) { + val bucket = wheel[i] + if (bucket.isNotEmpty()) { + toTrigger.addAll(bucket) + actionsGroupByHour[currentHour].removeAll(bucket) + bucket.clear() // 避免重复触发 + } } } @@ -125,7 +149,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - scope.launch { + timeWheelScope.launch { while (isActive) { // 判断是否该步入下一小时 val actionsLoadingTime = loadHourActions() @@ -178,7 +202,7 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - private fun loadHourActions(): ZonedDateTime { + private suspend fun loadHourActions(): ZonedDateTime { val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData -> val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second wheel[secondsTime].add(actionData) @@ -194,7 +218,9 @@ class ActionScheduler : AgentRunningSubModule, Void>() } } - return loadActions(load, invalid, repair) + wheelActionsLock.withLock { + return loadActions(load, invalid, repair) + } } private fun loadDayActions() { @@ -292,6 +318,10 @@ class ActionScheduler : AgentRunningSubModule, Void>() ) } + override fun close() { + timeWheelScope.cancel() + } + private enum class WheelState { ACTIVE, SLEEPING,