From 328befecca8ee7d8a30896227731c5f0f34fb1e5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Thu, 9 Apr 2026 12:00:32 +0800 Subject: [PATCH] refactor(action): add shutdown method to ActionCore to close runner client and thread pool --- .../slhaf/partner/core/action/ActionCore.java | 31 ++++++++++++++++--- .../core/action/runner/LocalRunnerClient.java | 6 ---- .../core/action/runner/RunnerClient.java | 2 +- .../action/runner/SandboxRunnerClient.java | 4 +++ .../action/executor/ActionExecutor.java | 19 ++++++++++++ .../module/action/planner/ActionPlanner.java | 1 - 6 files changed, 51 insertions(+), 12 deletions(-) diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java index 2d2d673f..580ef5e6 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/ActionCore.java @@ -16,6 +16,7 @@ import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.framework.agent.config.ConfigCenter; import work.slhaf.partner.framework.agent.factory.capability.annotation.CapabilityCore; import work.slhaf.partner.framework.agent.factory.capability.annotation.CapabilityMethod; +import work.slhaf.partner.framework.agent.factory.context.Shutdown; import work.slhaf.partner.framework.agent.state.State; import work.slhaf.partner.framework.agent.state.StateSerializable; import work.slhaf.partner.framework.agent.state.StateValue; @@ -23,10 +24,7 @@ import work.slhaf.partner.framework.agent.state.StateValue; import java.io.IOException; import java.nio.file.Path; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.stream.Collectors; @SuppressWarnings("FieldMayBeFinal") @@ -58,6 +56,31 @@ public class ActionCore implements StateSerializable { register(); } + @Shutdown + public void shutdown() { + try { + runnerClient.close(); + } catch (Exception e) { + log.warn("runner client close error", e); + } + try { + platformExecutor.shutdown(); + virtualExecutor.shutdown(); + + int count = 0; + if (!platformExecutor.awaitTermination(8, TimeUnit.SECONDS)) { + count += platformExecutor.shutdownNow().size(); + } + if (!virtualExecutor.awaitTermination(8, TimeUnit.SECONDS)) { + count += virtualExecutor.shutdownNow().size(); + } + if (count != 0) { + log.warn("{} tasks still running", count); + } + } catch (InterruptedException ignored) { + } + } + @CapabilityMethod public void putAction(@NonNull ExecutableAction executableAction) { actionPool.removeIf(data -> data.getUuid().equals(executableAction.getUuid())); // 用来应对 ScheduledActionData 的重新排列 diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index d8134fef..8b726287 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -115,8 +115,6 @@ public class LocalRunnerClient extends RunnerClient implements AutoCloseable { this.mcpDescWatcher = descWatcher; this.dynamicActionMcpManager = dynamicManager; this.mcpConfigWatcher = configWatcher; - - setupShutdownHook(); } private void registerPolicyProviders() { @@ -168,10 +166,6 @@ public class LocalRunnerClient extends RunnerClient implements AutoCloseable { clientRegistry.register(id, client); } - private void setupShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); - } - @Override public void close() { if (!closed.compareAndSet(false, true)) { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java index 8998423b..c2dfc761 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java @@ -36,7 +36,7 @@ import java.util.concurrent.ExecutorService; * */ @Slf4j -public abstract class RunnerClient { +public abstract class RunnerClient implements AutoCloseable { protected final String ACTION_PATH; diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java index a739753d..2478afd2 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/SandboxRunnerClient.java @@ -53,4 +53,8 @@ public class SandboxRunnerClient extends RunnerClient { throw new UnsupportedOperationException("Unimplemented method 'persistSerialize'"); } + @Override + public void close() throws Exception { + + } } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java index 75af42b0..ac8b8a62 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/executor/ActionExecutor.java @@ -11,6 +11,7 @@ import work.slhaf.partner.framework.agent.factory.capability.annotation.InjectCa import work.slhaf.partner.framework.agent.factory.component.abstracts.AbstractAgentModule; import work.slhaf.partner.framework.agent.factory.component.annotation.Init; import work.slhaf.partner.framework.agent.factory.component.annotation.InjectModule; +import work.slhaf.partner.framework.agent.factory.context.Shutdown; import work.slhaf.partner.module.action.executor.entity.*; import java.util.*; @@ -43,6 +44,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { private ExecutorService platformExecutor; private RunnerClient runnerClient; + private final AtomicBoolean closed = new AtomicBoolean(false); + @Init public void init() { virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL); @@ -60,6 +63,11 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { blockManager.emitActionRecoveredBlock(recoveredActions); } + @Shutdown + public void shutdown() { + closed.set(true); + } + public void execute(Action action) { Future future = virtualExecutor.submit(actionExecutionRouter(action)); @@ -119,11 +127,16 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { } private void handleStateAction(StateAction stateAction) { + if (closed.get()) { + return; + } blockManager.emitStateActionTriggeredBlock(stateAction); stateAction.getTrigger().onTrigger(); } private void handleExecutableAction(ExecutableAction executableAction) { + actionCapability.putAction(executableAction); + val source = executableAction.getSource(); val status = executableAction.getStatus(); if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) { @@ -180,6 +193,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { }; stageCursor.init(); do { + if (closed.get()) { + return; + } val metaActions = actionChain.get(executableAction.getExecutingStage()); val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser); val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source); @@ -187,6 +203,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone { // synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进 // 导致新行动的 phaser 投放阶段错乱无法阻塞的场景 // 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题 + if (closed.get()) { + return; + } synchronized (listeningRecord.accepting()) { listeningRecord.accepting().set(false); // 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进 diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/ActionPlanner.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/ActionPlanner.java index ab7d9575..baa0edb5 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/ActionPlanner.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/ActionPlanner.java @@ -277,7 +277,6 @@ public class ActionPlanner extends AbstractAgentModule.Running