mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(gateway): init and load config from config center for WebSocketGateway
This commit is contained in:
@@ -8,11 +8,9 @@ import org.java_websocket.framing.Framedata;
|
|||||||
import org.java_websocket.handshake.ClientHandshake;
|
import org.java_websocket.handshake.ClientHandshake;
|
||||||
import org.java_websocket.server.WebSocketServer;
|
import org.java_websocket.server.WebSocketServer;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import work.slhaf.partner.api.agent.runtime.config.AgentConfigLoader;
|
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway;
|
import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway;
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.InputData;
|
import work.slhaf.partner.api.agent.runtime.interaction.data.InputData;
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent;
|
import work.slhaf.partner.api.agent.runtime.interaction.data.InteractionEvent;
|
||||||
import work.slhaf.partner.common.config.PartnerAgentConfigLoader;
|
|
||||||
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
|
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
@@ -23,7 +21,7 @@ import java.util.concurrent.Executors;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class WebSocketGateway extends WebSocketServer implements AgentGateway<InputData, PartnerRunningFlowContext> {
|
public class WebSocketGateway extends WebSocketServer implements AgentGateway<InputData, PartnerRunningFlowContext> {
|
||||||
|
|
||||||
private static final long HEARTBEAT_INTERVAL = 10_000;
|
private final long heartbeatInterval;
|
||||||
|
|
||||||
@ToString.Exclude
|
@ToString.Exclude
|
||||||
private final ConcurrentHashMap<String, WebSocket> userSessions = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, WebSocket> userSessions = new ConcurrentHashMap<>();
|
||||||
@@ -32,12 +30,9 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
|
|||||||
// 记录最后一次收到Pong的时间
|
// 记录最后一次收到Pong的时间
|
||||||
private final ConcurrentHashMap<WebSocket, Long> lastPongTimes = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<WebSocket, Long> lastPongTimes = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public WebSocketGateway() {
|
public WebSocketGateway(int port, long heartbeatInterval) {
|
||||||
this(((PartnerAgentConfigLoader) AgentConfigLoader.INSTANCE).getConfig().getWebSocketConfig().getPort());
|
|
||||||
}
|
|
||||||
|
|
||||||
private WebSocketGateway(int port) {
|
|
||||||
super(new InetSocketAddress(port));
|
super(new InetSocketAddress(port));
|
||||||
|
this.heartbeatInterval = heartbeatInterval;
|
||||||
this.setReuseAddr(true);
|
this.setReuseAddr(true);
|
||||||
this.executor = Executors.newSingleThreadExecutor();
|
this.executor = Executors.newSingleThreadExecutor();
|
||||||
}
|
}
|
||||||
@@ -71,7 +66,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
|
|||||||
executor.execute(() -> {
|
executor.execute(() -> {
|
||||||
while (!Thread.interrupted()) {
|
while (!Thread.interrupted()) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(HEARTBEAT_INTERVAL);
|
Thread.sleep(heartbeatInterval);
|
||||||
checkConnections();
|
checkConnections();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
@@ -90,7 +85,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<In
|
|||||||
|
|
||||||
// 检查上次Pong响应是否超时(2倍心跳间隔)
|
// 检查上次Pong响应是否超时(2倍心跳间隔)
|
||||||
Long lastPong = lastPongTimes.get(conn);
|
Long lastPong = lastPongTimes.get(conn);
|
||||||
if (lastPong != null && now - lastPong > HEARTBEAT_INTERVAL * 2) {
|
if (lastPong != null && now - lastPong > heartbeatInterval * 2) {
|
||||||
log.warn("Connection {} timed out, closing...", conn.getRemoteSocketAddress());
|
log.warn("Connection {} timed out, closing...", conn.getRemoteSocketAddress());
|
||||||
conn.close(1001, "No Pong response");
|
conn.close(1001, "No Pong response");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
package work.slhaf.partner.runtime.interaction;
|
||||||
|
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import work.slhaf.partner.api.agent.runtime.config.Config;
|
||||||
|
import work.slhaf.partner.api.agent.runtime.config.ConfigRegistration;
|
||||||
|
import work.slhaf.partner.api.agent.runtime.config.Configurable;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class WebSocketGatewayRegistry implements Configurable {
|
||||||
|
// TODO 在 Agent 入口处,针对这类内容提供统一注册
|
||||||
|
@Override
|
||||||
|
public @NotNull Map<Path, ConfigRegistration<? extends Config>> declare() {
|
||||||
|
return Map.of(Path.of("gateway", "websocket.json"), new WebSocketRegistration());
|
||||||
|
}
|
||||||
|
|
||||||
|
static class WebSocketRegistration implements ConfigRegistration<WebSocketConfig> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@NotNull
|
||||||
|
public Class<WebSocketConfig> type() {
|
||||||
|
return WebSocketConfig.class;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(@NotNull WebSocketConfig config) {
|
||||||
|
new WebSocketGateway(config.port, config.heartbeatInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public WebSocketConfig defaultConfig() {
|
||||||
|
return new WebSocketConfig(29600, 10_000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class WebSocketConfig extends Config {
|
||||||
|
final int port;
|
||||||
|
final int heartbeatInterval;
|
||||||
|
|
||||||
|
WebSocketConfig(int port, int heartbeatInterval) {
|
||||||
|
this.port = port;
|
||||||
|
this.heartbeatInterval = heartbeatInterval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user