fix(ActionScheduler): make scheduling thread-safe with Mutex and cancel scheduler/time wheel scopes on shutdown

This commit is contained in:
2026-02-08 23:07:03 +08:00
parent 7cb565fd1b
commit 882ec43f2b

View File

@@ -8,6 +8,8 @@ import com.cronutils.parser.CronParser
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.slf4j.LoggerFactory
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule
@@ -19,6 +21,7 @@ import work.slhaf.partner.core.action.entity.ActionData
import work.slhaf.partner.core.action.entity.ScheduledActionData
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import java.io.Closeable
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.stream.Collectors
@@ -35,6 +38,9 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
private lateinit var timeWheel: TimeWheel
private val schedulerScope =
CoroutineScope(Dispatchers.Default + SupervisorJob() + CoroutineName("ActionScheduler"))
companion object {
private val log = LoggerFactory.getLogger(ActionScheduler::class.java)
}
@@ -49,13 +55,24 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
timeWheel = TimeWheel(actions) { actionDataSet ->
actionExecutor.execute(ActionExecutorInput(actionDataSet))
}
setupShutdownHook()
}
private fun setupShutdownHook() {
Runtime.getRuntime().addShutdownHook(Thread {
timeWheel.close()
schedulerScope.cancel()
})
}
override fun execute(scheduledActionDataSet: Set<ScheduledActionData>?): Void? {
scheduledActionDataSet?.run {
for (scheduledActionData in scheduledActionDataSet) {
actionCapability.putAction(scheduledActionData)
timeWheel.schedule(scheduledActionData)
schedulerScope.launch {
scheduledActionDataSet?.run {
for (scheduledActionData in scheduledActionDataSet) {
actionCapability.putAction(scheduledActionData)
timeWheel.schedule(scheduledActionData)
}
}
}
return null
@@ -64,13 +81,16 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
private class TimeWheel(
val primaryActions: Set<ScheduledActionData>,
val onTrigger: (Set<ScheduledActionData>) -> Unit
) {
) : Closeable {
private val actionsGroupByHour = Array<MutableSet<ScheduledActionData>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<ScheduledActionData>>(60 * 60) { mutableSetOf() }
private var currentHour: Int = 0
private val state = MutableStateFlow(WheelState.SLEEPING)
private val wheelActionsLock = Mutex()
private val timeWheelScope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TimeWheel"))
private val cronDefinition: CronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)
private val cronParser: CronParser = CronParser(cronDefinition)
@@ -84,38 +104,42 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
launchWheel()
}
fun schedule(actionData: ScheduledActionData) {
suspend fun schedule(actionData: ScheduledActionData) {
if (actionData.status != ActionData.ActionStatus.PREPARE) {
return
}
val parseToZonedDateTime = parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
ZonedDateTime.now()
) ?: run {
logFailedStatus(actionData)
return
}
wheelActionsLock.withLock {
val parseToZonedDateTime = parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
ZonedDateTime.now()
) ?: run {
logFailedStatus(actionData)
return
}
val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData)
if (currentHour == hour) {
state.value = WheelState.ACTIVE
val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData)
if (currentHour == hour) {
state.value = WheelState.ACTIVE
}
}
}
private fun launchWheel() {
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TimeWheel"))
fun tickOnTrigger(tick: Int, previousTick: Int) {
suspend fun tickOnTrigger(tick: Int, previousTick: Int) {
if (tick > previousTick) {
val toTrigger = linkedSetOf<ScheduledActionData>()
for (i in (previousTick + 1)..tick) {
val bucket = wheel[i]
if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket)
actionsGroupByHour[currentHour].removeAll(bucket)
bucket.clear() // 避免重复触发
wheelActionsLock.withLock {
for (i in (previousTick + 1)..tick) {
val bucket = wheel[i]
if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket)
actionsGroupByHour[currentHour].removeAll(bucket)
bucket.clear() // 避免重复触发
}
}
}
@@ -125,7 +149,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
}
scope.launch {
timeWheelScope.launch {
while (isActive) {
// 判断是否该步入下一小时
val actionsLoadingTime = loadHourActions()
@@ -178,7 +202,7 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
}
private fun loadHourActions(): ZonedDateTime {
private suspend fun loadHourActions(): ZonedDateTime {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
@@ -194,7 +218,9 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
}
}
return loadActions(load, invalid, repair)
wheelActionsLock.withLock {
return loadActions(load, invalid, repair)
}
}
private fun loadDayActions() {
@@ -292,6 +318,10 @@ class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>()
)
}
override fun close() {
timeWheelScope.cancel()
}
private enum class WheelState {
ACTIVE,
SLEEPING,