refactor(Action): generalize ActionScheduler to Schedulable and add StateAction trigger execution path

This commit is contained in:
2026-02-18 15:20:52 +08:00
parent a1bc784da5
commit 11ea1045f4
2 changed files with 115 additions and 92 deletions

View File

@@ -31,6 +31,7 @@ sealed interface Schedulable {
val scheduleType: ScheduleType
val scheduleContent: String
val uuid: String
enum class ScheduleType {
CYCLE,

View File

@@ -17,9 +17,10 @@ import work.slhaf.partner.api.agent.factory.module.annotation.Init
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.entity.ExecutableAction
import work.slhaf.partner.core.action.ActionCore
import work.slhaf.partner.core.action.entity.Schedulable
import work.slhaf.partner.core.action.entity.SchedulableExecutableAction
import work.slhaf.partner.core.action.entity.StateAction
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import java.io.Closeable
@@ -30,7 +31,7 @@ import java.util.stream.Collectors
import kotlin.jvm.optionals.getOrNull
@AgentSubModule
class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>, Void>() {
class ActionScheduler : AgentRunningSubModule<Set<Schedulable>, Void>() {
@InjectCapability
private lateinit var actionCapability: ActionCapability
@@ -49,17 +50,34 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
@Init
fun init() {
val listScheduledActions: () -> Set<SchedulableExecutableAction> = {
actionCapability.listActions(null, null)
.stream()
.filter { it is SchedulableExecutableAction }
.map { it as SchedulableExecutableAction }
.collect(Collectors.toSet())
fun loadScheduledActions() {
val listScheduledActions: () -> Set<SchedulableExecutableAction> = {
actionCapability.listActions(null, null)
.stream()
.filter { it is SchedulableExecutableAction }
.map { it as SchedulableExecutableAction }
.collect(Collectors.toSet())
}
// TODO 3. 重构 trigger 内容,在替换为 Set<Schedulable> 后,需要进行类型判定,确认是自行执行,还是交给 actionExecutor
val onTrigger: (Set<Schedulable>) -> Unit = { schedulableSet ->
val executableActions = mutableSetOf<SchedulableExecutableAction>()
val stateActions = mutableSetOf<StateAction>()
for (schedulable in schedulableSet) {
when (schedulable) {
is SchedulableExecutableAction -> executableActions.add(schedulable)
is StateAction -> stateActions.add(schedulable)
}
}
actionExecutor.execute(ActionExecutorInput(executableActions))
actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)
.execute { stateActions.forEach { it.trigger.onTrigger() } }
}
timeWheel = TimeWheel(listScheduledActions, onTrigger)
}
val onTrigger: (Set<SchedulableExecutableAction>) -> Unit = { actionExecutor.execute(ActionExecutorInput(it)) }
timeWheel = TimeWheel(listScheduledActions, onTrigger)
loadScheduledActions()
setupShutdownHook()
}
@@ -71,27 +89,30 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
})
}
// TODO 如果要将 TimeWheel 作为 Agent 内部的循环周期,那么不依赖 Action 链路的内容,将不适合参与到 ActionExecutor因此需要将 ActionData 的触发类型进行分类SILENT TRIGGER仅限更新 ActionData 内部状态,通过属性 copy 完成不开放过多权限防止序列化失败、EXECUTOR、AGENT TURN。考虑将时间轮下放至 ActionCapability作为底层行动语义的一部分
override fun execute(scheduledActionDataSet: Set<SchedulableExecutableAction>?): Void? {
override fun execute(schedulableSet: Set<Schedulable>?): Void? {
// TODO 1. 将输入参数重构为 Set<Schedulable>,在 for 循环中依据计划字段放入时间轮
schedulerScope.launch {
scheduledActionDataSet?.run {
for (scheduledActionData in scheduledActionDataSet) {
log.debug("New action to schedule: {}", scheduledActionData)
actionCapability.putAction(scheduledActionData)
timeWheel.schedule(scheduledActionData)
schedulableSet?.run {
for (schedulableData in schedulableSet) {
log.debug("New data to schedule: {}", schedulableData)
timeWheel.schedule(schedulableData)
if (schedulableData is SchedulableExecutableAction) {
actionCapability.putAction(schedulableData)
}
}
}
}
return null
}
// TODO 2. 重构为 Set<Schedulable>
private class TimeWheel(
val listScheduledActions: () -> Set<SchedulableExecutableAction>,
val onTrigger: (toTrigger: Set<SchedulableExecutableAction>) -> Unit
val listSource: () -> Set<Schedulable>,
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit
) : Closeable {
private val actionsGroupByHour = Array<MutableSet<SchedulableExecutableAction>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<SchedulableExecutableAction>>(60 * 60) { mutableSetOf() }
private val schedulableGroupByHour = Array<MutableSet<Schedulable>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<Schedulable>>(60 * 60) { mutableSetOf() }
private var recordHour: Int = -1
private var recordDay: Int = -1
private val state = MutableStateFlow(WheelState.SLEEPING)
@@ -102,37 +123,30 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
private val cronDefinition: CronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)
private val cronParser: CronParser = CronParser(cronDefinition)
/**
* 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程
*/
init {
// 启动时间轮
launchWheel()
wheel()
}
suspend fun schedule(actionData: SchedulableExecutableAction) {
if (actionData.status != ExecutableAction.Status.PREPARE) {
return
}
suspend fun schedule(schedulableData: Schedulable) {
checkThenExecute {
val parseToZonedDateTime = parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
schedulableData.scheduleType,
schedulableData.scheduleContent,
it
) ?: run {
logFailedStatus(actionData)
logFailedStatus(schedulableData)
return@checkThenExecute
}
log.debug("Action next execution time: {}", parseToZonedDateTime)
val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData)
schedulableGroupByHour[hour].add(schedulableData)
log.debug("Action scheduled at {}", hour)
if (it.hour == hour) {
val wheelOffset = parseToZonedDateTime.minute * 60 + parseToZonedDateTime.second
wheel[wheelOffset].add(actionData)
wheel[wheelOffset].add(schedulableData)
state.value = WheelState.ACTIVE
log.debug("Action scheduled at wheel offset {}", wheelOffset)
}
@@ -140,17 +154,22 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
}
}
private fun launchWheel() {
private fun wheel() {
fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<SchedulableExecutableAction>? {
data class WheelStepResult(
val toTrigger: Set<Schedulable>?,
val shouldBreak: Boolean
)
fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<Schedulable>? {
if (tick > previousTick) {
val toTrigger = mutableSetOf<SchedulableExecutableAction>()
val toTrigger = mutableSetOf<Schedulable>()
for (i in previousTick..tick) {
val bucket = wheel[i]
if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket)
val bucketUuids = bucket.asSequence().map { it.uuid }.toHashSet()
actionsGroupByHour[triggerHour].removeIf { it.uuid in bucketUuids }
schedulableGroupByHour[triggerHour].removeIf { it.uuid in bucketUuids }
bucket.clear() // 避免重复触发
}
}
@@ -178,37 +197,42 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
// 2) 推进节拍器:按“理论秒”前进 step 次
nextTickNanos += step.toLong() * 1_000_000_000L
var shouldBreak = false
var toTrigger: Set<SchedulableExecutableAction>? = null
val stepResult = run {
var shouldBreak = false
var toTrigger: Set<Schedulable>? = null
checkThenExecute(false) {
if (it.hour != launchingHour) {
shouldBreak = true
toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour)
log.debug(
"Hour changed, previousTick: {}, tick: {}, toTriggerSize: {}",
previousTick,
tick,
toTrigger?.size
)
return@checkThenExecute
}
toTrigger = collectToTrigger(tick, previousTick, launchingHour)
if (tick >= wheel.lastIndex || schedulableGroupByHour[launchingHour].isEmpty()) {
state.value = WheelState.SLEEPING
shouldBreak = true
}
checkThenExecute(false) {
if (it.hour != launchingHour) {
shouldBreak = true
toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour)
log.debug(
"Hour changed, previousTick: {}, tick: {}, toTriggerSize: {}",
previousTick,
tick,
toTrigger?.size
)
return@checkThenExecute
}
toTrigger = collectToTrigger(tick, previousTick, launchingHour)
WheelStepResult(toTrigger, shouldBreak)
}
if (tick >= wheel.lastIndex || actionsGroupByHour[launchingHour].isEmpty()) {
state.value = WheelState.SLEEPING
shouldBreak = true
return@checkThenExecute
stepResult.toTrigger?.let { trigger ->
timeWheelScope.launch {
onTrigger(trigger)
}
}
toTrigger?.takeIf { it.isNotEmpty() }?.let {
onTrigger(it)
log.debug("Executing action at hour {} tick {}", launchingHour, tick)
}
if (shouldBreak) {
if (stepResult.shouldBreak) {
log.debug("Wheel stopped at tick {}", tick)
break
}
@@ -243,7 +267,7 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
var primaryTickAdvanceTime: Long? = null
checkThenExecute {
currentTime = it
shouldWait = actionsGroupByHour[it.hour].isEmpty()
shouldWait = schedulableGroupByHour[it.hour].isEmpty()
// 由于 wheel 的启动时间可能存在延迟,而时内推进由 nanoTime 保证不会漏发,
// 正常的时序结束又由 tick 是否触顶、当前时是否存在额外任务触发,
// 而启动时无触发保障,此时一并初始化 tick 推进时间,足以应对 check 与 wheel 间的这段时间间隔
@@ -266,24 +290,24 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) =
wheelActionsLock.withLock {
fun loadActions(
source: Set<SchedulableExecutableAction>,
source: Set<Schedulable>,
now: ZonedDateTime,
load: (latestExecutingTime: ZonedDateTime, actionData: SchedulableExecutableAction) -> Unit,
load: (latestExecutingTime: ZonedDateTime, schedulableData: Schedulable) -> Unit,
repair: () -> Unit
) {
val runLoading = {
for (actionData in source) {
for (schedulableData in source) {
val nextExecutingTime =
parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
schedulableData.scheduleType,
schedulableData.scheduleContent,
now
) ?: run {
logFailedStatus(actionData)
logFailedStatus(schedulableData)
continue
}
load(nextExecutingTime, actionData)
load(nextExecutingTime, schedulableData)
}
}
@@ -292,12 +316,12 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
}
fun loadHourActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, SchedulableExecutableAction) -> Unit =
{ latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
log.debug("Action loaded to hour: {}", actionData)
}
val load: (ZonedDateTime, Schedulable) -> Unit =
{ latestExecutionTime, schedulableData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(schedulableData)
log.debug("Action loaded to hour: {}", schedulableData)
}
val repair: () -> Unit = {
for (set in wheel) {
@@ -305,23 +329,23 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
}
}
loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair)
loadActions(schedulableGroupByHour[currentTime.hour], currentTime, load, repair)
}
fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, SchedulableExecutableAction) -> Unit =
{ latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
log.debug("Action loaded to day: {}", actionData)
}
val load: (ZonedDateTime, Schedulable) -> Unit =
{ latestExecutingTime, schedulableData ->
schedulableGroupByHour[latestExecutingTime.hour].add(schedulableData)
log.debug("Action loaded to day: {}", schedulableData)
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
for (set in schedulableGroupByHour) {
set.clear()
}
}
loadActions(listScheduledActions(), currentTime, load, repair)
loadActions(listSource(), currentTime, load, repair)
}
fun refreshIfNeeded(now: ZonedDateTime) {
@@ -382,13 +406,11 @@ class ActionScheduler : AgentRunningSubModule<Set<SchedulableExecutableAction>,
}
private fun logFailedStatus(actionData: SchedulableExecutableAction) {
private fun logFailedStatus(scheduleData: Schedulable) {
log.warn(
"行动未加载,uuid: {}, source: {}, tendency: {}, scheduleContent: {}",
actionData.uuid,
actionData.source,
actionData.tendency,
actionData.scheduleContent,
"行动未加载,scheduleType: {}, scheduleContent: {}",
scheduleData.scheduleType,
scheduleData.scheduleContent,
)
}