mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
fix(action): stabilize scheduler wheel lifecycle
This commit is contained in:
@@ -15,12 +15,14 @@ import work.slhaf.partner.core.action.ActionCapability
|
||||
import work.slhaf.partner.core.action.entity.Action
|
||||
import work.slhaf.partner.core.action.entity.Schedulable
|
||||
import work.slhaf.partner.core.action.entity.SchedulableExecutableAction
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter
|
||||
import work.slhaf.partner.framework.agent.factory.capability.annotation.InjectCapability
|
||||
import work.slhaf.partner.framework.agent.factory.component.abstracts.AbstractAgentModule
|
||||
import work.slhaf.partner.framework.agent.factory.component.annotation.Init
|
||||
import work.slhaf.partner.framework.agent.factory.component.annotation.InjectModule
|
||||
import work.slhaf.partner.module.action.executor.ActionExecutor
|
||||
import java.io.Closeable
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
import java.time.ZonedDateTime
|
||||
import java.time.temporal.ChronoUnit
|
||||
@@ -70,8 +72,9 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
val doneCondition: (Schedulable) -> Boolean = { schedulable ->
|
||||
if (schedulable is Action) {
|
||||
schedulable.status == Action.Status.FAILED || schedulable.status == Action.Status.SUCCESS
|
||||
} else {
|
||||
true
|
||||
}
|
||||
true
|
||||
}
|
||||
timeWheel = TimeWheel(listScheduledActions, onTrigger, doneCondition)
|
||||
}
|
||||
@@ -126,6 +129,12 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit,
|
||||
val doneCondition: (schedulable: Schedulable) -> Boolean
|
||||
) : Closeable {
|
||||
private val tracePath: Path = ConfigCenter.paths.stateDir
|
||||
.resolve("trace")
|
||||
.resolve("time-wheel")
|
||||
.normalize()
|
||||
.toAbsolutePath()
|
||||
|
||||
private val schedulableGroupByHour = Array<MutableSet<Schedulable>>(24) { mutableSetOf() }
|
||||
private val wheel = Array<MutableSet<Schedulable>>(60 * 60) { mutableSetOf() }
|
||||
private var recordHour: Int = -1
|
||||
@@ -151,15 +160,14 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
logFailedStatus(schedulableData)
|
||||
return@checkThenExecute
|
||||
}
|
||||
log.debug("Action next execution time: {}", parseToZonedDateTime)
|
||||
log.debug("${schedulableData.uuid} next execution time: {}", parseToZonedDateTime)
|
||||
val hour = parseToZonedDateTime.hour
|
||||
schedulableGroupByHour[hour].add(schedulableData)
|
||||
log.debug("Action scheduled at {}", hour)
|
||||
if (it.hour == hour) {
|
||||
val wheelOffset = parseToZonedDateTime.minute * 60 + parseToZonedDateTime.second
|
||||
wheel[wheelOffset].add(schedulableData)
|
||||
state.value = WheelState.ACTIVE
|
||||
log.debug("Action scheduled at wheel offset {}", wheelOffset)
|
||||
log.debug("${schedulableData.uuid} scheduled at wheel offset {}", wheelOffset)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -293,7 +301,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
launchingHour,
|
||||
includeRemainingHour = tick >= wheel.lastIndex
|
||||
)
|
||||
if (tick >= wheel.lastIndex || schedulableGroupByHour[launchingHour].isEmpty()) {
|
||||
if (tick >= wheel.lastIndex) {
|
||||
state.value = WheelState.SLEEPING
|
||||
shouldBreak = true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user