refactor(ActionExecutor): enforce action timeout with cancellation and move timeout defaults into Action base model

This commit is contained in:
2026-03-07 20:15:41 +08:00
parent d905c4ace1
commit 25ddc6f181
2 changed files with 22 additions and 6 deletions

View File

@@ -27,6 +27,11 @@ sealed class Action {
*/
abstract val description: String
abstract val timeout: Duration
val timeoutMills: Long
get() = timeout.inWholeMilliseconds
/**
* 行动状态
*/
@@ -66,6 +71,7 @@ sealed interface Schedulable {
val scheduleContent: String
val uuid: String
val timeout: Duration
val timeoutMills: Long
var enabled: Boolean
enum class ScheduleType {
@@ -105,6 +111,7 @@ sealed class ExecutableAction : Action() {
*/
val additionalContext: MutableMap<Int, MutableList<String>> = mutableMapOf()
override val timeout: Duration = 10.minutes
}
/**
@@ -118,7 +125,6 @@ data class SchedulableExecutableAction @JvmOverloads constructor(
override val source: String,
override val scheduleType: Schedulable.ScheduleType,
override val scheduleContent: String,
override val timeout: Duration = 10.minutes
) : ExecutableAction(), Schedulable {
override var enabled = true
@@ -157,7 +163,7 @@ data class ImmediateExecutableAction(
) : ExecutableAction()
/**
* 用于计时的一次性触发或者针对某一数据源进行内容更新的行动
* 用于计时的一次性或周期性触发或者针对某一数据源进行内容更新的行动
*/
data class StateAction @JvmOverloads constructor(
override val source: String,

View File

@@ -16,9 +16,7 @@ import work.slhaf.partner.module.modules.action.scheduler.ActionScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +48,19 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
}
public void execute(Action action) {
virtualExecutor.execute(actionExecutionRouter(action));
Future<?> future = virtualExecutor.submit(actionExecutionRouter(action));
virtualExecutor.execute(() -> {
try {
future.get(action.getTimeoutMills(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
action.setStatus(Action.Status.FAILED);
log.warn("Action timeout, uuid: {}", action.getUuid());
} catch (Exception ignored) {
}
});
}
private Runnable actionExecutionRouter(Action action) {