mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
feat(scheduler): add cancel method to ActionScheduler
This commit is contained in:
@@ -30,11 +30,13 @@ import kotlin.jvm.optionals.getOrNull
|
|||||||
import kotlin.time.Duration.Companion.milliseconds
|
import kotlin.time.Duration.Companion.milliseconds
|
||||||
|
|
||||||
class ActionScheduler : AbstractAgentModule.Standalone() {
|
class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||||
|
|
||||||
@InjectCapability
|
@InjectCapability
|
||||||
private lateinit var actionCapability: ActionCapability
|
private lateinit var actionCapability: ActionCapability
|
||||||
|
|
||||||
@InjectModule
|
@InjectModule
|
||||||
private lateinit var actionExecutor: ActionExecutor
|
private lateinit var actionExecutor: ActionExecutor
|
||||||
|
|
||||||
private lateinit var timeWheel: TimeWheel
|
private lateinit var timeWheel: TimeWheel
|
||||||
private val runtimeSchedulables: MutableSet<Schedulable> =
|
private val runtimeSchedulables: MutableSet<Schedulable> =
|
||||||
Collections.synchronizedSet(mutableSetOf())
|
Collections.synchronizedSet(mutableSetOf())
|
||||||
@@ -96,6 +98,29 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun cancel(actionId: String): Boolean {
|
||||||
|
val runtimeMatches = synchronized(runtimeSchedulables) {
|
||||||
|
runtimeSchedulables.filter { it.uuid == actionId }.toSet()
|
||||||
|
}
|
||||||
|
val persistedMatches = actionCapability.listActions(null, null)
|
||||||
|
.asSequence()
|
||||||
|
.filterIsInstance<SchedulableExecutableAction>()
|
||||||
|
.filter { it.uuid == actionId }
|
||||||
|
.toSet()
|
||||||
|
|
||||||
|
val matches = LinkedHashSet<Schedulable>()
|
||||||
|
matches.addAll(runtimeMatches)
|
||||||
|
matches.addAll(persistedMatches)
|
||||||
|
matches.forEach { it.enabled = false }
|
||||||
|
|
||||||
|
val removedFromWheel = if (::timeWheel.isInitialized) {
|
||||||
|
runBlocking { timeWheel.cancel(actionId) }
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
return matches.isNotEmpty() || removedFromWheel
|
||||||
|
}
|
||||||
|
|
||||||
private class TimeWheel(
|
private class TimeWheel(
|
||||||
val listSource: () -> Set<Schedulable>,
|
val listSource: () -> Set<Schedulable>,
|
||||||
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit,
|
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit,
|
||||||
@@ -139,6 +164,37 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun cancel(actionId: String): Boolean = wheelActionsLock.withLock {
|
||||||
|
var found = false
|
||||||
|
for (bucket in schedulableGroupByHour) {
|
||||||
|
var bucketFound = false
|
||||||
|
bucket.removeIf {
|
||||||
|
if (it.uuid == actionId) {
|
||||||
|
it.enabled = false
|
||||||
|
bucketFound = true
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
found = found || bucketFound
|
||||||
|
}
|
||||||
|
for (bucket in wheel) {
|
||||||
|
var bucketFound = false
|
||||||
|
bucket.removeIf {
|
||||||
|
if (it.uuid == actionId) {
|
||||||
|
it.enabled = false
|
||||||
|
bucketFound = true
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
found = found || bucketFound
|
||||||
|
}
|
||||||
|
found
|
||||||
|
}
|
||||||
|
|
||||||
private fun wheel() {
|
private fun wheel() {
|
||||||
data class WheelStepResult(
|
data class WheelStepResult(
|
||||||
val toTrigger: Set<Schedulable>?,
|
val toTrigger: Set<Schedulable>?,
|
||||||
|
|||||||
Reference in New Issue
Block a user