From 88a14f36b2f5c6cf7522ed737be1dbc544775664 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 22 Dec 2025 14:56:23 +0800 Subject: [PATCH] refactor(runner): relocate InProcessMcpTransport to experimental and move local MCP client logic into LocalRunnerClient MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context: Recent changes blurred the responsibility boundary between RunnerClient and LocalRunnerClient. This refactor moves local MCP client–specific logic into LocalRunnerClient and isolates InProcessMcpTransport and related code under the experimental package. RunnerClient only defines indispensable methods and attributes. --- .../core/action/runner/LocalRunnerClient.java | 123 ++++++- .../core/action/runner/RunnerClient.java | 301 ------------------ .../experimental/InProcessMcpTransport.java | 190 +++++++++++ .../core/action/runner/RunnerClientTest.java | 79 ----- 4 files changed, 311 insertions(+), 382 deletions(-) create mode 100644 Partner-Main/src/test/java/experimental/InProcessMcpTransport.java diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 669d5aa1..5907e274 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -3,12 +3,20 @@ package work.slhaf.partner.core.action.runner; import cn.hutool.core.io.FileUtil; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.ServerParameters; +import io.modelcontextprotocol.client.transport.StdioClientTransport; +import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpClientTransport; import io.modelcontextprotocol.spec.McpSchema; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.UnknownNullability; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; @@ -22,6 +30,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -36,10 +45,13 @@ import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL; @Slf4j public class LocalRunnerClient extends RunnerClient { + private final Map mcpClients = new HashMap<>(); + public LocalRunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) { super(existedMetaActions, executor); ActionWatchService watchService = new ActionWatchService(actionWatchPath); watchService.launch(); + setupShutdownHook(); } @Override @@ -204,6 +216,113 @@ public class LocalRunnerClient extends RunnerClient { return result; } + /** + * 该部分主要发生在扫描到新的MCP Server描述文件时出现的注册逻辑 + * + * @param id MCP Client 的 id + * @param mcpServerParams MCP Server 的参数 + */ + private void registerMcpClient(String id, McpServerParams mcpServerParams) { + McpClientTransport clientTransport = createTransport(mcpServerParams); + McpSyncClient client = McpClient.sync(clientTransport) + .requestTimeout(Duration.ofSeconds(mcpServerParams.timeout)) + .clientInfo(new McpSchema.Implementation(id, "PARTNER")) + // 行动程序(现 MCP Tool)的描述文本将直接由resources返回 + // 原因: ToolChange 发送的内容侧重调用,缺少可承担描述文本的字段 + // ResourcesChange 事件传递的 Resource 可以由 Client 读取内容 + // 预计在 Server 侧,收到客户端发送的新的行动程序信息,该信息由客户端处补充后,将其放置在指定位置 + // 并写入描述文件、发起 ResourcesChange 事件 + .toolsChangeConsumer(tools -> updateExistedMetaActions(id, tools)) + .build(); + mcpClients.put(id, client); + } + + private void updateExistedMetaActions(String id, @UnknownNullability List tools) { + for (McpSchema.Tool tool : tools) { + MetaActionInfo info = buildMetaActionInfo(tool); + String actionKey = id + "::" + tool.name(); + existedMetaActions.put(actionKey, info); + } + } + + private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) { + MetaActionInfo info = new MetaActionInfo(); + info.setDescription(tool.description()); + Map outputSchema = tool.outputSchema(); + info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); + info.setParams(tool.inputSchema().properties()); + + JSONObject meta = JSONObject.from(tool.meta()); + info.setIo(meta.getBoolean("io")); + info.setPreActions(meta.getList("pre", String.class)); + info.setPostActions(meta.getList("post", String.class)); + info.setStrictDependencies(meta.getBoolean("strict")); + info.setTags(meta.getList("tag", String.class)); + return info; + } + + private McpClientTransport createTransport(McpServerParams mcpServerParams) { + return switch (mcpServerParams) { + case StdioMcpServerParams params -> { + ServerParameters serverParameters = ServerParameters.builder(params.command) + .env(params.env) + .args(params.args) + .build(); + yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); + } + case HttpMcpServerParams params -> { + McpSyncHttpClientRequestCustomizer customizer = (builder, method, endpoint, body, context) -> { + params.headers.forEach(builder::setHeader); + }; + yield HttpClientSseClientTransport.builder(params.baseUri) + .httpRequestCustomizer(customizer) + .sseEndpoint(params.endpoint) + .build(); + } + }; + } + + private void setupShutdownHook() { + this.mcpClients.forEach((id, client) -> { + client.close(); + log.info("[{}] MCP-Client 已关闭", id); + }); + } + + private sealed abstract static class McpServerParams permits HttpMcpServerParams, StdioMcpServerParams { + private final int timeout; + + private McpServerParams(int timeout) { + this.timeout = timeout; + } + } + + private final static class HttpMcpServerParams extends McpServerParams { + private final String baseUri; + private final String endpoint; + private final Map headers; + + private HttpMcpServerParams(int timeout, String baseUri, String endpoint, Map header) { + super(timeout); + this.baseUri = baseUri; + this.endpoint = endpoint; + this.headers = header; + } + } + + private final static class StdioMcpServerParams extends McpServerParams { + private final String command; + private final Map env; + private final List args; + + private StdioMcpServerParams(int timeout, String command, Map env, List args) { + super(timeout); + this.command = command; + this.env = env; + this.args = args; + } + } + @Data private static class SystemExecResult { private boolean ok; @@ -211,6 +330,7 @@ public class LocalRunnerClient extends RunnerClient { private List resultList; } + //TODO 逻辑待更新,用以适配 MCP 服务的及时发现与注册 private class ActionWatchService { private final HashMap registeredPaths = new HashMap<>(); @@ -378,14 +498,13 @@ public class LocalRunnerClient extends RunnerClient { try { MetaActionInfo actionInfo = new MetaActionInfo(); existedMetaActions.put(f.getName(), actionInfo); - log.info("行动程序[{}]已加载", actionInfo.getKey()); +// log.info("行动程序[{}]已加载", actionInfo.getKey()); } catch (ActionLoadFailedException e) { log.warn("行动程序加载失败", e); } } } - } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java index aa66df8f..f410083d 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java @@ -1,27 +1,8 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.ServerParameters; -import io.modelcontextprotocol.client.transport.StdioClientTransport; -import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.common.McpTransportContext; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpStatelessAsyncServer; -import io.modelcontextprotocol.server.McpStatelessServerHandler; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpStatelessServerTransport; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.UnknownNullability; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaAction.Result; @@ -29,15 +10,8 @@ import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus; import work.slhaf.partner.core.action.entity.MetaActionInfo; import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; /** * 执行客户端抽象类 @@ -61,8 +35,6 @@ public abstract class RunnerClient { protected final ConcurrentHashMap existedMetaActions; protected final ExecutorService executor; - protected final Map mcpClients = new HashMap<>(); - protected final Map localMcpServers = new HashMap<>(); /** * ActionCore 将注入虚拟线程池 @@ -70,18 +42,6 @@ public abstract class RunnerClient { public RunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor) { this.existedMetaActions = existedMetaActions; this.executor = executor; - setupShutdownHook(); - } - - protected void setupShutdownHook() { - this.mcpClients.forEach((id, client) -> { - client.close(); - log.info("[{}] MCP-Client 已关闭", id); - }); - this.localMcpServers.forEach((id, server) -> { - server.close(); - log.info("[{}] MCP-Server 已关闭", id); - }); } /** @@ -98,85 +58,6 @@ public abstract class RunnerClient { result.setStatus(response.isOk() ? ResultStatus.SUCCESS : ResultStatus.FAILED); } - protected void registerMcpClient(String id, McpServerParams mcpServerParams) { - McpClientTransport clientTransport = createTransport(mcpServerParams); - McpSyncClient client = McpClient.sync(clientTransport) - .requestTimeout(Duration.ofSeconds(mcpServerParams.timeout)) - .clientInfo(new McpSchema.Implementation(id, "PARTNER")) - // 行动程序(现 MCP Tool)的描述文本将直接由resources返回 - // 原因: ToolChange 发送的内容侧重调用,缺少可承担描述文本的字段 - // ResourcesChange 事件传递的 Resource 可以由 Client 读取内容 - // 预计在 Server 侧,收到客户端发送的新的行动程序信息,该信息由客户端处补充后,将其放置在指定位置 - // 并写入描述文件、发起 ResourcesChange 事件 - .toolsChangeConsumer(tools -> updateExistedMetaActions(id, tools)) - .build(); - mcpClients.put(id, client); - } - - private void updateExistedMetaActions(String id, @UnknownNullability List tools) { - for (McpSchema.Tool tool : tools) { - MetaActionInfo info = buildMetaActionInfo(tool); - String actionKey = id + "::" + tool.name(); - existedMetaActions.put(actionKey, info); - } - } - - private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) { - MetaActionInfo info = new MetaActionInfo(); - info.setDescription(tool.description()); - Map outputSchema = tool.outputSchema(); - info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); - info.setParams(tool.inputSchema().properties()); - - JSONObject meta = JSONObject.from(tool.meta()); - info.setIo(meta.getBoolean("io")); - info.setPreActions(meta.getList("pre", String.class)); - info.setPostActions(meta.getList("post", String.class)); - info.setStrictDependencies(meta.getBoolean("strict")); - info.setTags(meta.getList("tag", String.class)); - return info; - } - - private McpClientTransport createTransport(McpServerParams mcpServerParams) { - return switch (mcpServerParams) { - case InProcessMcpServerParams params -> { - InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); - createInProcessMcpServer(params.id, pair.serverSide); - yield pair.clientSide; - } - case StdioMcpServerParams params -> { - ServerParameters serverParameters = ServerParameters.builder(params.command) - .env(params.env) - .args(params.args) - .build(); - yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); - } - case HttpMcpServerParams params -> { - McpSyncHttpClientRequestCustomizer customizer = (builder, method, endpoint, body, context) -> { - params.headers.forEach(builder::setHeader); - }; - yield HttpClientSseClientTransport.builder(params.baseUri) - .httpRequestCustomizer(customizer) - .sseEndpoint(params.endpoint) - .build(); - } - }; - } - - private void createInProcessMcpServer(String id, InProcessMcpTransport serverSide) { - McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() - .tools(true) - .resources(true, true) - .build(); - - McpStatelessAsyncServer server = McpServer.async(serverSide) - .capabilities(serverCapabilities) - .serverInfo(id, "PARTNER") - .build(); - - localMcpServers.put(id, server); - } - protected abstract RunnerResponse doRun(MetaAction metaAction); public abstract String buildTmpPath(MetaAction tempAction, String codeType); @@ -196,186 +77,4 @@ public abstract class RunnerClient { private String data; } - protected sealed abstract static class McpServerParams permits HttpMcpServerParams, InProcessMcpServerParams, StdioMcpServerParams { - private final int timeout; - - private McpServerParams(int timeout) { - this.timeout = timeout; - } - } - - protected final static class HttpMcpServerParams extends McpServerParams { - private final String baseUri; - private final String endpoint; - private final Map headers; - - protected HttpMcpServerParams(int timeout, String baseUri, String endpoint, Map header) { - super(timeout); - this.baseUri = baseUri; - this.endpoint = endpoint; - this.headers = header; - } - } - - protected final static class StdioMcpServerParams extends McpServerParams { - private final String command; - private final Map env; - private final List args; - - protected StdioMcpServerParams(int timeout, String command, Map env, List args) { - super(timeout); - this.command = command; - this.env = env; - this.args = args; - } - } - - protected final static class InProcessMcpServerParams extends McpServerParams { - private final String id; - - protected InProcessMcpServerParams(int timeout, String id) { - super(timeout); - this.id = id; - } - } - - public static final class InProcessMcpTransport implements McpClientTransport, McpStatelessServerTransport { - - // 每个 transport 只处理一条 inbound 流 - private final Sinks.Many inbound = - Sinks.many().unicast().onBackpressureBuffer(); - - private final AtomicBoolean clientConnected = new AtomicBoolean(false); - private final AtomicBoolean serverConnected = new AtomicBoolean(false); - - /** - * 对端 - */ - private volatile InProcessMcpTransport peer; - - private volatile McpStatelessServerHandler serverHandler; - - public record Pair(InProcessMcpTransport clientSide, InProcessMcpTransport serverSide) { - } - - public static Pair pair() { - InProcessMcpTransport client = new InProcessMcpTransport(); - InProcessMcpTransport server = new InProcessMcpTransport(); - - client.peer = server; - server.peer = client; - - return new Pair(client, server); - } - - /* ====================================================== - * Internal receive: peer.sendMessage -> this.receive - * ====================================================== */ - private void receive(McpSchema.JSONRPCMessage message) { - if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to receive message: " + message); - } - } - - /* ====================================================== - * Client → Server sendMessage - * ====================================================== */ - @Override - public Mono sendMessage(McpSchema.JSONRPCMessage message) { - InProcessMcpTransport p = this.peer; - if (p == null) { - return Mono.error(new IllegalStateException("Transport is not linked")); - } - return Mono.fromRunnable(() -> p.receive(message)); - } - - /* ====================================================== - * Client connect(handler) 处理 server → client 消息 - * ====================================================== */ - @Override - public Mono connect(Function, Mono> handler) { - if (!clientConnected.compareAndSet(false, true)) { - return Mono.error(new IllegalStateException("Client already connected")); - } - - return inbound.asFlux() - .concatMap(msg -> - handler.apply(Mono.just(msg)) - // handler may emit response message → send back to server - .flatMap(resp -> resp != null ? sendMessage(resp) : Mono.empty()) - ) - .doFinally(sig -> clientConnected.set(false)) - .then(); - } - - @Override - public void setExceptionHandler(Consumer handler) { - McpClientTransport.super.setExceptionHandler(handler); - } - - /* ====================================================== - * Server: bind stateless handler = process client → server inbound - * ====================================================== */ - @Override - public void setMcpHandler(McpStatelessServerHandler handler) { - this.serverHandler = handler; - - if (!serverConnected.compareAndSet(false, true)) { - throw new IllegalStateException("Server already connected"); - } - - // 订阅 client → server 消息 - inbound.asFlux() - .concatMap(this::handleServerMessage) - .doFinally(sig -> serverConnected.set(false)) - .subscribe(); - } - - /** - * Server 端处理 JSONRPCMessage - */ - private Mono handleServerMessage(McpSchema.JSONRPCMessage msg) { - // 创建 transport context(简单实现即可) - McpTransportContext ctx = key -> null; - - if (msg instanceof McpSchema.JSONRPCRequest req) { - return serverHandler.handleRequest(ctx, req) - .flatMap(this::sendMessage); - } - - if (msg instanceof McpSchema.JSONRPCNotification noti) { - return serverHandler.handleNotification(ctx, noti); - } - - return Mono.empty(); - } - - /* ====================================================== - * other boilerplate - * ====================================================== */ - - @Override - public void close() { - McpClientTransport.super.close(); - } - - @Override - public Mono closeGracefully() { - inbound.tryEmitComplete(); - clientConnected.set(false); - serverConnected.set(false); - return Mono.empty(); - } - - @Override - public T unmarshalFrom(Object data, TypeRef typeRef) { - return McpJsonMapper.getDefault().convertValue(data, typeRef); - } - - @Override - public List protocolVersions() { - return McpClientTransport.super.protocolVersions(); - } - } - } diff --git a/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java b/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java new file mode 100644 index 00000000..8ca56f69 --- /dev/null +++ b/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java @@ -0,0 +1,190 @@ +package experimental; + +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpStatelessServerFeatures; +import io.modelcontextprotocol.server.McpStatelessServerHandler; +import io.modelcontextprotocol.server.McpStatelessSyncServer; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpStatelessServerTransport; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +public final class InProcessMcpTransport implements McpClientTransport, McpStatelessServerTransport { + + // 每个 transport 只处理一条 inbound 流 + private final Sinks.Many inbound = + Sinks.many().unicast().onBackpressureBuffer(); + + private final AtomicBoolean clientConnected = new AtomicBoolean(false); + private final AtomicBoolean serverConnected = new AtomicBoolean(false); + + /** + * 对端 + */ + private volatile InProcessMcpTransport peer; + + private volatile McpStatelessServerHandler serverHandler; + + public record Pair(InProcessMcpTransport clientSide, InProcessMcpTransport serverSide) { + } + + public static Pair pair() { + InProcessMcpTransport client = new InProcessMcpTransport(); + InProcessMcpTransport server = new InProcessMcpTransport(); + + client.peer = server; + server.peer = client; + + return new Pair(client, server); + } + + /* ====================================================== + * Internal receive: peer.sendMessage -> this.receive + * ====================================================== */ + private void receive(McpSchema.JSONRPCMessage message) { + if (inbound.tryEmitNext(message).isFailure()) { + throw new RuntimeException("Failed to receive message: " + message); + } + } + + /* ====================================================== + * Client → Server sendMessage + * ====================================================== */ + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + InProcessMcpTransport p = this.peer; + if (p == null) { + return Mono.error(new IllegalStateException("Transport is not linked")); + } + return Mono.fromRunnable(() -> p.receive(message)); + } + + /* ====================================================== + * Client connect(handler) 处理 server → client 消息 + * ====================================================== */ + @Override + public Mono connect(Function, Mono> handler) { + if (!clientConnected.compareAndSet(false, true)) { + return Mono.error(new IllegalStateException("Client already connected")); + } + + return inbound.asFlux() + .concatMap(msg -> + handler.apply(Mono.just(msg)) + // handler may emit response message → send back to server + .flatMap(resp -> resp != null ? sendMessage(resp) : Mono.empty()) + ) + .doFinally(sig -> clientConnected.set(false)) + .then(); + } + + @Override + public void setExceptionHandler(Consumer handler) { + McpClientTransport.super.setExceptionHandler(handler); + } + + /* ====================================================== + * Server: bind stateless handler = process client → server inbound + * ====================================================== */ + @Override + public void setMcpHandler(McpStatelessServerHandler handler) { + this.serverHandler = handler; + + if (!serverConnected.compareAndSet(false, true)) { + throw new IllegalStateException("Server already connected"); + } + + // 订阅 client → server 消息 + inbound.asFlux() + .concatMap(this::handleServerMessage) + .doFinally(sig -> serverConnected.set(false)) + .subscribe(); + } + + /** + * Server 端处理 JSONRPCMessage + */ + private Mono handleServerMessage(McpSchema.JSONRPCMessage msg) { + // 创建 transport context(简单实现即可) + McpTransportContext ctx = key -> null; + + if (msg instanceof McpSchema.JSONRPCRequest req) { + return serverHandler.handleRequest(ctx, req) + .flatMap(this::sendMessage); + } + + if (msg instanceof McpSchema.JSONRPCNotification noti) { + return serverHandler.handleNotification(ctx, noti); + } + + return Mono.empty(); + } + + /* ====================================================== + * other boilerplate + * ====================================================== */ + + @Override + public void close() { + McpClientTransport.super.close(); + } + + @Override + public Mono closeGracefully() { + inbound.tryEmitComplete(); + clientConnected.set(false); + serverConnected.set(false); + return Mono.empty(); + } + + @Override + public T unmarshalFrom(Object data, TypeRef typeRef) { + return McpJsonMapper.getDefault().convertValue(data, typeRef); + } + + @Override + public List protocolVersions() { + return McpClientTransport.super.protocolVersions(); + } +} + +class InProcessMcpTransportTest { + @Test + void inProcessMcpTransportTest() { + InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); + InProcessMcpTransport clientSide = pair.clientSide(); + InProcessMcpTransport serverSide = pair.serverSide(); + McpStatelessSyncServer server = McpServer.sync(serverSide) + .capabilities(McpSchema.ServerCapabilities.builder().tools(true).build()) + .build(); + server.addTool(McpStatelessServerFeatures.SyncToolSpecification.builder() + .tool(McpSchema.Tool.builder().name("111").build()).callHandler((mcpTransportContext, callToolRequest) -> { + System.out.println(111); + return McpSchema.CallToolResult.builder().addContent(new McpSchema.TextContent("111")).build(); + }).build()); + McpSyncClient client = McpClient.sync(clientSide) + .build(); + + List tools = client.listTools().tools(); + McpSchema.Tool tool = tools.getFirst(); + System.out.println(tool.toString()); + + McpSchema.CallToolResult callToolResult = client.callTool(McpSchema.CallToolRequest.builder().name(tool.name()).build()); + System.out.println(callToolResult.content().toString()); + + client.close(); + server.close(); + } +} diff --git a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java index 8605d547..040069a4 100644 --- a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java +++ b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java @@ -1,95 +1,16 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpStatelessServerFeatures; -import io.modelcontextprotocol.server.McpStatelessSyncServer; -import io.modelcontextprotocol.spec.McpSchema; -import org.junit.jupiter.api.Test; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; import java.io.IOException; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; public class RunnerClientTest { - @Test - void httpMcpClientTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.HttpMcpServerParams params = new RunnerClient.HttpMcpServerParams(20, "https://dashscope.aliyuncs.com", "/api/v1/mcps/WebSearch/sse", Map.of("Authorization", "Bearer sk-xxx")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println(tools); - McpSchema.CallToolResult query = client.callTool(McpSchema.CallToolRequest.builder().name(tools.getFirst().name()).arguments(Map.of("query", "123")).build()); - for (McpSchema.Content content : query.content()) { - System.out.println("\r\n---\r\n"); - System.out.println(content); - } - } - - @Test - void stdioMcpClientTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.StdioMcpServerParams params = new RunnerClient.StdioMcpServerParams(20, "uvx", Map.of("http_proxy", "http://127.0.0.1:7897", "https_proxy", "http://127.0.0.1:7897"), List.of("mcp-server-fetch")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println(tools); - McpSchema.CallToolResult query = client.callTool(McpSchema.CallToolRequest.builder().name(tools.getFirst().name()).arguments(Map.of("url", "https://gitea.slhaf.work")).build()); - System.out.println(query.toString()); - } - - @Test - void schemaTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.StdioMcpServerParams params = new RunnerClient.StdioMcpServerParams(20, "uvx", Map.of("http_proxy", "http://127.0.0.1:7897", "https_proxy", "http://127.0.0.1:7897"), List.of("mcp-server-fetch")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println("\r\n ------ \r\n"); - McpSchema.Tool first = tools.getFirst(); - Map paramsSchema = first.inputSchema().properties(); - System.out.println(paramsSchema.toString()); - System.out.println("\r\n ------ \r\n"); - Map outputSchema = first.outputSchema(); - System.out.println(outputSchema); - } - - @Test - void inProcessMcpTransportTest() { - RunnerClient.InProcessMcpTransport.Pair pair = RunnerClient.InProcessMcpTransport.pair(); - RunnerClient.InProcessMcpTransport clientSide = pair.clientSide(); - RunnerClient.InProcessMcpTransport serverSide = pair.serverSide(); - McpStatelessSyncServer server = McpServer.sync(serverSide) - .capabilities(McpSchema.ServerCapabilities.builder().tools(true).build()) - .build(); - server.addTool(McpStatelessServerFeatures.SyncToolSpecification.builder() - .tool(McpSchema.Tool.builder().name("111").build()).callHandler((mcpTransportContext, callToolRequest) -> { - System.out.println(111); - return McpSchema.CallToolResult.builder().addContent(new McpSchema.TextContent("111")).build(); - }).build()); - McpSyncClient client = McpClient.sync(clientSide) - .build(); - - List tools = client.listTools().tools(); - McpSchema.Tool tool = tools.getFirst(); - System.out.println(tool.toString()); - - McpSchema.CallToolResult callToolResult = client.callTool(McpSchema.CallToolRequest.builder().name(tool.name()).build()); - System.out.println(callToolResult.content().toString()); - - client.close(); - server.close(); - } - private static class TestRunnerClient extends RunnerClient { public TestRunnerClient() {