mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
refactor(agent): support register registry shutdown hooks while agent launching
This commit is contained in:
@@ -46,7 +46,6 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
|
||||
return;
|
||||
}
|
||||
this.start();
|
||||
setShutDownHook();
|
||||
startHeartbeatThread();
|
||||
AgentRuntime.INSTANCE.registerResponseChannel(getChannelName(), this);
|
||||
}
|
||||
@@ -106,32 +105,6 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
|
||||
log.debug("Received Pong from {}", conn.getRemoteSocketAddress());
|
||||
}
|
||||
|
||||
private void setShutDownHook() {
|
||||
try {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
try {
|
||||
// 先断开所有客户端连接
|
||||
for (WebSocket webSocket : getConnections()) {
|
||||
try {
|
||||
webSocket.close(1001, "Server shutting down");
|
||||
} catch (Exception e) {
|
||||
log.warn("关闭客户端连接时出错: ", e);
|
||||
}
|
||||
}
|
||||
//关闭WebSocketServer,给10秒超时时间确保连接正确关闭
|
||||
this.stop(10000);
|
||||
log.info("WebSocketServer 已关闭");
|
||||
} catch (IllegalStateException e) {
|
||||
log.warn("无法添加关闭钩子,JVM可能已在关闭过程中: ", e);
|
||||
} catch (Exception e) {
|
||||
log.error("WebSocketServer关闭失败: ", e);
|
||||
}
|
||||
}));
|
||||
} catch (IllegalStateException e) {
|
||||
log.warn("无法添加关闭钩子,JVM可能已在关闭过程中: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
|
||||
log.info("新连接: {}", webSocket.getRemoteSocketAddress());
|
||||
|
||||
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import work.slhaf.partner.framework.agent.config.ConfigCenter;
|
||||
import work.slhaf.partner.framework.agent.exception.AgentLaunchFailedException;
|
||||
import work.slhaf.partner.framework.agent.factory.AgentRegisterFactory;
|
||||
import work.slhaf.partner.framework.agent.factory.context.AgentContext;
|
||||
import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration;
|
||||
import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistry;
|
||||
import work.slhaf.partner.framework.agent.model.ModelRuntimeRegistry;
|
||||
@@ -58,6 +59,21 @@ public final class Agent {
|
||||
for (AgentGatewayRegistration registration : gatewayRegistrations) {
|
||||
registration.register();
|
||||
}
|
||||
AgentContext.INSTANCE.addPreShutdownHook(
|
||||
"agent-gateway-registry-close",
|
||||
0,
|
||||
AgentGatewayRegistry.INSTANCE::close
|
||||
);
|
||||
AgentContext.INSTANCE.addPostShutdownHook(
|
||||
"state-center-save",
|
||||
0,
|
||||
StateCenter.INSTANCE::save
|
||||
);
|
||||
AgentContext.INSTANCE.addPostShutdownHook(
|
||||
"config-center-close",
|
||||
100,
|
||||
ConfigCenter.INSTANCE::close
|
||||
);
|
||||
Path externalModuleDir = ConfigCenter.INSTANCE.getPaths().getResourcesDir().resolve("module");
|
||||
AgentRegisterFactory.addScanDir(externalModuleDir.toString());
|
||||
AgentRegisterFactory.launch(applicationClass.getPackageName());
|
||||
|
||||
@@ -36,6 +36,8 @@ object AgentContext {
|
||||
get() = _metadata
|
||||
|
||||
private val shutdownHooks = mutableMapOf<ShutdownHookDesc.Type, MutableList<ShutdownHookDesc>>()
|
||||
private val preShutdownHooks = mutableListOf<LifecycleShutdownHookDesc>()
|
||||
private val postShutdownHooks = mutableListOf<LifecycleShutdownHookDesc>()
|
||||
|
||||
init {
|
||||
installShutdownHook()
|
||||
@@ -91,6 +93,14 @@ object AgentContext {
|
||||
return true
|
||||
}
|
||||
|
||||
fun addPreShutdownHook(name: String, order: Int = 0, action: Runnable) {
|
||||
preShutdownHooks.add(LifecycleShutdownHookDesc(name, order, action))
|
||||
}
|
||||
|
||||
fun addPostShutdownHook(name: String, order: Int = 0, action: Runnable) {
|
||||
postShutdownHooks.add(LifecycleShutdownHookDesc(name, order, action))
|
||||
}
|
||||
|
||||
private fun installShutdownHook() {
|
||||
|
||||
class Instances(
|
||||
@@ -157,13 +167,27 @@ object AgentContext {
|
||||
}
|
||||
}
|
||||
|
||||
fun triggerLifecycleHooks(hooks: List<LifecycleShutdownHookDesc>) {
|
||||
val log = LoggerFactory.getLogger(AgentContext::class.java)
|
||||
hooks.sortedBy { it.order }
|
||||
.forEach {
|
||||
try {
|
||||
it.action.run()
|
||||
} catch (e: Exception) {
|
||||
log.error("Failed to invoke lifecycle shutdown hook {}", it.name, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
val instances = computeInstances()
|
||||
triggerLifecycleHooks(preShutdownHooks)
|
||||
shutdownHooks[ShutdownHookDesc.Type.RUNNING]?.let { trigger(it, instances) }
|
||||
shutdownHooks[ShutdownHookDesc.Type.ADDITIONAL]?.let { trigger(it, instances) }
|
||||
shutdownHooks[ShutdownHookDesc.Type.STANDALONE]?.let { trigger(it, instances) }
|
||||
shutdownHooks[ShutdownHookDesc.Type.SUB]?.let { trigger(it, instances) }
|
||||
shutdownHooks[ShutdownHookDesc.Type.CAPABILITY]?.let { trigger(it, instances) }
|
||||
triggerLifecycleHooks(postShutdownHooks)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -248,3 +272,9 @@ data class ShutdownHookDesc(
|
||||
CAPABILITY
|
||||
}
|
||||
}
|
||||
|
||||
data class LifecycleShutdownHookDesc(
|
||||
val name: String,
|
||||
val order: Int,
|
||||
val action: Runnable
|
||||
)
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.nio.file.Path
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
object AgentGatewayRegistry : Configurable, ConfigRegistration<AgentGatewayRegistryConfig> {
|
||||
object AgentGatewayRegistry : Configurable, ConfigRegistration<AgentGatewayRegistryConfig>, AutoCloseable {
|
||||
|
||||
private val log = LoggerFactory.getLogger(AgentGatewayRegistry::class.java)
|
||||
private val registryLock = ReentrantLock()
|
||||
@@ -56,6 +56,12 @@ object AgentGatewayRegistry : Configurable, ConfigRegistration<AgentGatewayRegis
|
||||
|
||||
override fun defaultConfig(): AgentGatewayRegistryConfig? = null
|
||||
|
||||
override fun close() = registryLock.withLock {
|
||||
val currentChannels = runningChannels.keys.toList()
|
||||
currentChannels.forEach(this::stopChannel)
|
||||
AgentRuntime.setDefaultResponseChannel(LogChannel.channelName)
|
||||
}
|
||||
|
||||
private fun applyConfig(config: AgentGatewayRegistryConfig) {
|
||||
validateConfig(config)
|
||||
reconcileChannels(config.channels)
|
||||
|
||||
Reference in New Issue
Block a user