mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(ActionScheduler): add timeout attribute to Schedulable, and support reschedule Schedulable content when it's fished in ActionScheduler
This commit is contained in:
@@ -3,6 +3,8 @@ package work.slhaf.partner.core.action.entity
|
||||
import work.slhaf.partner.module.modules.action.executor.entity.HistoryAction
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
sealed class Action {
|
||||
/**
|
||||
@@ -63,6 +65,7 @@ sealed interface Schedulable {
|
||||
val scheduleType: ScheduleType
|
||||
val scheduleContent: String
|
||||
val uuid: String
|
||||
val timeout: Duration
|
||||
var enabled: Boolean
|
||||
|
||||
enum class ScheduleType {
|
||||
@@ -114,7 +117,8 @@ data class SchedulableExecutableAction(
|
||||
override val description: String,
|
||||
override val source: String,
|
||||
override val scheduleType: Schedulable.ScheduleType,
|
||||
override val scheduleContent: String
|
||||
override val scheduleContent: String,
|
||||
override val timeout: Duration = 10.minutes
|
||||
) : ExecutableAction(), Schedulable {
|
||||
|
||||
override var enabled = true
|
||||
@@ -132,8 +136,6 @@ data class SchedulableExecutableAction(
|
||||
action.result.reset()
|
||||
}
|
||||
}
|
||||
|
||||
status = Status.PREPARE
|
||||
}
|
||||
|
||||
data class ScheduleHistory(
|
||||
@@ -166,6 +168,7 @@ data class StateAction(
|
||||
override val scheduleContent: String,
|
||||
|
||||
override var enabled: Boolean = true,
|
||||
override val timeout: Duration = 5.minutes,
|
||||
|
||||
val trigger: Trigger
|
||||
) : Action(), Schedulable {
|
||||
|
||||
@@ -61,8 +61,12 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
case StateAction stateAction -> handleStateAction(stateAction);
|
||||
default -> handleUnknownAction(action);
|
||||
}
|
||||
if (action.getStatus() == Action.Status.FAILED) {
|
||||
return;
|
||||
}
|
||||
action.setStatus(Action.Status.SUCCESS);
|
||||
} catch (Exception e) {
|
||||
log.warn("Action execute failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage());
|
||||
log.warn("Unexpected action execution failure, uuid: {}, description: {}, failure reason: {}", action.getUuid(), action.getDescription(), e.getLocalizedMessage());
|
||||
action.setStatus(Action.Status.FAILED);
|
||||
}
|
||||
};
|
||||
@@ -155,16 +159,11 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
} while (stageCursor.next());
|
||||
// 结束
|
||||
actionCapability.removePhaserRecord(phaser);
|
||||
if (executableAction.getStatus() != Action.Status.FAILED) {
|
||||
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
|
||||
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
|
||||
scheduledActionData.recordAndReset();
|
||||
actionScheduler.schedule(scheduledActionData);
|
||||
} else {
|
||||
executableAction.setStatus(Action.Status.SUCCESS);
|
||||
}
|
||||
// TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等)
|
||||
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
|
||||
if (executableAction instanceof SchedulableExecutableAction scheduledActionData) {
|
||||
scheduledActionData.recordAndReset();
|
||||
}
|
||||
// TODO 执行过后需要回写至任务上下文(recentCompletedTask),同时触发自对话信号进行确认并记录以及是否通知用户(触发与否需要机制进行匹配,在模块链路可增加 interaction gate 门控,判断此次对话作用于谁、由谁发出、何种性质、是否需要回应等)
|
||||
}
|
||||
|
||||
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String source) {
|
||||
|
||||
@@ -26,6 +26,7 @@ import java.time.ZonedDateTime
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.stream.Collectors
|
||||
import kotlin.jvm.optionals.getOrNull
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
@InjectCapability
|
||||
@@ -55,7 +56,13 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
schedulableSet.filterIsInstance<Action>()
|
||||
.forEach { actionExecutor.execute(it) }
|
||||
}
|
||||
timeWheel = TimeWheel(listScheduledActions, onTrigger)
|
||||
val doneCondition: (Schedulable) -> Boolean = { schedulable ->
|
||||
if (schedulable is Action) {
|
||||
schedulable.status == Action.Status.FAILED || schedulable.status == Action.Status.SUCCESS
|
||||
}
|
||||
true
|
||||
}
|
||||
timeWheel = TimeWheel(listScheduledActions, onTrigger, doneCondition)
|
||||
}
|
||||
loadScheduledActions()
|
||||
setupShutdownHook()
|
||||
@@ -83,7 +90,8 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
|
||||
private class TimeWheel(
|
||||
val listSource: () -> Set<Schedulable>,
|
||||
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit
|
||||
val onTrigger: (toTrigger: Set<Schedulable>) -> Unit,
|
||||
val doneCondition: (schedulable: Schedulable) -> Boolean
|
||||
) : Closeable {
|
||||
private val schedulableGroupByHour = Array<MutableSet<Schedulable>>(24) { mutableSetOf() }
|
||||
private val wheel = Array<MutableSet<Schedulable>>(60 * 60) { mutableSetOf() }
|
||||
@@ -146,6 +154,29 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
return null
|
||||
}
|
||||
|
||||
fun handleToTrigger(toTrigger: Set<Schedulable>) {
|
||||
timeWheelScope.launch {
|
||||
onTrigger(toTrigger)
|
||||
}
|
||||
for (schedulable in toTrigger) timeWheelScope.launch {
|
||||
if (schedulable.scheduleType == Schedulable.ScheduleType.ONCE) {
|
||||
return@launch
|
||||
}
|
||||
|
||||
withTimeoutOrNull(schedulable.timeout) {
|
||||
while (!doneCondition(schedulable)) {
|
||||
delay(50.milliseconds)
|
||||
}
|
||||
}
|
||||
|
||||
if (!schedulable.enabled) {
|
||||
return@launch
|
||||
}
|
||||
|
||||
this@TimeWheel.schedule(schedulable)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) {
|
||||
val launchingHour = launchingTime.hour
|
||||
var tick = launchingTime.minute * 60 + launchingTime.second
|
||||
@@ -183,11 +214,7 @@ class ActionScheduler : AbstractAgentModule.Standalone() {
|
||||
}
|
||||
WheelStepResult(toTrigger, shouldBreak)
|
||||
}
|
||||
stepResult.toTrigger?.let { trigger ->
|
||||
timeWheelScope.launch {
|
||||
onTrigger(trigger)
|
||||
}
|
||||
}
|
||||
stepResult.toTrigger?.let { handleToTrigger(it) }
|
||||
if (stepResult.shouldBreak) {
|
||||
log.debug("Wheel stopped at tick {}", tick)
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user