diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 669d5aa1..5907e274 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -3,12 +3,20 @@ package work.slhaf.partner.core.action.runner; import cn.hutool.core.io.FileUtil; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.ServerParameters; +import io.modelcontextprotocol.client.transport.StdioClientTransport; +import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpClientTransport; import io.modelcontextprotocol.spec.McpSchema; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.UnknownNullability; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; @@ -22,6 +30,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -36,10 +45,13 @@ import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL; @Slf4j public class LocalRunnerClient extends RunnerClient { + private final Map mcpClients = new HashMap<>(); + public LocalRunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) { super(existedMetaActions, executor); ActionWatchService watchService = new ActionWatchService(actionWatchPath); watchService.launch(); + setupShutdownHook(); } @Override @@ -204,6 +216,113 @@ public class LocalRunnerClient extends RunnerClient { return result; } + /** + * 该部分主要发生在扫描到新的MCP Server描述文件时出现的注册逻辑 + * + * @param id MCP Client 的 id + * @param mcpServerParams MCP Server 的参数 + */ + private void registerMcpClient(String id, McpServerParams mcpServerParams) { + McpClientTransport clientTransport = createTransport(mcpServerParams); + McpSyncClient client = McpClient.sync(clientTransport) + .requestTimeout(Duration.ofSeconds(mcpServerParams.timeout)) + .clientInfo(new McpSchema.Implementation(id, "PARTNER")) + // 行动程序(现 MCP Tool)的描述文本将直接由resources返回 + // 原因: ToolChange 发送的内容侧重调用,缺少可承担描述文本的字段 + // ResourcesChange 事件传递的 Resource 可以由 Client 读取内容 + // 预计在 Server 侧,收到客户端发送的新的行动程序信息,该信息由客户端处补充后,将其放置在指定位置 + // 并写入描述文件、发起 ResourcesChange 事件 + .toolsChangeConsumer(tools -> updateExistedMetaActions(id, tools)) + .build(); + mcpClients.put(id, client); + } + + private void updateExistedMetaActions(String id, @UnknownNullability List tools) { + for (McpSchema.Tool tool : tools) { + MetaActionInfo info = buildMetaActionInfo(tool); + String actionKey = id + "::" + tool.name(); + existedMetaActions.put(actionKey, info); + } + } + + private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) { + MetaActionInfo info = new MetaActionInfo(); + info.setDescription(tool.description()); + Map outputSchema = tool.outputSchema(); + info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); + info.setParams(tool.inputSchema().properties()); + + JSONObject meta = JSONObject.from(tool.meta()); + info.setIo(meta.getBoolean("io")); + info.setPreActions(meta.getList("pre", String.class)); + info.setPostActions(meta.getList("post", String.class)); + info.setStrictDependencies(meta.getBoolean("strict")); + info.setTags(meta.getList("tag", String.class)); + return info; + } + + private McpClientTransport createTransport(McpServerParams mcpServerParams) { + return switch (mcpServerParams) { + case StdioMcpServerParams params -> { + ServerParameters serverParameters = ServerParameters.builder(params.command) + .env(params.env) + .args(params.args) + .build(); + yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); + } + case HttpMcpServerParams params -> { + McpSyncHttpClientRequestCustomizer customizer = (builder, method, endpoint, body, context) -> { + params.headers.forEach(builder::setHeader); + }; + yield HttpClientSseClientTransport.builder(params.baseUri) + .httpRequestCustomizer(customizer) + .sseEndpoint(params.endpoint) + .build(); + } + }; + } + + private void setupShutdownHook() { + this.mcpClients.forEach((id, client) -> { + client.close(); + log.info("[{}] MCP-Client 已关闭", id); + }); + } + + private sealed abstract static class McpServerParams permits HttpMcpServerParams, StdioMcpServerParams { + private final int timeout; + + private McpServerParams(int timeout) { + this.timeout = timeout; + } + } + + private final static class HttpMcpServerParams extends McpServerParams { + private final String baseUri; + private final String endpoint; + private final Map headers; + + private HttpMcpServerParams(int timeout, String baseUri, String endpoint, Map header) { + super(timeout); + this.baseUri = baseUri; + this.endpoint = endpoint; + this.headers = header; + } + } + + private final static class StdioMcpServerParams extends McpServerParams { + private final String command; + private final Map env; + private final List args; + + private StdioMcpServerParams(int timeout, String command, Map env, List args) { + super(timeout); + this.command = command; + this.env = env; + this.args = args; + } + } + @Data private static class SystemExecResult { private boolean ok; @@ -211,6 +330,7 @@ public class LocalRunnerClient extends RunnerClient { private List resultList; } + //TODO 逻辑待更新,用以适配 MCP 服务的及时发现与注册 private class ActionWatchService { private final HashMap registeredPaths = new HashMap<>(); @@ -378,14 +498,13 @@ public class LocalRunnerClient extends RunnerClient { try { MetaActionInfo actionInfo = new MetaActionInfo(); existedMetaActions.put(f.getName(), actionInfo); - log.info("行动程序[{}]已加载", actionInfo.getKey()); +// log.info("行动程序[{}]已加载", actionInfo.getKey()); } catch (ActionLoadFailedException e) { log.warn("行动程序加载失败", e); } } } - } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java index aa66df8f..f410083d 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/RunnerClient.java @@ -1,27 +1,8 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.ServerParameters; -import io.modelcontextprotocol.client.transport.StdioClientTransport; -import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.common.McpTransportContext; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpStatelessAsyncServer; -import io.modelcontextprotocol.server.McpStatelessServerHandler; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpStatelessServerTransport; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.UnknownNullability; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaAction.Result; @@ -29,15 +10,8 @@ import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus; import work.slhaf.partner.core.action.entity.MetaActionInfo; import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; /** * 执行客户端抽象类 @@ -61,8 +35,6 @@ public abstract class RunnerClient { protected final ConcurrentHashMap existedMetaActions; protected final ExecutorService executor; - protected final Map mcpClients = new HashMap<>(); - protected final Map localMcpServers = new HashMap<>(); /** * ActionCore 将注入虚拟线程池 @@ -70,18 +42,6 @@ public abstract class RunnerClient { public RunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor) { this.existedMetaActions = existedMetaActions; this.executor = executor; - setupShutdownHook(); - } - - protected void setupShutdownHook() { - this.mcpClients.forEach((id, client) -> { - client.close(); - log.info("[{}] MCP-Client 已关闭", id); - }); - this.localMcpServers.forEach((id, server) -> { - server.close(); - log.info("[{}] MCP-Server 已关闭", id); - }); } /** @@ -98,85 +58,6 @@ public abstract class RunnerClient { result.setStatus(response.isOk() ? ResultStatus.SUCCESS : ResultStatus.FAILED); } - protected void registerMcpClient(String id, McpServerParams mcpServerParams) { - McpClientTransport clientTransport = createTransport(mcpServerParams); - McpSyncClient client = McpClient.sync(clientTransport) - .requestTimeout(Duration.ofSeconds(mcpServerParams.timeout)) - .clientInfo(new McpSchema.Implementation(id, "PARTNER")) - // 行动程序(现 MCP Tool)的描述文本将直接由resources返回 - // 原因: ToolChange 发送的内容侧重调用,缺少可承担描述文本的字段 - // ResourcesChange 事件传递的 Resource 可以由 Client 读取内容 - // 预计在 Server 侧,收到客户端发送的新的行动程序信息,该信息由客户端处补充后,将其放置在指定位置 - // 并写入描述文件、发起 ResourcesChange 事件 - .toolsChangeConsumer(tools -> updateExistedMetaActions(id, tools)) - .build(); - mcpClients.put(id, client); - } - - private void updateExistedMetaActions(String id, @UnknownNullability List tools) { - for (McpSchema.Tool tool : tools) { - MetaActionInfo info = buildMetaActionInfo(tool); - String actionKey = id + "::" + tool.name(); - existedMetaActions.put(actionKey, info); - } - } - - private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) { - MetaActionInfo info = new MetaActionInfo(); - info.setDescription(tool.description()); - Map outputSchema = tool.outputSchema(); - info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); - info.setParams(tool.inputSchema().properties()); - - JSONObject meta = JSONObject.from(tool.meta()); - info.setIo(meta.getBoolean("io")); - info.setPreActions(meta.getList("pre", String.class)); - info.setPostActions(meta.getList("post", String.class)); - info.setStrictDependencies(meta.getBoolean("strict")); - info.setTags(meta.getList("tag", String.class)); - return info; - } - - private McpClientTransport createTransport(McpServerParams mcpServerParams) { - return switch (mcpServerParams) { - case InProcessMcpServerParams params -> { - InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); - createInProcessMcpServer(params.id, pair.serverSide); - yield pair.clientSide; - } - case StdioMcpServerParams params -> { - ServerParameters serverParameters = ServerParameters.builder(params.command) - .env(params.env) - .args(params.args) - .build(); - yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); - } - case HttpMcpServerParams params -> { - McpSyncHttpClientRequestCustomizer customizer = (builder, method, endpoint, body, context) -> { - params.headers.forEach(builder::setHeader); - }; - yield HttpClientSseClientTransport.builder(params.baseUri) - .httpRequestCustomizer(customizer) - .sseEndpoint(params.endpoint) - .build(); - } - }; - } - - private void createInProcessMcpServer(String id, InProcessMcpTransport serverSide) { - McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() - .tools(true) - .resources(true, true) - .build(); - - McpStatelessAsyncServer server = McpServer.async(serverSide) - .capabilities(serverCapabilities) - .serverInfo(id, "PARTNER") - .build(); - - localMcpServers.put(id, server); - } - protected abstract RunnerResponse doRun(MetaAction metaAction); public abstract String buildTmpPath(MetaAction tempAction, String codeType); @@ -196,186 +77,4 @@ public abstract class RunnerClient { private String data; } - protected sealed abstract static class McpServerParams permits HttpMcpServerParams, InProcessMcpServerParams, StdioMcpServerParams { - private final int timeout; - - private McpServerParams(int timeout) { - this.timeout = timeout; - } - } - - protected final static class HttpMcpServerParams extends McpServerParams { - private final String baseUri; - private final String endpoint; - private final Map headers; - - protected HttpMcpServerParams(int timeout, String baseUri, String endpoint, Map header) { - super(timeout); - this.baseUri = baseUri; - this.endpoint = endpoint; - this.headers = header; - } - } - - protected final static class StdioMcpServerParams extends McpServerParams { - private final String command; - private final Map env; - private final List args; - - protected StdioMcpServerParams(int timeout, String command, Map env, List args) { - super(timeout); - this.command = command; - this.env = env; - this.args = args; - } - } - - protected final static class InProcessMcpServerParams extends McpServerParams { - private final String id; - - protected InProcessMcpServerParams(int timeout, String id) { - super(timeout); - this.id = id; - } - } - - public static final class InProcessMcpTransport implements McpClientTransport, McpStatelessServerTransport { - - // 每个 transport 只处理一条 inbound 流 - private final Sinks.Many inbound = - Sinks.many().unicast().onBackpressureBuffer(); - - private final AtomicBoolean clientConnected = new AtomicBoolean(false); - private final AtomicBoolean serverConnected = new AtomicBoolean(false); - - /** - * 对端 - */ - private volatile InProcessMcpTransport peer; - - private volatile McpStatelessServerHandler serverHandler; - - public record Pair(InProcessMcpTransport clientSide, InProcessMcpTransport serverSide) { - } - - public static Pair pair() { - InProcessMcpTransport client = new InProcessMcpTransport(); - InProcessMcpTransport server = new InProcessMcpTransport(); - - client.peer = server; - server.peer = client; - - return new Pair(client, server); - } - - /* ====================================================== - * Internal receive: peer.sendMessage -> this.receive - * ====================================================== */ - private void receive(McpSchema.JSONRPCMessage message) { - if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to receive message: " + message); - } - } - - /* ====================================================== - * Client → Server sendMessage - * ====================================================== */ - @Override - public Mono sendMessage(McpSchema.JSONRPCMessage message) { - InProcessMcpTransport p = this.peer; - if (p == null) { - return Mono.error(new IllegalStateException("Transport is not linked")); - } - return Mono.fromRunnable(() -> p.receive(message)); - } - - /* ====================================================== - * Client connect(handler) 处理 server → client 消息 - * ====================================================== */ - @Override - public Mono connect(Function, Mono> handler) { - if (!clientConnected.compareAndSet(false, true)) { - return Mono.error(new IllegalStateException("Client already connected")); - } - - return inbound.asFlux() - .concatMap(msg -> - handler.apply(Mono.just(msg)) - // handler may emit response message → send back to server - .flatMap(resp -> resp != null ? sendMessage(resp) : Mono.empty()) - ) - .doFinally(sig -> clientConnected.set(false)) - .then(); - } - - @Override - public void setExceptionHandler(Consumer handler) { - McpClientTransport.super.setExceptionHandler(handler); - } - - /* ====================================================== - * Server: bind stateless handler = process client → server inbound - * ====================================================== */ - @Override - public void setMcpHandler(McpStatelessServerHandler handler) { - this.serverHandler = handler; - - if (!serverConnected.compareAndSet(false, true)) { - throw new IllegalStateException("Server already connected"); - } - - // 订阅 client → server 消息 - inbound.asFlux() - .concatMap(this::handleServerMessage) - .doFinally(sig -> serverConnected.set(false)) - .subscribe(); - } - - /** - * Server 端处理 JSONRPCMessage - */ - private Mono handleServerMessage(McpSchema.JSONRPCMessage msg) { - // 创建 transport context(简单实现即可) - McpTransportContext ctx = key -> null; - - if (msg instanceof McpSchema.JSONRPCRequest req) { - return serverHandler.handleRequest(ctx, req) - .flatMap(this::sendMessage); - } - - if (msg instanceof McpSchema.JSONRPCNotification noti) { - return serverHandler.handleNotification(ctx, noti); - } - - return Mono.empty(); - } - - /* ====================================================== - * other boilerplate - * ====================================================== */ - - @Override - public void close() { - McpClientTransport.super.close(); - } - - @Override - public Mono closeGracefully() { - inbound.tryEmitComplete(); - clientConnected.set(false); - serverConnected.set(false); - return Mono.empty(); - } - - @Override - public T unmarshalFrom(Object data, TypeRef typeRef) { - return McpJsonMapper.getDefault().convertValue(data, typeRef); - } - - @Override - public List protocolVersions() { - return McpClientTransport.super.protocolVersions(); - } - } - } diff --git a/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java b/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java new file mode 100644 index 00000000..8ca56f69 --- /dev/null +++ b/Partner-Main/src/test/java/experimental/InProcessMcpTransport.java @@ -0,0 +1,190 @@ +package experimental; + +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpStatelessServerFeatures; +import io.modelcontextprotocol.server.McpStatelessServerHandler; +import io.modelcontextprotocol.server.McpStatelessSyncServer; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpStatelessServerTransport; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; + +public final class InProcessMcpTransport implements McpClientTransport, McpStatelessServerTransport { + + // 每个 transport 只处理一条 inbound 流 + private final Sinks.Many inbound = + Sinks.many().unicast().onBackpressureBuffer(); + + private final AtomicBoolean clientConnected = new AtomicBoolean(false); + private final AtomicBoolean serverConnected = new AtomicBoolean(false); + + /** + * 对端 + */ + private volatile InProcessMcpTransport peer; + + private volatile McpStatelessServerHandler serverHandler; + + public record Pair(InProcessMcpTransport clientSide, InProcessMcpTransport serverSide) { + } + + public static Pair pair() { + InProcessMcpTransport client = new InProcessMcpTransport(); + InProcessMcpTransport server = new InProcessMcpTransport(); + + client.peer = server; + server.peer = client; + + return new Pair(client, server); + } + + /* ====================================================== + * Internal receive: peer.sendMessage -> this.receive + * ====================================================== */ + private void receive(McpSchema.JSONRPCMessage message) { + if (inbound.tryEmitNext(message).isFailure()) { + throw new RuntimeException("Failed to receive message: " + message); + } + } + + /* ====================================================== + * Client → Server sendMessage + * ====================================================== */ + @Override + public Mono sendMessage(McpSchema.JSONRPCMessage message) { + InProcessMcpTransport p = this.peer; + if (p == null) { + return Mono.error(new IllegalStateException("Transport is not linked")); + } + return Mono.fromRunnable(() -> p.receive(message)); + } + + /* ====================================================== + * Client connect(handler) 处理 server → client 消息 + * ====================================================== */ + @Override + public Mono connect(Function, Mono> handler) { + if (!clientConnected.compareAndSet(false, true)) { + return Mono.error(new IllegalStateException("Client already connected")); + } + + return inbound.asFlux() + .concatMap(msg -> + handler.apply(Mono.just(msg)) + // handler may emit response message → send back to server + .flatMap(resp -> resp != null ? sendMessage(resp) : Mono.empty()) + ) + .doFinally(sig -> clientConnected.set(false)) + .then(); + } + + @Override + public void setExceptionHandler(Consumer handler) { + McpClientTransport.super.setExceptionHandler(handler); + } + + /* ====================================================== + * Server: bind stateless handler = process client → server inbound + * ====================================================== */ + @Override + public void setMcpHandler(McpStatelessServerHandler handler) { + this.serverHandler = handler; + + if (!serverConnected.compareAndSet(false, true)) { + throw new IllegalStateException("Server already connected"); + } + + // 订阅 client → server 消息 + inbound.asFlux() + .concatMap(this::handleServerMessage) + .doFinally(sig -> serverConnected.set(false)) + .subscribe(); + } + + /** + * Server 端处理 JSONRPCMessage + */ + private Mono handleServerMessage(McpSchema.JSONRPCMessage msg) { + // 创建 transport context(简单实现即可) + McpTransportContext ctx = key -> null; + + if (msg instanceof McpSchema.JSONRPCRequest req) { + return serverHandler.handleRequest(ctx, req) + .flatMap(this::sendMessage); + } + + if (msg instanceof McpSchema.JSONRPCNotification noti) { + return serverHandler.handleNotification(ctx, noti); + } + + return Mono.empty(); + } + + /* ====================================================== + * other boilerplate + * ====================================================== */ + + @Override + public void close() { + McpClientTransport.super.close(); + } + + @Override + public Mono closeGracefully() { + inbound.tryEmitComplete(); + clientConnected.set(false); + serverConnected.set(false); + return Mono.empty(); + } + + @Override + public T unmarshalFrom(Object data, TypeRef typeRef) { + return McpJsonMapper.getDefault().convertValue(data, typeRef); + } + + @Override + public List protocolVersions() { + return McpClientTransport.super.protocolVersions(); + } +} + +class InProcessMcpTransportTest { + @Test + void inProcessMcpTransportTest() { + InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); + InProcessMcpTransport clientSide = pair.clientSide(); + InProcessMcpTransport serverSide = pair.serverSide(); + McpStatelessSyncServer server = McpServer.sync(serverSide) + .capabilities(McpSchema.ServerCapabilities.builder().tools(true).build()) + .build(); + server.addTool(McpStatelessServerFeatures.SyncToolSpecification.builder() + .tool(McpSchema.Tool.builder().name("111").build()).callHandler((mcpTransportContext, callToolRequest) -> { + System.out.println(111); + return McpSchema.CallToolResult.builder().addContent(new McpSchema.TextContent("111")).build(); + }).build()); + McpSyncClient client = McpClient.sync(clientSide) + .build(); + + List tools = client.listTools().tools(); + McpSchema.Tool tool = tools.getFirst(); + System.out.println(tool.toString()); + + McpSchema.CallToolResult callToolResult = client.callTool(McpSchema.CallToolRequest.builder().name(tool.name()).build()); + System.out.println(callToolResult.content().toString()); + + client.close(); + server.close(); + } +} diff --git a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java index 8605d547..040069a4 100644 --- a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java +++ b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/RunnerClientTest.java @@ -1,95 +1,16 @@ package work.slhaf.partner.core.action.runner; import com.alibaba.fastjson2.JSONObject; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpStatelessServerFeatures; -import io.modelcontextprotocol.server.McpStatelessSyncServer; -import io.modelcontextprotocol.spec.McpSchema; -import org.junit.jupiter.api.Test; import work.slhaf.partner.core.action.entity.McpData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; import java.io.IOException; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; public class RunnerClientTest { - @Test - void httpMcpClientTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.HttpMcpServerParams params = new RunnerClient.HttpMcpServerParams(20, "https://dashscope.aliyuncs.com", "/api/v1/mcps/WebSearch/sse", Map.of("Authorization", "Bearer sk-xxx")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println(tools); - McpSchema.CallToolResult query = client.callTool(McpSchema.CallToolRequest.builder().name(tools.getFirst().name()).arguments(Map.of("query", "123")).build()); - for (McpSchema.Content content : query.content()) { - System.out.println("\r\n---\r\n"); - System.out.println(content); - } - } - - @Test - void stdioMcpClientTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.StdioMcpServerParams params = new RunnerClient.StdioMcpServerParams(20, "uvx", Map.of("http_proxy", "http://127.0.0.1:7897", "https_proxy", "http://127.0.0.1:7897"), List.of("mcp-server-fetch")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println(tools); - McpSchema.CallToolResult query = client.callTool(McpSchema.CallToolRequest.builder().name(tools.getFirst().name()).arguments(Map.of("url", "https://gitea.slhaf.work")).build()); - System.out.println(query.toString()); - } - - @Test - void schemaTest() { - TestRunnerClient testClient = new TestRunnerClient(); - RunnerClient.StdioMcpServerParams params = new RunnerClient.StdioMcpServerParams(20, "uvx", Map.of("http_proxy", "http://127.0.0.1:7897", "https_proxy", "http://127.0.0.1:7897"), List.of("mcp-server-fetch")); - testClient.registerMcpClient("test", params); - McpSyncClient client = testClient.mcpClients.values().stream().toList().getFirst(); - List tools = client.listTools().tools(); - System.out.println("\r\n ------ \r\n"); - McpSchema.Tool first = tools.getFirst(); - Map paramsSchema = first.inputSchema().properties(); - System.out.println(paramsSchema.toString()); - System.out.println("\r\n ------ \r\n"); - Map outputSchema = first.outputSchema(); - System.out.println(outputSchema); - } - - @Test - void inProcessMcpTransportTest() { - RunnerClient.InProcessMcpTransport.Pair pair = RunnerClient.InProcessMcpTransport.pair(); - RunnerClient.InProcessMcpTransport clientSide = pair.clientSide(); - RunnerClient.InProcessMcpTransport serverSide = pair.serverSide(); - McpStatelessSyncServer server = McpServer.sync(serverSide) - .capabilities(McpSchema.ServerCapabilities.builder().tools(true).build()) - .build(); - server.addTool(McpStatelessServerFeatures.SyncToolSpecification.builder() - .tool(McpSchema.Tool.builder().name("111").build()).callHandler((mcpTransportContext, callToolRequest) -> { - System.out.println(111); - return McpSchema.CallToolResult.builder().addContent(new McpSchema.TextContent("111")).build(); - }).build()); - McpSyncClient client = McpClient.sync(clientSide) - .build(); - - List tools = client.listTools().tools(); - McpSchema.Tool tool = tools.getFirst(); - System.out.println(tool.toString()); - - McpSchema.CallToolResult callToolResult = client.callTool(McpSchema.CallToolRequest.builder().name(tool.name()).build()); - System.out.println(callToolResult.content().toString()); - - client.close(); - server.close(); - } - private static class TestRunnerClient extends RunnerClient { public TestRunnerClient() {