mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
refactor(action): add shutdown method to ActionCore to close runner client and thread pool
This commit is contained in:
@@ -16,6 +16,7 @@ import work.slhaf.partner.core.action.runner.RunnerClient;
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter;
|
||||
import work.slhaf.partner.framework.agent.factory.capability.annotation.CapabilityCore;
|
||||
import work.slhaf.partner.framework.agent.factory.capability.annotation.CapabilityMethod;
|
||||
import work.slhaf.partner.framework.agent.factory.context.Shutdown;
|
||||
import work.slhaf.partner.framework.agent.state.State;
|
||||
import work.slhaf.partner.framework.agent.state.StateSerializable;
|
||||
import work.slhaf.partner.framework.agent.state.StateValue;
|
||||
@@ -23,10 +24,7 @@ import work.slhaf.partner.framework.agent.state.StateValue;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@SuppressWarnings("FieldMayBeFinal")
|
||||
@@ -58,6 +56,31 @@ public class ActionCore implements StateSerializable {
|
||||
register();
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
public void shutdown() {
|
||||
try {
|
||||
runnerClient.close();
|
||||
} catch (Exception e) {
|
||||
log.warn("runner client close error", e);
|
||||
}
|
||||
try {
|
||||
platformExecutor.shutdown();
|
||||
virtualExecutor.shutdown();
|
||||
|
||||
int count = 0;
|
||||
if (!platformExecutor.awaitTermination(8, TimeUnit.SECONDS)) {
|
||||
count += platformExecutor.shutdownNow().size();
|
||||
}
|
||||
if (!virtualExecutor.awaitTermination(8, TimeUnit.SECONDS)) {
|
||||
count += virtualExecutor.shutdownNow().size();
|
||||
}
|
||||
if (count != 0) {
|
||||
log.warn("{} tasks still running", count);
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
@CapabilityMethod
|
||||
public void putAction(@NonNull ExecutableAction executableAction) {
|
||||
actionPool.removeIf(data -> data.getUuid().equals(executableAction.getUuid())); // 用来应对 ScheduledActionData 的重新排列
|
||||
|
||||
@@ -115,8 +115,6 @@ public class LocalRunnerClient extends RunnerClient implements AutoCloseable {
|
||||
this.mcpDescWatcher = descWatcher;
|
||||
this.dynamicActionMcpManager = dynamicManager;
|
||||
this.mcpConfigWatcher = configWatcher;
|
||||
|
||||
setupShutdownHook();
|
||||
}
|
||||
|
||||
private void registerPolicyProviders() {
|
||||
@@ -168,10 +166,6 @@ public class LocalRunnerClient extends RunnerClient implements AutoCloseable {
|
||||
clientRegistry.register(id, client);
|
||||
}
|
||||
|
||||
private void setupShutdownHook() {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (!closed.compareAndSet(false, true)) {
|
||||
|
||||
@@ -36,7 +36,7 @@ import java.util.concurrent.ExecutorService;
|
||||
* </ol>
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class RunnerClient {
|
||||
public abstract class RunnerClient implements AutoCloseable {
|
||||
|
||||
protected final String ACTION_PATH;
|
||||
|
||||
|
||||
@@ -53,4 +53,8 @@ public class SandboxRunnerClient extends RunnerClient {
|
||||
throw new UnsupportedOperationException("Unimplemented method 'persistSerialize'");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import work.slhaf.partner.framework.agent.factory.capability.annotation.InjectCa
|
||||
import work.slhaf.partner.framework.agent.factory.component.abstracts.AbstractAgentModule;
|
||||
import work.slhaf.partner.framework.agent.factory.component.annotation.Init;
|
||||
import work.slhaf.partner.framework.agent.factory.component.annotation.InjectModule;
|
||||
import work.slhaf.partner.framework.agent.factory.context.Shutdown;
|
||||
import work.slhaf.partner.module.action.executor.entity.*;
|
||||
|
||||
import java.util.*;
|
||||
@@ -43,6 +44,8 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
private ExecutorService platformExecutor;
|
||||
private RunnerClient runnerClient;
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
|
||||
@Init
|
||||
public void init() {
|
||||
virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
|
||||
@@ -60,6 +63,11 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
blockManager.emitActionRecoveredBlock(recoveredActions);
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
public void shutdown() {
|
||||
closed.set(true);
|
||||
}
|
||||
|
||||
public void execute(Action action) {
|
||||
|
||||
Future<?> future = virtualExecutor.submit(actionExecutionRouter(action));
|
||||
@@ -119,11 +127,16 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
}
|
||||
|
||||
private void handleStateAction(StateAction stateAction) {
|
||||
if (closed.get()) {
|
||||
return;
|
||||
}
|
||||
blockManager.emitStateActionTriggeredBlock(stateAction);
|
||||
stateAction.getTrigger().onTrigger();
|
||||
}
|
||||
|
||||
private void handleExecutableAction(ExecutableAction executableAction) {
|
||||
actionCapability.putAction(executableAction);
|
||||
|
||||
val source = executableAction.getSource();
|
||||
val status = executableAction.getStatus();
|
||||
if (status != Action.Status.PREPARE && status != Action.Status.EXECUTING) {
|
||||
@@ -180,6 +193,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
};
|
||||
stageCursor.init();
|
||||
do {
|
||||
if (closed.get()) {
|
||||
return;
|
||||
}
|
||||
val metaActions = actionChain.get(executableAction.getExecutingStage());
|
||||
val recognizerRecord = startRecognizerIfNeeded(executableAction, phaser);
|
||||
val listeningRecord = executeAndListening(metaActions, phaser, executableAction, source);
|
||||
@@ -187,6 +203,9 @@ public class ActionExecutor extends AbstractAgentModule.Standalone {
|
||||
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
|
||||
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
|
||||
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
|
||||
if (closed.get()) {
|
||||
return;
|
||||
}
|
||||
synchronized (listeningRecord.accepting()) {
|
||||
listeningRecord.accepting().set(false);
|
||||
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage,将执行一次阶段推进
|
||||
|
||||
@@ -277,7 +277,6 @@ public class ActionPlanner extends AbstractAgentModule.Running<PartnerRunningFlo
|
||||
}
|
||||
|
||||
private void executeImmediateWithWatcher(ImmediateExecutableAction action) {
|
||||
actionCapability.putAction(action);
|
||||
actionExecutor.execute(action);
|
||||
|
||||
AtomicBoolean notified = new AtomicBoolean(false);
|
||||
|
||||
Reference in New Issue
Block a user