From 973875b2e7401bac8736ddcec8307803d3ecd3b5 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sat, 2 May 2026 00:02:22 +0800 Subject: [PATCH] feat(onebot): add OneBot v11 reverse websocket adapter --- .../runtime/gateway/WebSocketGateway.java | 2 +- .../onebot/OnebotAdapterBootstrap.java | 16 ++ .../onebot/gateway/OnebotGateway.java | 213 ++++++++++++++++++ .../gateway/OnebotGatewayRegistration.kt | 19 ++ .../onebot/v11/OneBotV11ActionExecutor.kt | 54 +++++ .../external/onebot/v11/OneBotV11Event.kt | 44 ++++ .../onebot/v11/OneBotV11EventCodec.kt | 66 ++++++ 7 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/OnebotAdapterBootstrap.java create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/gateway/OnebotGateway.java create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/gateway/OnebotGatewayRegistration.kt create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11ActionExecutor.kt create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11Event.kt create mode 100644 Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11EventCodec.kt diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java b/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java index 096c8fe0..dd87d928 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/runtime/gateway/WebSocketGateway.java @@ -130,7 +130,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway { + + private final AtomicReference activeConnection = new AtomicReference<>(); + private final AtomicBoolean launched = new AtomicBoolean(false); + private final String path; + private final String accessToken; + private final AtomicLong lastHeartbeatAt = new AtomicLong(); + + public OnebotGateway(int port, @NotNull String hostname, @NotNull String path, String accessToken) { + super(new InetSocketAddress(hostname, port)); + this.setReuseAddr(true); + this.path = path; + this.accessToken = accessToken; + + OneBotV11ActionExecutor.bindConnectionReference(this.activeConnection); + log.info("Onebot will start with {}: {}, {}", hostname, port, path); + } + + @Override + public void launch() { + if (!launched.compareAndSet(false, true)) { + return; + } + this.start(); + } + + @Override + public PartnerRunningFlowContext parseRunningFlowContext(InputData inputData) { + PartnerRunningFlowContext context = PartnerRunningFlowContext.fromUser(inputData.getSource(), inputData.getContent()); + inputData.getMeta().forEach(context::putUserInfo); + return context; + } + + @Override + public void close() { + try { + for (WebSocket webSocket : getConnections()) { + if (webSocket != null && webSocket.isOpen()) { + webSocket.close(1001, "Server shutting down"); + } + } + if (launched.get()) { + super.stop(1000); + } + } catch (Exception e) { + log.warn("关闭 OneBotGateway 失败", e); + } finally { + activeConnection.set(null); + launched.set(false); + } + } + + @Override + @NotNull + public String getChannelName() { + return "onebot_channel"; + } + + @Override + public void response(@NotNull InteractionEvent event) { + String content = extractResponseContent(event); + if (content == null || content.isBlank()) { + return; + } + + WebSocket conn = activeConnection.get(); + if (conn == null || !conn.isOpen()) { + log.warn("No active OneBot connection for response target: {}", event.getTarget()); + return; + } + + boolean sent = OneBotV11ActionExecutor.sendMessage(event.getTarget(), content); + if (!sent) { + log.warn("Unsupported OneBot response target: {}", event.getTarget()); + } + } + + private String extractResponseContent(InteractionEvent event) { + if (event instanceof ReplyEvent replyEvent) { + return replyEvent.getContent(); + } + if (event instanceof ModuleEvent moduleEvent) { + return moduleEvent.getData().getContent(); + } + if (event instanceof SystemEvent systemEvent) { + return systemEvent.getTitle() + "\n" + systemEvent.getContent(); + } + return null; + } + + @Override + public void onOpen(WebSocket conn, ClientHandshake handshake) { + String actualPath = handshake.getResourceDescriptor(); + + if (!actualPath.equals(path)) { + conn.close(1008, "Unsupported OneBot path: " + actualPath); + return; + } + + if (!verifyAccessToken(handshake)) { + conn.close(1008, "Invalid access token"); + return; + } + + WebSocket old = activeConnection.getAndSet(conn); + if (old != null && old != conn && old.isOpen()) { + old.close(1001, "Replaced by new OneBot connection"); + } + + log.info("OneBot connected: {}", conn.getRemoteSocketAddress()); + } + + private boolean verifyAccessToken(ClientHandshake handshake) { + if (accessToken == null || accessToken.isBlank()) { + return true; + } + + String authorization = handshake.getFieldValue("Authorization"); + return ("Bearer " + accessToken).equals(authorization); + } + + @Override + public void onClose(WebSocket conn, int code, String reason, boolean remote) { + log.info("OneBot connection closed: {}, code={}, reason={}, remote={}", + conn.getRemoteSocketAddress(), code, reason, remote); + activeConnection.compareAndSet(conn, null); + } + + @Override + public void onMessage(WebSocket conn, String message) { + try { + JSONObject json = JSON.parseObject(message); + if (json.containsKey("echo")) { + handleActionResponse(json); + return; + } + + OneBotV11Event event = OneBotV11EventCodec.parse(json); + if (event == null) { + log.debug("Ignore unsupported OneBot payload: {}", message); + return; + } + handleEvent(event); + } catch (Exception e) { + log.warn("Invalid OneBot payload: {}", message, e); + } + } + + private void handleActionResponse(JSONObject response) { + log.debug("OneBot action response: {}", response); + } + + private void handleEvent(OneBotV11Event event) { + if (event instanceof OneBotV11MetaEvent metaEvent) { + if (metaEvent.isHeartbeat()) { + lastHeartbeatAt.set(System.currentTimeMillis()); + log.debug("OneBot heartbeat received"); + } + return; + } + + if (event instanceof OneBotV11MessageEvent messageEvent) { + handleMessageEvent(messageEvent); + return; + } + + log.debug("Ignore unsupported OneBot event: {}", event); + } + + private void handleMessageEvent(OneBotV11MessageEvent event) { + if (!event.isPrivateMessage()) { + log.debug("Ignore non-private OneBot message, type={}", event.getMessageType().getDisplayName()); + return; + } + + InputData inputData = OneBotV11EventCodec.toInputData(event); + if (inputData == null) { + log.debug("Ignore empty OneBot private message"); + return; + } + receive(inputData); + } + + @Override + public void onError(WebSocket conn, Exception ex) { + log.error("OneBotGateway error", ex); + } + + @Override + public void onStart() { + log.info("OneBotGateway 已启动..."); + } +} diff --git a/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/gateway/OnebotGatewayRegistration.kt b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/gateway/OnebotGatewayRegistration.kt new file mode 100644 index 00000000..7fe320b3 --- /dev/null +++ b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/gateway/OnebotGatewayRegistration.kt @@ -0,0 +1,19 @@ +package work.slhaf.partner.external.onebot.gateway + +import work.slhaf.partner.framework.agent.interaction.AgentGateway +import work.slhaf.partner.framework.agent.interaction.AgentGatewayRegistration + +class OnebotGatewayRegistration : AgentGatewayRegistration { + override val channelName: String = "onebot_channel" + + override fun create(params: Map): AgentGateway<*, *> { + val port = params["port"]?.toIntOrNull() ?: 29700 + val hostname = params["hostname"] ?: "127.0.0.1" + val path = params["path"] ?: "/onebot/v11/ws" + val token = params["token"] + + require(port > 0) { "port must be greater than 0" } + return OnebotGateway(port, hostname, path, token) + } + +} \ No newline at end of file diff --git a/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11ActionExecutor.kt b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11ActionExecutor.kt new file mode 100644 index 00000000..4a87bf4e --- /dev/null +++ b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11ActionExecutor.kt @@ -0,0 +1,54 @@ +package work.slhaf.partner.external.onebot.v11 + +import com.alibaba.fastjson2.JSONObject +import org.java_websocket.WebSocket +import java.util.* +import kotlin.concurrent.atomics.AtomicReference +import kotlin.concurrent.atomics.ExperimentalAtomicApi + +@OptIn(ExperimentalAtomicApi::class) +object OneBotV11ActionExecutor { + + private lateinit var activeConnection: AtomicReference + + @JvmStatic + fun bindConnectionReference(reference: AtomicReference) { + activeConnection = reference + } + + @JvmStatic + fun sendMessage(target: String, message: String): Boolean { + val action = buildSendMessageAction(target, message) ?: return false + activeConnection.load().send(action.toJSONString()) + return true + } + + private fun buildSendMessageAction(target: String, message: String): JSONObject? { + val segments = target.split(":") + if (segments.size < 3 || segments[0] != "onebot") { + return null + } + + val params = JSONObject() + val actionName = when (segments[1]) { + "private" -> { + params["user_id"] = segments[2].toLongOrNull() ?: return null + "send_private_msg" + } + + "group" -> { + params["group_id"] = segments[2].toLongOrNull() ?: return null + "send_group_msg" + } + + else -> return null + } + params["message"] = message + + val action = JSONObject() + action["action"] = actionName + action["params"] = params + action["echo"] = UUID.randomUUID().toString() + return action + } +} diff --git a/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11Event.kt b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11Event.kt new file mode 100644 index 00000000..f11beb49 --- /dev/null +++ b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11Event.kt @@ -0,0 +1,44 @@ +package work.slhaf.partner.external.onebot.v11 + +sealed class OneBotV11Event { + abstract val postType: OneBotV11PostType +} + +enum class OneBotV11PostType { + MESSAGE, + META_EVENT +} + +enum class OneBotV11MessageType(val displayName: String) { + PRIVATE("私聊"), + GROUP("群聊"), + UNKNOWN("未知") +} + +data class OneBotV11MessageEvent( + val messageType: OneBotV11MessageType, + val userId: Long, + val groupId: Long?, + val message: Any?, + val rawMessage: String? +) : OneBotV11Event() { + override val postType: OneBotV11PostType = OneBotV11PostType.MESSAGE + + fun isPrivateMessage(): Boolean = messageType == OneBotV11MessageType.PRIVATE + + fun isGroupMessage(): Boolean = messageType == OneBotV11MessageType.GROUP + + fun routeTarget(): String? = when { + isPrivateMessage() -> "onebot:private:$userId" + isGroupMessage() && groupId != null -> "onebot:group:$groupId:$userId" + else -> null + } +} + +data class OneBotV11MetaEvent( + val metaEventType: String +) : OneBotV11Event() { + override val postType: OneBotV11PostType = OneBotV11PostType.META_EVENT + + fun isHeartbeat(): Boolean = metaEventType == "heartbeat" +} diff --git a/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11EventCodec.kt b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11EventCodec.kt new file mode 100644 index 00000000..65348b30 --- /dev/null +++ b/Partner-External-Modules/Partner-Onebot-Adapter/src/main/java/work/slhaf/partner/external/onebot/v11/OneBotV11EventCodec.kt @@ -0,0 +1,66 @@ +package work.slhaf.partner.external.onebot.v11 + +import com.alibaba.fastjson2.JSONArray +import com.alibaba.fastjson2.JSONObject +import work.slhaf.partner.framework.agent.interaction.data.InputData + +object OneBotV11EventCodec { + + @JvmStatic + fun parse(json: JSONObject): OneBotV11Event? { + return when (json.getString("post_type")) { + "message" -> parseMessage(json) + "meta_event" -> parseMetaEvent(json) + else -> null + } + } + + @JvmStatic + fun toInputData(event: OneBotV11MessageEvent): InputData? { + val target = event.routeTarget() ?: return null + val content = extractText(event).takeIf { it.isNotBlank() } ?: return null + val inputData = InputData(target, content) + inputData.addMeta("message_type", event.messageType.displayName) + event.groupId?.let { inputData.addMeta("group_id", it.toString()) } + return inputData + } + + private fun extractText(event: OneBotV11MessageEvent): String { + event.rawMessage?.takeIf { it.isNotBlank() }?.let { return it } + val message = event.message ?: return "" + return when (message) { + is String -> message + is JSONArray -> message.asSequence() + .filterIsInstance() + .filter { it.getString("type") == "text" } + .mapNotNull { it.getJSONObject("data")?.getString("text") } + .joinToString("") + + else -> message.toString() + } + } + + private fun parseMessage(json: JSONObject): OneBotV11MessageEvent { + return OneBotV11MessageEvent( + messageType = parseMessageType(json.getString("message_type")), + userId = json.getLongValue("user_id"), + groupId = json.getLong("group_id"), + message = json["message"], + rawMessage = json.getString("raw_message") + ) + } + + private fun parseMetaEvent(json: JSONObject): OneBotV11MetaEvent { + return OneBotV11MetaEvent( + metaEventType = json.getString("meta_event_type") ?: "" + ) + } + + private fun parseMessageType(value: String?): OneBotV11MessageType { + return when (value) { + "private" -> OneBotV11MessageType.PRIVATE + "group" -> OneBotV11MessageType.GROUP + else -> OneBotV11MessageType.UNKNOWN + } + } +}