mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(runner): apply execution policy wrapping in MCP transport and reload on policy changes
This commit is contained in:
@@ -153,7 +153,7 @@ public class LocalRunnerClient extends RunnerClient implements AutoCloseable {
|
||||
}
|
||||
|
||||
private void registerMcpClient(McpClientRegistry clientRegistry, McpTransportFactory transportFactory, String id, McpTransportConfig transportConfig) {
|
||||
val client = io.modelcontextprotocol.client.McpClient.sync(transportFactory.create(transportConfig, null))
|
||||
val client = io.modelcontextprotocol.client.McpClient.sync(transportFactory.create(transportConfig))
|
||||
.requestTimeout(java.time.Duration.ofSeconds(transportConfig.timeout()))
|
||||
.clientInfo(new io.modelcontextprotocol.spec.McpSchema.Implementation(id, "PARTNER"))
|
||||
.build();
|
||||
|
||||
@@ -5,8 +5,11 @@ import io.modelcontextprotocol.client.McpClient;
|
||||
import io.modelcontextprotocol.client.McpSyncClient;
|
||||
import io.modelcontextprotocol.spec.McpSchema;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||
import work.slhaf.partner.core.action.runner.LocalRunnerClient;
|
||||
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy;
|
||||
import work.slhaf.partner.core.action.runner.policy.RunnerExecutionPolicyListener;
|
||||
import work.slhaf.partner.core.action.runner.support.DirectoryWatchSupport;
|
||||
|
||||
import java.io.File;
|
||||
@@ -22,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
@Slf4j
|
||||
public class McpConfigWatcher implements AutoCloseable {
|
||||
public class McpConfigWatcher implements AutoCloseable, RunnerExecutionPolicyListener {
|
||||
|
||||
private final Path root;
|
||||
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
|
||||
@@ -43,7 +46,7 @@ public class McpConfigWatcher implements AutoCloseable {
|
||||
this.mcpClientRegistry = mcpClientRegistry;
|
||||
this.mcpTransportFactory = mcpTransportFactory;
|
||||
this.mcpMetaRegistry = mcpMetaRegistry;
|
||||
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, () -> loadInitial())
|
||||
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, this::loadInitial)
|
||||
.onCreate(this::handleCreate)
|
||||
.onModify((thisDir, context) -> checkAndReload(true))
|
||||
.onDelete(this::handleDelete)
|
||||
@@ -108,7 +111,7 @@ public class McpConfigWatcher implements AutoCloseable {
|
||||
}
|
||||
|
||||
private void registerMcpClient(String id, McpTransportConfig transportConfig) {
|
||||
McpSyncClient client = McpClient.sync(mcpTransportFactory.create(transportConfig, null))
|
||||
McpSyncClient client = McpClient.sync(mcpTransportFactory.create(transportConfig))
|
||||
.requestTimeout(Duration.ofSeconds(transportConfig.timeout()))
|
||||
.clientInfo(new McpSchema.Implementation(id, "PARTNER"))
|
||||
.build();
|
||||
@@ -289,6 +292,11 @@ public class McpConfigWatcher implements AutoCloseable {
|
||||
watchSupport.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPolicyChanged(@NotNull ExecutionPolicy policy) {
|
||||
checkAndReload(false);
|
||||
}
|
||||
|
||||
private record McpConfigFileRecord(long lastModified, long length, Map<String, McpTransportConfig> paramsCacheMap) {
|
||||
private McpConfigFileRecord(long lastModified, long length) {
|
||||
this(lastModified, length, new HashMap<>());
|
||||
|
||||
@@ -4,32 +4,35 @@ import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
|
||||
import io.modelcontextprotocol.client.transport.ServerParameters;
|
||||
import io.modelcontextprotocol.client.transport.StdioClientTransport;
|
||||
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
|
||||
import io.modelcontextprotocol.common.McpTransportContext;
|
||||
import io.modelcontextprotocol.json.McpJsonMapper;
|
||||
import io.modelcontextprotocol.spec.McpClientTransport;
|
||||
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
|
||||
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class McpTransportFactory {
|
||||
|
||||
public McpClientTransport create(McpTransportConfig config, ExecutionPolicyRegistry policy) {
|
||||
public McpClientTransport create(McpTransportConfig config) {
|
||||
return switch (config) {
|
||||
case McpTransportConfig.Stdio stdio -> {
|
||||
ServerParameters serverParameters = ServerParameters.builder(stdio.command())
|
||||
.env(stdio.env())
|
||||
.args(stdio.args())
|
||||
List<String> commands = new ArrayList<>();
|
||||
commands.add(stdio.command());
|
||||
commands.addAll(stdio.args());
|
||||
WrappedLaunchSpec wrapped = ExecutionPolicyRegistry.INSTANCE.prepare(commands);
|
||||
Map<String, String> env = new HashMap<>(stdio.env());
|
||||
env.putAll(wrapped.getEnvironment());
|
||||
ServerParameters serverParameters = ServerParameters.builder(wrapped.getCommand())
|
||||
.args(wrapped.getArgs())
|
||||
.env(env)
|
||||
.build();
|
||||
yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault());
|
||||
}
|
||||
case McpTransportConfig.Http http -> {
|
||||
McpSyncHttpClientRequestCustomizer customizer = new McpSyncHttpClientRequestCustomizer() {
|
||||
@Override
|
||||
public void customize(HttpRequest.Builder builder, String method, URI endpoint, String body, McpTransportContext context) {
|
||||
http.headers().forEach(builder::setHeader);
|
||||
}
|
||||
};
|
||||
McpSyncHttpClientRequestCustomizer customizer = (builder, method, endpoint, body, context) -> http.headers().forEach(builder::setHeader);
|
||||
yield HttpClientSseClientTransport.builder(http.baseUri())
|
||||
.httpRequestCustomizer(customizer)
|
||||
.sseEndpoint(http.endpoint())
|
||||
|
||||
Reference in New Issue
Block a user