diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/ActionSerializer.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/ActionSerializer.java new file mode 100644 index 00000000..03a14129 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/ActionSerializer.java @@ -0,0 +1,111 @@ +package work.slhaf.partner.core.action.runner; + +import com.alibaba.fastjson2.JSONObject; +import lombok.extern.slf4j.Slf4j; +import lombok.val; +import org.jetbrains.annotations.NotNull; +import work.slhaf.partner.core.action.entity.ActionFileMetaData; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.exception.ActionSerializeFailedException; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +@Slf4j +class ActionSerializer { + + private final String tmpActionPath; + private final String dynamicActionPath; + + ActionSerializer(String tmpActionPath, String dynamicActionPath) { + this.tmpActionPath = tmpActionPath; + this.dynamicActionPath = dynamicActionPath; + } + + static String normalizeCodeType(String codeType) { + if (codeType == null || codeType.isBlank()) { + throw new IllegalArgumentException("codeType 不能为空"); + } + return codeType.startsWith(".") ? codeType : "." + codeType; + } + + private static @NotNull Path createActionDir(String baseName, Path baseDir) { + for (int i = 0; ; i++) { + String dirName = i == 0 ? baseName : baseName + "(" + i + ")"; + Path candidate = baseDir.resolve(dirName); + try { + Files.createDirectory(candidate); + return candidate; + } catch (FileAlreadyExistsException ignored) { + } catch (IOException e) { + throw new ActionSerializeFailedException("无法创建行动目录: " + candidate.toAbsolutePath(), e); + } + } + } + + String buildTmpPath(String actionKey, String codeType) { + return Path.of(tmpActionPath, System.currentTimeMillis() + "-" + actionKey + normalizeCodeType(codeType)).toString(); + } + + void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException { + log.debug("行动程序临时序列化: {}", tempAction); + Path path = Path.of(tempAction.getLocation()); + validateTmpLocation(path, codeType); + File file = path.toFile(); + file.createNewFile(); + Files.writeString(path, code); + log.debug("临时序列化完毕"); + } + + private void validateTmpLocation(Path path, String codeType) throws IOException { + String normalizedCodeType = normalizeCodeType(codeType); + String fileName = path.getFileName().toString(); + if (!fileName.endsWith(normalizedCodeType)) { + throw new IOException("临时文件路径与 codeType 不匹配: " + path); + } + } + + void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData) { + log.debug("行动程序持久序列化: {}", metaActionInfo); + val baseDir = Path.of(dynamicActionPath); + + if (!Files.isDirectory(baseDir)) { + throw new ActionSerializeFailedException("目录不存在或不可用: " + baseDir.toAbsolutePath()); + } + + val actionDir = createActionDir(fileMetaData.getName(), baseDir); + val runTmp = actionDir.resolve("run." + fileMetaData.getExt() + ".tmp"); + val descTmp = actionDir.resolve("desc.json.tmp"); + val runFinal = actionDir.resolve("run." + fileMetaData.getExt()); + val descFinal = actionDir.resolve("desc.json"); + + try { + Files.writeString(runTmp, fileMetaData.getContent()); + Files.writeString(descTmp, JSONObject.toJSONString(metaActionInfo)); + Files.move(runTmp, runFinal, StandardCopyOption.ATOMIC_MOVE); + Files.move(descTmp, descFinal, StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e) { + safeDelete(runTmp); + safeDelete(descTmp); + safeDelete(runFinal); + safeDelete(descFinal); + safeDelete(actionDir); + throw new ActionSerializeFailedException("行动文件写入失败", e); + } + log.debug("持久序列化结束"); + } + + private void safeDelete(Path path) { + try { + if (Files.exists(path)) { + Files.delete(path); + } + } catch (IOException ignored) { + } + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/CommandExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/CommandExecutionService.java new file mode 100644 index 00000000..f920428f --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/CommandExecutionService.java @@ -0,0 +1,20 @@ +package work.slhaf.partner.core.action.runner; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +interface CommandExecutionService { + + String[] buildCommands(String ext, Map params, String absolutePath); + + Result exec(String... command); + + @Data + class Result { + private boolean ok; + private String total; + private List resultList; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DirectoryWatchSupport.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DirectoryWatchSupport.java new file mode 100644 index 00000000..3df04aa9 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DirectoryWatchSupport.java @@ -0,0 +1,164 @@ +package work.slhaf.partner.core.action.runner; + +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.*; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.stream.Stream; + +import static java.nio.file.StandardWatchEventKinds.*; + +@Slf4j +class DirectoryWatchSupport implements Closeable { + + private final Context ctx; + private final Map, EventHandler> handlers = new HashMap<>(); + private final ExecutorService executor; + private final boolean watchAll; + private final InitLoader initLoader; + DirectoryWatchSupport(Context ctx, ExecutorService executor, boolean watchAll, InitLoader initLoader) { + this.ctx = ctx; + this.executor = executor; + this.watchAll = watchAll; + this.initLoader = initLoader; + } + + DirectoryWatchSupport onCreate(EventHandler handler) { + ctx.kinds().add(ENTRY_CREATE); + handlers.put(ENTRY_CREATE, handler); + return this; + } + + DirectoryWatchSupport onModify(EventHandler handler) { + ctx.kinds().add(ENTRY_MODIFY); + handlers.put(ENTRY_MODIFY, handler); + return this; + } + + DirectoryWatchSupport onDelete(EventHandler handler) { + ctx.kinds().add(ENTRY_DELETE); + handlers.put(ENTRY_DELETE, handler); + return this; + } + + DirectoryWatchSupport onOverflow(EventHandler handler) { + ctx.kinds().add(OVERFLOW); + handlers.put(OVERFLOW, handler); + return this; + } + + void start() { + registerPath(); + if (initLoader != null) { + initLoader.load(); + } + executor.execute(buildWatchTask()); + } + + Context context() { + return ctx; + } + + boolean isWatching(Path dir) { + return ctx.watchKeys().values().stream().anyMatch(dir::equals); + } + + void registerDirectory(Path dir) throws IOException { + if (!java.nio.file.Files.isDirectory(dir) || isWatching(dir)) { + return; + } + WatchEvent.Kind[] kindsArray = ctx.kinds().toArray(WatchEvent.Kind[]::new); + WatchKey key = dir.register(ctx.watchService(), kindsArray); + ctx.watchKeys().put(key, dir); + } + + private void registerPath() { + try { + registerDirectory(ctx.root()); + if (!watchAll) { + return; + } + try (Stream walk = Files.list(ctx.root()).filter(Files::isDirectory)) { + for (Path dir : walk.toList()) { + registerDirectory(dir); + } + } + } catch (IOException e) { + log.error("监听目录注册失败: ", e); + } + } + + private Runnable buildWatchTask() { + return () -> { + String rootStr = ctx.root().toString(); + log.info("行动程序目录监听器已启动,监听目录: {}", rootStr); + while (true) { + WatchKey key = null; + try { + key = ctx.watchService().take(); + List> events = key.pollEvents(); + for (WatchEvent event : events) { + WatchEvent.Kind kind = event.kind(); + Object context = event.context(); + log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context); + Path thisDir = (Path) key.watchable(); + EventHandler handler = handlers.get(kind); + if (handler == null) { + continue; + } + handler.handle(thisDir, context instanceof Path path ? thisDir.resolve(path) : null); + } + } catch (InterruptedException e) { + log.info("监听线程被中断,准备退出..."); + Thread.currentThread().interrupt(); + break; + } catch (ClosedWatchServiceException e) { + log.info("WatchService 已关闭,监听线程退出。"); + break; + } finally { + if (key != null) { + boolean valid = key.reset(); + if (!valid) { + log.info("WatchKey 已失效,停止监听该目录: {}", key.watchable()); + ctx.watchKeys().remove(key); + if (key.watchable().equals(ctx.root())) { + try { + Files.createDirectories(ctx.root()); + registerPath(); + if (initLoader != null) { + initLoader.load(); + } + } catch (IOException e) { + log.error("重建根目录并重新注册监听失败: {}", ctx.root(), e); + } + } + } + } + } + } + }; + } + + @Override + public void close() throws IOException { + ctx.watchService().close(); + ctx.watchKeys().clear(); + } + + interface EventHandler { + void handle(Path thisDir, Path context); + } + + interface InitLoader { + void load(); + } + + record Context(Path root, WatchService watchService, Map watchKeys, Set> kinds) { + Context(Path root) throws IOException { + this(root, FileSystems.getDefault().newWatchService(), new HashMap<>(), new LinkedHashSet<>()); + } + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DynamicActionMcpManager.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DynamicActionMcpManager.java new file mode 100644 index 00000000..5bd1aad9 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/DynamicActionMcpManager.java @@ -0,0 +1,349 @@ +package work.slhaf.partner.core.action.runner; + +import cn.hutool.core.io.FileUtil; +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.JSONObject; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpStatelessAsyncServer; +import io.modelcontextprotocol.server.McpStatelessServerFeatures; +import io.modelcontextprotocol.spec.McpSchema; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import work.slhaf.partner.common.mcp.InProcessMcpTransport; +import work.slhaf.partner.core.action.entity.MetaActionInfo; +import work.slhaf.partner.core.action.exception.ActionInitFailedException; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +class DynamicActionMcpManager implements AutoCloseable { + + private final Path root; + private final ConcurrentHashMap existedMetaActions; + private final CommandExecutionService commandExecutionService; + private final McpStatelessAsyncServer server; + private final InProcessMcpTransport clientTransport; + private final DirectoryWatchSupport watchSupport; + + DynamicActionMcpManager(Path root, + ConcurrentHashMap existedMetaActions, + ExecutorService executor, + CommandExecutionService commandExecutionService) throws IOException { + this.root = root; + this.existedMetaActions = existedMetaActions; + this.commandExecutionService = commandExecutionService; + InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); + this.clientTransport = pair.clientSide(); + McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() + .tools(true) + .build(); + this.server = McpServer.async(pair.serverSide()) + .capabilities(serverCapabilities) + .jsonMapper(McpJsonMapper.getDefault()) + .build(); + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, this::loadExisting) + .onCreate(this::handleCreate) + .onModify(this::handleModify) + .onDelete(this::handleDelete) + .onOverflow((thisDir, context) -> reconcile()); + } + + McpTransportConfig.InProcess clientConfig(int timeout) { + return new McpTransportConfig.InProcess(timeout, clientTransport); + } + + void start() { + watchSupport.start(); + log.info("DynamicActionMcp 文件监听注册完毕"); + } + + private void loadExisting() { + File file = root.toFile(); + if (file.isFile()) { + throw new ActionInitFailedException("未找到目录: " + root); + } + File[] files = file.listFiles(); + if (files == null) { + throw new ActionInitFailedException("未正常读取目录: " + root); + } + for (File dir : files) { + if (!normalPath(dir.toPath())) { + continue; + } + addAction(dir.getName(), dir.toPath()); + } + } + + private boolean isTmp(Path context) { + return context.getFileName().endsWith(".tmp"); + } + + private void handleModify(Path thisDir, Path context) { + if (context == null || isTmp(context) || !normalPath(thisDir)) { + return; + } + modify(thisDir, context); + } + + private void handleCreate(Path thisDir, Path context) { + if (context == null || isTmp(context)) { + return; + } + if (thisDir.equals(root) && Files.isDirectory(context)) { + try { + watchSupport.registerDirectory(context); + } catch (IOException e) { + log.error("监听目录注册失败: {}", context); + } + } + if (normalPath(thisDir)) { + modify(thisDir, context); + } + if (Files.isDirectory(context) && normalPath(context)) { + File[] files = context.toFile().listFiles(); + if (files == null) { + log.warn("目录无法访问: {}", context); + return; + } + for (File file : files) { + modify(context, file.toPath()); + } + } + } + + private void handleDelete(Path thisDir, Path context) { + if (context == null || isTmp(context)) { + return; + } + if (thisDir.equals(root)) { + String name = context.getFileName().toString(); + Path candidate = root.resolve(name); + if (Files.isDirectory(candidate)) { + return; + } + removeAction(name); + AtomicReference toRemove = new AtomicReference<>(); + watchSupport.context().watchKeys().forEach((key, path) -> { + if (path.getFileName().toString().equals(name)) { + key.cancel(); + toRemove.set(key); + } + }); + if (toRemove.get() != null) { + watchSupport.context().watchKeys().remove(toRemove.get()); + } + return; + } + if (!thisDir.equals(root) && !normalPath(thisDir)) { + removeAction(thisDir.getFileName().toString()); + } + } + + private void reconcile() { + Set existed = existedMetaActions.keySet().stream() + .filter(actionKey -> actionKey.startsWith("local::")) + .map(actionKey -> actionKey.split("::")[1]) + .collect(Collectors.toSet()); + Set currentDirs = new HashSet<>(); + try (Stream stream = Files.list(root).filter(Files::isDirectory)) { + stream.forEach(path -> { + String name = path.getFileName().toString(); + currentDirs.add(name); + boolean contains = existed.contains(name); + boolean normal = normalPath(path); + if (contains && !normal) { + removeAction(name); + } + if (!contains) { + boolean alreadyWatching = watchSupport.isWatching(path); + if (!alreadyWatching) { + try { + watchSupport.registerDirectory(path); + } catch (IOException e) { + log.error("监听目录注册失败: {}", path); + } + } + if (normal) { + addAction(name, path); + } + } + }); + } catch (IOException e) { + log.error("目录无法读取: {}", root); + return; + } + for (String existedName : existed) { + if (!currentDirs.contains(existedName)) { + removeAction(existedName); + } + } + } + + private void modify(Path thisDir, Path context) { + String fileName = context.getFileName().toString(); + if (fileName.equals("desc.json")) { + handleMetaModify(thisDir); + } + if (fileName.startsWith("run.")) { + handleProgramModify(thisDir); + } + } + + private void handleProgramModify(Path thisDir) { + String name = thisDir.getFileName().toString(); + String actionKey = "local::" + name; + if (existedMetaActions.containsKey(actionKey)) { + return; + } + if (!addAction(name, thisDir)) { + removeAction(name); + } + } + + private void handleMetaModify(Path thisDir) { + String name = thisDir.getFileName().toString(); + if (!addAction(name, thisDir)) { + removeAction(name); + } + } + + private boolean addAction(String name, Path dir) { + File program = null; + try (Stream stream = Files.list(dir)) { + for (Path path : stream.toList()) { + if (isTmp(path)) { + continue; + } + if (path.getFileName().toString().startsWith("run.")) { + program = path.toFile(); + } + } + } catch (Exception e) { + log.error("添加 action 失败", e); + return false; + } + MetaActionInfo info; + try { + info = JSONUtil.readJSONObject(dir.resolve("desc.json").toFile(), StandardCharsets.UTF_8).toBean(MetaActionInfo.class); + } catch (Exception e) { + log.error("desc.json 加载失败: {}", dir); + return false; + } + String actionKey = "local::" + name; + existedMetaActions.put(actionKey, info); + server.addTool(buildAsyncToolSpecification(info, program, actionKey, name)).subscribe(); + return true; + } + + private void removeAction(String name) { + existedMetaActions.remove("local::" + name); + server.removeTool(name).subscribe(); + } + + private boolean normalPath(Path path) { + File[] files = loadFiles(path); + if (files == null || files.length < 2) { + return false; + } + boolean desc = false; + int run = 0; + for (File file : files) { + String fileName = file.getName(); + if (fileName.endsWith(".tmp")) { + continue; + } + if (fileName.equals("desc.json")) { + desc = true; + } + if (fileName.startsWith("run.")) { + run++; + } + } + return run == 1 && desc; + } + + private File[] loadFiles(Path path) { + if (!Files.isDirectory(path)) { + return null; + } + return path.toFile().listFiles(); + } + + private McpStatelessServerFeatures.AsyncToolSpecification buildAsyncToolSpecification(MetaActionInfo info, File program, String actionKey, String name) { + Map additional = Map.of( + "pre", info.getPreActions(), + "post", info.getPostActions(), + "strict_pre", info.isStrictDependencies(), + "io", info.isIo() + ); + McpSchema.Tool tool = McpSchema.Tool.builder() + .name(name) + .description(info.getDescription()) + .inputSchema(McpJsonMapper.getDefault(), JSONObject.toJSONString(info.getParams())) + .outputSchema(info.getResponseSchema()) + .title(actionKey) + .meta(additional) + .build(); + return McpStatelessServerFeatures.AsyncToolSpecification.builder() + .tool(tool) + .callHandler(buildToolHandler(program)) + .build(); + } + + private BiFunction> buildToolHandler(File program) { + return (mcpTransportContext, callToolRequest) -> { + Map arguments = callToolRequest.arguments(); + if (arguments == null) { + arguments = Map.of(); + } + String ext = FileUtil.getSuffix(program); + String[] commands = commandExecutionService.buildCommands(ext, arguments, program.getAbsolutePath()); + if (commands == null) { + return Mono.just(McpSchema.CallToolResult.builder() + .addTextContent("未知文件类型: " + program.getName()) + .isError(true) + .build()); + } + return Mono.fromCallable(() -> { + CommandExecutionService.Result execResult = commandExecutionService.exec(commands); + McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder() + .isError(!execResult.isOk()); + List resultList = execResult.getResultList(); + if (resultList != null && !resultList.isEmpty()) { + builder.textContent(resultList); + builder.structuredContent(resultList); + } else { + builder.addTextContent(execResult.getTotal()); + builder.structuredContent(execResult.getTotal()); + } + return builder.build(); + }).subscribeOn(Schedulers.boundedElastic()); + }; + } + + @Override + public void close() { + try { + watchSupport.close(); + } catch (IOException ignored) { + } + server.close(); + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalProcessCommandExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalProcessCommandExecutionService.java new file mode 100644 index 00000000..fa0938b5 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalProcessCommandExecutionService.java @@ -0,0 +1,81 @@ +package work.slhaf.partner.core.action.runner; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +class LocalProcessCommandExecutionService implements CommandExecutionService { + + @Override + public String[] buildCommands(String ext, Map params, String absolutePath) { + String command = switch (ext) { + case "py" -> "python"; + case "sh" -> "bash"; + default -> null; + }; + if (command == null) { + return null; + } + int paramSize = params == null ? 0 : params.size(); + String[] commands = new String[paramSize + 2]; + commands[0] = command; + commands[1] = absolutePath; + AtomicInteger paramCount = new AtomicInteger(2); + if (params != null) { + params.forEach((param, value) -> commands[paramCount.getAndIncrement()] = "--" + param + "=" + value); + } + return commands; + } + + @Override + public Result exec(String... command) { + Result result = new Result(); + List output = new ArrayList<>(); + List error = new ArrayList<>(); + + try { + Process process = new ProcessBuilder(command) + .redirectErrorStream(false) + .start(); + + Thread stdoutThread = new Thread(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.add(line); + } + } catch (Exception ignored) { + } + }); + + Thread stderrThread = new Thread(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + String line; + while ((line = reader.readLine()) != null) { + error.add(line); + } + } catch (Exception ignored) { + } + }); + + stdoutThread.start(); + stderrThread.start(); + + int exitCode = process.waitFor(); + stdoutThread.join(); + stderrThread.join(); + + result.setOk(exitCode == 0); + result.setResultList(output.isEmpty() ? error : output); + result.setTotal(String.join("\n", output.isEmpty() ? error : output)); + } catch (Exception e) { + result.setOk(false); + result.setTotal(e.getMessage()); + } + + return result; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index 97670ee1..c7375c75 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -1,230 +1,130 @@ package work.slhaf.partner.core.action.runner; -import cn.hutool.core.io.FileUtil; -import cn.hutool.json.JSONUtil; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; -import io.modelcontextprotocol.client.McpClient; -import io.modelcontextprotocol.client.McpSyncClient; -import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; -import io.modelcontextprotocol.client.transport.ServerParameters; -import io.modelcontextprotocol.client.transport.StdioClientTransport; -import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; -import io.modelcontextprotocol.common.McpTransportContext; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpStatelessAsyncServer; -import io.modelcontextprotocol.server.McpStatelessServerFeatures; -import io.modelcontextprotocol.spec.McpClientTransport; -import io.modelcontextprotocol.spec.McpSchema; -import javassist.NotFoundException; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import lombok.val; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import work.slhaf.partner.common.mcp.InProcessMcpTransport; import work.slhaf.partner.core.action.entity.ActionFileMetaData; import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.exception.ActionInitFailedException; -import work.slhaf.partner.core.action.exception.ActionSerializeFailedException; -import java.io.BufferedReader; -import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URI; -import java.net.http.HttpRequest; -import java.nio.charset.StandardCharsets; -import java.nio.file.*; -import java.time.Duration; -import java.util.*; +import java.nio.file.Path; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicBoolean; import static work.slhaf.partner.common.util.PathUtil.buildPathStr; @Slf4j -public class LocalRunnerClient extends RunnerClient { +public class LocalRunnerClient extends RunnerClient implements AutoCloseable { - private final String TMP_ACTION_PATH; - private final String DYNAMIC_ACTION_PATH; - private final String MCP_SERVER_PATH; - private final String MCP_DESC_PATH; + static final String MCP_NAME_DESC = "mcp-desc"; + static final String MCP_NAME_DYNAMIC = "mcp-dynamic"; - private final String MCP_NAME_DESC = "mcp-desc"; - private final String MCP_NAME_DYNAMIC = "mcp-dynamic"; + private final String tmpActionPath; + private final String dynamicActionPath; + private final String mcpServerPath; + private final String mcpDescPath; - /** - * 存储包括 DescMcp、DynamicActionMcp、CommonMcp 在内的所有 MCP Server 对应的客户端 - *
- * 自身需要针对 CommonMcp 维护一个存储 McpServers.json 文件的目录 - *
- * 相关目录按照以下格式组织: - *

- * MCP_SERVER_PATH/mcp-server.json - *

- */ - private final Map mcpClients = new HashMap<>(); - /** - * 动态生成的行动程序都将挂载至该 McpServer - *
- * 相关目录按照以下格式进行组织: - *

- * DYNAMIC_ACTION_PATH/action 名称/ - *

- * 每个action子目录下,除了相关的程序文件外,将额外提供一个 .meta.json 文件来提供相关描述文件, - * 该描述文件将携带 McpTools、MetaActionInfo 相关的所有信息, - * 故 McpDescServer 将只负责 Common Mcp Servers 的额外描述文件 - */ - private McpStatelessAsyncServer dynamicActionMcpServer; - /** - * 负责监听常规 MCP Server 的描述文件(描述文件主要用于添加原本 MCP Tools 不携带的信息,如前置依赖、后置依赖、是否 IO 密集等 - *
- * 目录按照以下格式组织: - *

- * MCP_DESC_PATH/server::toolName.desc.json - *

- * 该 MCP Server-Client 的作用为: 与 CommonMcp Clients 配合,补齐第三方 MCP 服务的描述信息 - */ - private McpStatelessAsyncServer mcpDescServer; + private final McpClientRegistry mcpClientRegistry; + private final McpTransportFactory mcpTransportFactory; + private final CommandExecutionService commandExecutionService; + private final ActionSerializer actionSerializer; + private final OriginExecutionService originExecutionService; + private final McpActionExecutor mcpActionExecutor; + private final McpMetaRegistry mcpMetaRegistry; + private final McpDescWatcher mcpDescWatcher; + private final DynamicActionMcpManager dynamicActionMcpManager; + private final McpConfigWatcher mcpConfigWatcher; + private final AtomicBoolean closed = new AtomicBoolean(false); public LocalRunnerClient(ConcurrentHashMap existedMetaActions, ExecutorService executor, @Nullable String baseActionPath) { super(existedMetaActions, executor, baseActionPath); - this.TMP_ACTION_PATH = buildPathStr(ACTION_PATH, "tmp"); - this.DYNAMIC_ACTION_PATH = buildPathStr(ACTION_PATH, "dynamic"); - this.MCP_SERVER_PATH = buildPathStr(ACTION_PATH, "mcp"); - this.MCP_DESC_PATH = buildPathStr(MCP_SERVER_PATH, "desc"); + this.tmpActionPath = buildPathStr(ACTION_PATH, "tmp"); + this.dynamicActionPath = buildPathStr(ACTION_PATH, "dynamic"); + this.mcpServerPath = buildPathStr(ACTION_PATH, "mcp"); + this.mcpDescPath = buildPathStr(mcpServerPath, "desc"); - createPath(TMP_ACTION_PATH); - createPath(DYNAMIC_ACTION_PATH); - createPath(MCP_SERVER_PATH); - createPath(MCP_DESC_PATH); + createPath(tmpActionPath); + createPath(dynamicActionPath); + createPath(mcpServerPath); + createPath(mcpDescPath); + + McpClientRegistry clientRegistry = new McpClientRegistry(); + McpTransportFactory transportFactory = new McpTransportFactory(); + CommandExecutionService commandService = new LocalProcessCommandExecutionService(); + ActionSerializer serializer = new ActionSerializer(tmpActionPath, dynamicActionPath); + OriginExecutionService originService = new OriginExecutionService(commandService); + McpActionExecutor actionExecutor = new McpActionExecutor(clientRegistry); + + McpMetaRegistry metaRegistry = null; + McpDescWatcher descWatcher = null; + DynamicActionMcpManager dynamicManager = null; + McpConfigWatcher configWatcher = null; try { - registerDescMcp(); - registerDynamicActionMcp(); - registerCommonMcp(); - } catch (IOException e) { - throw new ActionInitFailedException("目录监听器启动失败", e); + metaRegistry = new McpMetaRegistry(existedMetaActions); + registerMcpClient(clientRegistry, transportFactory, MCP_NAME_DESC, metaRegistry.clientConfig(MCP_NAME_DESC, 10)); + log.info("DescMcp 注册完毕"); + + descWatcher = new McpDescWatcher(Path.of(mcpDescPath), metaRegistry, executor); + descWatcher.start(); + + dynamicManager = new DynamicActionMcpManager( + Path.of(dynamicActionPath), + existedMetaActions, + executor, + commandService + ); + registerMcpClient(clientRegistry, transportFactory, MCP_NAME_DYNAMIC, dynamicManager.clientConfig(10)); + log.info("DynamicActionMcp 注册完毕"); + dynamicManager.start(); + + configWatcher = new McpConfigWatcher( + Path.of(mcpServerPath), + existedMetaActions, + clientRegistry, + transportFactory, + metaRegistry, + executor + ); + configWatcher.start(); + } catch (Exception e) { + closeQuietly(configWatcher); + closeQuietly(dynamicManager); + closeQuietly(descWatcher); + closeQuietly(metaRegistry); + closeQuietly(clientRegistry); + throw new ActionInitFailedException("LocalRunnerClient 初始化失败", e); } + + this.mcpClientRegistry = clientRegistry; + this.mcpTransportFactory = transportFactory; + this.commandExecutionService = commandService; + this.actionSerializer = serializer; + this.originExecutionService = originService; + this.mcpActionExecutor = actionExecutor; + this.mcpMetaRegistry = metaRegistry; + this.mcpDescWatcher = descWatcher; + this.dynamicActionMcpManager = dynamicManager; + this.mcpConfigWatcher = configWatcher; + setupShutdownHook(); } - private static @NotNull Path createActionDir(String baseName, Path baseDir) { - Path actionDir = null; - - // 原子地“抢占”目录名 - for (int i = 0; ; i++) { - String dirName = (i == 0) ? baseName : baseName + "(" + i + ")"; - Path candidate = baseDir.resolve(dirName); - - try { - Files.createDirectory(candidate); // 原子操作 - actionDir = candidate; - break; - } catch (FileAlreadyExistsException ignored) { - // 继续尝试下一个名字 - } catch (IOException e) { - throw new ActionSerializeFailedException( - "无法创建行动目录: " + candidate.toAbsolutePath(), e - ); - } - } - return actionDir; - } - - private void registerCommonMcp() throws IOException { - val ctx = new WatchContext(Path.of(MCP_SERVER_PATH), FileSystems.getDefault().newWatchService()); - val common = new LocalWatchEventProcessor.Common(existedMetaActions, mcpClients, mcpClients.get(MCP_NAME_DESC), ctx); - new LocalWatchServiceBuild.BuildRegistry(ctx) - .initialLoad(common.buildLoad()) - .registerCreate(common.buildCreate()) - .registerDelete(common.buildDelete()) - .registerModify(common.buildModify()) - .registerOverflow(common.buildOverflow()) - .commit(executor); - log.info("CommonMcp 文件监听注册完毕"); - } - - private void registerDescMcp() throws IOException { - InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); - McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() - .resources(true, true) - .build(); - mcpDescServer = McpServer.async(pair.serverSide()) - .capabilities(serverCapabilities) - .jsonMapper(McpJsonMapper.getDefault()) - .build(); - registerDescMcpWatch(); - log.info("DescMcp 文件监听注册完毕"); - registerMcpClient(MCP_NAME_DESC, pair.clientSide(), 10); - log.info("DescMcp 注册完毕"); - - } - - private void registerDescMcpWatch() throws IOException { - WatchContext ctx = new WatchContext(Path.of(MCP_DESC_PATH), FileSystems.getDefault().newWatchService()); - LocalWatchEventProcessor.Desc desc = new LocalWatchEventProcessor.Desc(existedMetaActions, mcpDescServer, ctx); - new LocalWatchServiceBuild.BuildRegistry(ctx) - .initialLoad(desc.buildLoad()) - .registerCreate(desc.buildCreate()) - .registerDelete(desc.buildDelete()) - .registerModify(desc.buildModify()) - .registerOverflow(desc.buildOverflow()) - .watchAll(true) - .commit(executor); - } - - private void registerDynamicActionMcp() throws IOException { - InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); - McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() - .tools(true) - .build(); - dynamicActionMcpServer = McpServer.async(pair.serverSide()) - .capabilities(serverCapabilities) - .jsonMapper(McpJsonMapper.getDefault()) - .build(); - // Tools 的执行逻辑应当高度一致化,但仍需要独立为不同 Tool - // 初始的加载逻辑通过 initialLoad 加载 - // 后续的动态更新通过对应的 event 事件触发 - registerDynamicActionMcpWatch(); - log.info("DynamicActionMcp 文件监听注册完毕"); - registerMcpClient(MCP_NAME_DYNAMIC, pair.clientSide(), 10); - log.info("DynamicActionMcp 注册完毕"); - } - - private void registerDynamicActionMcpWatch() throws IOException { - // MODIFY、CREATE、DELETE、OVERFLOW 都需要不同的处理方式 - WatchContext ctx = new WatchContext(Path.of(DYNAMIC_ACTION_PATH), FileSystems.getDefault().newWatchService()); - LocalWatchEventProcessor.Dynamic dynamic = new LocalWatchEventProcessor.Dynamic(existedMetaActions, dynamicActionMcpServer, ctx); - new LocalWatchServiceBuild.BuildRegistry(ctx) - .initialLoad(dynamic.buildLoad()) - .registerCreate(dynamic.buildCreate()) - .registerModify(dynamic.buildModify()) - .registerDelete(dynamic.buildDelete()) - .registerOverflow(dynamic.buildOverflow()) - .watchAll(true) - .commit(executor); - } - @Override protected RunnerResponse doRun(MetaAction metaAction) { log.debug("执行行动: {}", metaAction); RunnerResponse response; try { response = switch (metaAction.getType()) { - case MetaAction.Type.MCP -> doRunWithMcp(metaAction); - case MetaAction.Type.ORIGIN -> doRunWithOrigin(metaAction); - case MetaAction.Type.BUILTIN -> doRunWithBuiltin(metaAction); + case MCP -> mcpActionExecutor.run(metaAction); + case ORIGIN -> originExecutionService.run(metaAction); + case BUILTIN -> doRunWithBuiltin(metaAction); }; } catch (Exception e) { response = new RunnerResponse(); @@ -235,130 +135,36 @@ public class LocalRunnerClient extends RunnerClient { return response; } - private RunnerResponse doRunWithOrigin(MetaAction metaAction) { - RunnerResponse response = new RunnerResponse(); - File file = new File(metaAction.getLocation()); - String ext = FileUtil.getSuffix(file); - if (ext == null || ext.isEmpty()) { - response.setOk(false); - response.setData("未知文件类型"); - return response; - } - String[] commands = SystemExecHelper.buildCommands(ext, metaAction.getParams(), file.getAbsolutePath()); - if (commands == null || commands.length == 0) { - response.setOk(false); - response.setData("不支持的文件类型: " + file.getName()); - } - SystemExecHelper.Result execResult = SystemExecHelper.exec(commands); - response.setOk(execResult.isOk()); - response.setData(execResult.getTotal()); - return response; - } - - private RunnerResponse doRunWithMcp(MetaAction metaAction) { - RunnerResponse response = new RunnerResponse(); - McpSyncClient mcpClient = mcpClients.get(metaAction.getLocation()); - McpSchema.CallToolRequest callToolRequest = McpSchema.CallToolRequest.builder() - .name(metaAction.getName()) - .arguments(metaAction.getParams()) - .build(); - McpSchema.CallToolResult callToolResult = mcpClient.callTool(callToolRequest); - val callToolResultError = callToolResult.isError(); - if (callToolResultError == null) { - response.setOk(false); - } else { - response.setOk(!callToolResultError); - } - response.setData(callToolResult.structuredContent().toString()); - return response; - } - @Override public String buildTmpPath(String actionKey, String codeType) { - return Path.of(TMP_ACTION_PATH, System.currentTimeMillis() + "-" + actionKey + codeType).toString(); + return actionSerializer.buildTmpPath(actionKey, codeType); } @Override public void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException { - log.debug("行动程序临时序列化: {}", tempAction); - Path path = Path.of(tempAction.getLocation()); - File file = path.toFile(); - file.createNewFile(); - Files.writeString(path, code); - log.debug("临时序列化完毕"); + actionSerializer.tmpSerialize(tempAction, code, codeType); } @Override public void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData) { - log.debug("行动程序持久序列化: {}", metaActionInfo); - val baseDir = Path.of(DYNAMIC_ACTION_PATH); - - if (!Files.isDirectory(baseDir)) { - throw new ActionSerializeFailedException( - "目录不存在或不可用: " + baseDir.toAbsolutePath() - ); - } - - val baseName = fileMetaData.getName(); - val ext = fileMetaData.getExt(); - - val actionDir = createActionDir(baseName, baseDir); - - // 使用临时文件写入内容 - val runTmp = actionDir.resolve("run." + ext + ".tmp"); - val descTmp = actionDir.resolve("desc.json.tmp"); - - val runFinal = actionDir.resolve("run." + ext); - val descFinal = actionDir.resolve("desc.json"); - - try { - Files.writeString(runTmp, fileMetaData.getContent()); - Files.writeString(descTmp, JSONObject.toJSONString(metaActionInfo)); - - // 原子提交 - Files.move(runTmp, runFinal, StandardCopyOption.ATOMIC_MOVE); - Files.move(descTmp, descFinal, StandardCopyOption.ATOMIC_MOVE); - } catch (IOException e) { - // 失败清理 - safeDelete(runTmp); - safeDelete(descTmp); - safeDelete(runFinal); - safeDelete(descFinal); - safeDelete(actionDir); - throw new ActionSerializeFailedException("行动文件写入失败", e); - } - log.debug("持久序列化结束"); + actionSerializer.persistSerialize(metaActionInfo, fileMetaData); } - private void safeDelete(Path path) { - try { - if (Files.exists(path)) { - Files.delete(path); - } - } catch (IOException ignored) { - } - } - - @Override public JSONObject listSysDependencies() { - // 先只列出系统/环境的 Python 依赖 - // TODO 在 AgentConfigLoader 内配置启用的脚本语言及对应的扩展名 - // 这里的逻辑后续需要替换为“根据 AgentConfigLoader 读取到的脚本语言启用情况,遍历并列出当前系统环境依赖” - // 还需要将返回值调整为相应的数据类 - // 后续还需要将不同语言的处理逻辑分散到不同方法内,这里为了验证,先写死在当前方法 JSONObject sysDependencies = new JSONObject(); sysDependencies.put("language", "Python"); JSONArray dependencies = sysDependencies.putArray("dependencies"); - SystemExecHelper.Result pyResult = SystemExecHelper.exec("pip", "list", "--format=freeze"); - System.out.println(pyResult); + CommandExecutionService.Result pyResult = commandExecutionService.exec("pip", "list", "--format=freeze"); if (pyResult.isOk()) { List resultList = pyResult.getResultList(); - for (String result : resultList) { - JSONObject element = dependencies.addObject(); - String[] split = result.split("=="); - element.put("name", split[0]); - element.put("version", split[1]); + if (resultList != null) { + for (String result : resultList) { + JSONObject element = dependencies.addObject(); + String[] split = result.split("=="); + element.put("name", split[0]); + element.put("version", split.length > 1 ? split[1] : ""); + } } } else { JSONObject element = dependencies.addObject(); @@ -367,1206 +173,37 @@ public class LocalRunnerClient extends RunnerClient { return sysDependencies; } - private void registerMcpClient(String id, McpClientTransport clientTransport, int timeout) { - McpSyncClient client = McpClient.sync(clientTransport) - .requestTimeout(Duration.ofSeconds(timeout)) - .clientInfo(new McpSchema.Implementation(id, "PARTNER")) - // 行动程序(现 MCP Tool)的描述文本将直接由resources返回 - // 原因: ToolChange 发送的内容侧重调用,缺少可承担描述文本的字段 - // ResourcesChange 事件传递的 Resource 可以由 Client 读取内容 - // 预计在 Server 侧,收到客户端发送的新的行动程序信息,该信息由客户端处补充后,将其放置在指定位置 - // 并写入描述文件、发起 ResourcesChange 事件 + private void registerMcpClient(McpClientRegistry clientRegistry, McpTransportFactory transportFactory, String id, McpTransportConfig transportConfig) { + val client = io.modelcontextprotocol.client.McpClient.sync(transportFactory.create(transportConfig, null)) + .requestTimeout(java.time.Duration.ofSeconds(transportConfig.timeout())) + .clientInfo(new io.modelcontextprotocol.spec.McpSchema.Implementation(id, "PARTNER")) .build(); - mcpClients.put(id, client); + clientRegistry.register(id, client); } private void setupShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - dynamicActionMcpServer.close(); - mcpDescServer.close(); - this.mcpClients.forEach((id, client) -> { - client.close(); - log.info("[{}] MCP-Client 已关闭", id); - }); - })); + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } - private interface LocalWatchServiceBuild { - LocalWatchServiceBuild registerCreate(EventHandler handler); - - LocalWatchServiceBuild registerModify(EventHandler handler); - - LocalWatchServiceBuild registerDelete(EventHandler handler); - - LocalWatchServiceBuild registerOverflow(EventHandler handler); - - LocalWatchServiceBuild initialLoad(InitLoader loader); - - LocalWatchServiceBuild watchAll(boolean watchAll); - - void commit(ExecutorService executor); - - interface EventHandler { - void handle(Path thisDir, Path context); - } - - interface InitLoader { - void load(); - } - - class BuildRegistry implements LocalWatchServiceBuild { - - private final Map, EventHandler> handlers = new HashMap<>(); - private final WatchContext ctx; - private InitLoader initLoader; - private boolean watchAll = false; - - private BuildRegistry(WatchContext ctx) { - this.ctx = ctx; - } - - @Override - public LocalWatchServiceBuild registerCreate(EventHandler handler) { - ctx.kinds.add(StandardWatchEventKinds.ENTRY_CREATE); - handlers.put(StandardWatchEventKinds.ENTRY_CREATE, handler); - return this; - } - - @Override - public LocalWatchServiceBuild registerModify(EventHandler handler) { - ctx.kinds.add(StandardWatchEventKinds.ENTRY_MODIFY); - handlers.put(StandardWatchEventKinds.ENTRY_MODIFY, handler); - return this; - } - - @Override - public LocalWatchServiceBuild registerDelete(EventHandler handler) { - ctx.kinds.add(StandardWatchEventKinds.ENTRY_DELETE); - handlers.put(StandardWatchEventKinds.ENTRY_DELETE, handler); - return this; - } - - @Override - public LocalWatchServiceBuild registerOverflow(EventHandler handler) { - ctx.kinds.add(StandardWatchEventKinds.OVERFLOW); - handlers.put(StandardWatchEventKinds.OVERFLOW, handler); - return this; - } - - @Override - public LocalWatchServiceBuild initialLoad(InitLoader loader) { - initLoader = loader; - return this; - } - - @Override - public LocalWatchServiceBuild watchAll(boolean watchAll) { - this.watchAll = watchAll; - return this; - } - - @Override - public void commit(ExecutorService executor) { - registerPath(); - if (initLoader != null) - initLoader.load(); - executor.execute(buildWatchTask()); - } - - private void registerPath() { - Path root = ctx.root; - WatchService watchService = ctx.watchService; - Map watchKeys = ctx.watchKeys; - try { - WatchEvent.Kind[] kindsArray = ctx.kinds.toArray(WatchEvent.Kind[]::new); - WatchKey rootKey = root.register(watchService, kindsArray); - watchKeys.put(rootKey, root); - if (!watchAll) { - return; - } - Stream walk = Files.list(root).filter(Files::isDirectory); - for (Path dir : walk.toList()) { - WatchKey key = dir.register(watchService, kindsArray); - watchKeys.put(key, dir); - log.debug("注册目录监听: {}", dir); - } - walk.close(); - } catch (IOException e) { - log.error("监听目录注册失败: ", e); - } - } - - private Runnable buildWatchTask() { - return () -> { - String rootStr = ctx.root.toString(); - log.info("行动程序目录监听器已启动,监听目录: {}", rootStr); - while (true) { - WatchKey key = null; - try { - key = ctx.watchService.take(); - List> events = key.pollEvents(); - for (WatchEvent e : events) { - WatchEvent.Kind kind = e.kind(); - Object context = e.context(); - log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context); - Path thisDir = (Path) key.watchable(); - EventHandler handler = handlers.get(kind); - if (handler == null) { - continue; - } - handler.handle(thisDir, context instanceof Path ? thisDir.resolve((Path) context) : null); - } - } catch (InterruptedException e) { - log.info("监听线程被中断,准备退出..."); - Thread.currentThread().interrupt(); // 恢复中断标志 - break; - } catch (ClosedWatchServiceException e) { - log.info("WatchService 已关闭,监听线程退出。"); - break; - } finally { - if (key != null) { - // reset 返回 false 表示该 key 已失效(目录被删、不可访问等) - boolean valid = key.reset(); - if (!valid) { - log.info("WatchKey 已失效,停止监听该目录: {}", key.watchable()); - ctx.watchKeys.remove(key); - if (key.watchable().equals(ctx.root)) { - try { - Files.createDirectories(ctx.root); - registerPath(); - if (initLoader != null) - initLoader.load(); - } catch (IOException e) { - log.error("重建根目录并重新注册监听失败: {}", ctx.root, e); - } - } - } - } - } - } - }; - } - + @Override + public void close() { + if (!closed.compareAndSet(false, true)) { + return; } + closeQuietly(mcpConfigWatcher); + closeQuietly(dynamicActionMcpManager); + closeQuietly(mcpDescWatcher); + closeQuietly(mcpMetaRegistry); + closeQuietly(mcpClientRegistry); } - private record WatchContext(Path root, WatchService watchService, Map watchKeys, - List> kinds) { - private WatchContext(Path root, WatchService watchService) { - this(root, watchService, new HashMap<>(), new ArrayList<>()); + private void closeQuietly(AutoCloseable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignored) { } } - - private sealed static abstract class LocalWatchEventProcessor permits LocalWatchEventProcessor.Dynamic, LocalWatchEventProcessor.Desc, LocalWatchEventProcessor.Common { - - protected final ConcurrentHashMap existedMetaActions; - protected final WatchContext ctx; - - private LocalWatchEventProcessor(ConcurrentHashMap existedMetaActions, WatchContext ctx) { - this.existedMetaActions = existedMetaActions; - this.ctx = ctx; - } - - protected abstract @NotNull LocalWatchServiceBuild.InitLoader buildLoad(); - - protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildModify(); - - protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildCreate(); - - protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildDelete(); - - protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildOverflow(); - - protected File[] loadFiles(Path root) { - // 在批量删除场景下,在接收到事件时目录等内容可能已被删除,此时不应该报错,而是返回一个‘异常值’ - if (!Files.isDirectory(root)) { - return null; - } - return root.toFile().listFiles(); - } - - @SuppressWarnings("LoggingSimilarMessage") - private static final class Dynamic extends LocalWatchEventProcessor { - - private final McpStatelessAsyncServer dynamicActionMcpServer; - - private Dynamic(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer dynamicActionMcpServer, WatchContext ctx) { - super(existedMetaActions, ctx); - this.dynamicActionMcpServer = dynamicActionMcpServer; - } - - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - private boolean normalPath(Path path) { - val files = loadFiles(path); - if (files == null || files.length < 2) { - return false; - } - boolean desc = false; - int run = 0; - for (File f : files) { - String fileName = f.getName(); - if (fileName.equals("desc.json")) { - desc = true; - } - if (fileName.startsWith("run.")) { - run++; - } - } - return run == 1 && desc; - } - - private boolean addAction(String name, Path dir) { - File program = null; - try (Stream stream = Files.list(dir)) { - for (Path p : stream.toList()) { - if (p.getFileName().toString().startsWith("run.")) - program = p.toFile(); - } - } catch (Exception e) { - log.error("添加 action 失败", e); - return false; - } - MetaActionInfo info; - try { - info = JSONUtil.readJSONObject(dir.resolve("desc.json").toFile(), StandardCharsets.UTF_8).toBean(MetaActionInfo.class); - } catch (Exception e) { - log.error("desc.json 加载失败: {}", dir); - return false; - } - String actionKey = "local::" + name; - existedMetaActions.put(actionKey, info); - dynamicActionMcpServer.addTool(buildAsyncToolSpecification(info, program, actionKey, name)).subscribe(); - return true; - } - - private void removeAction(String name) { - existedMetaActions.remove("local::" + name); - dynamicActionMcpServer.removeTool(name).subscribe(); - } - - private BiFunction> buildToolHandler(File finalProgram) { - return (mcpTransportContext, callToolRequest) -> { - Map arguments = callToolRequest.arguments(); - if (arguments == null) { - arguments = Map.of(); - } - String ext = FileUtil.getSuffix(finalProgram); - String[] commands = SystemExecHelper.buildCommands(ext, arguments, finalProgram.getAbsolutePath()); - if (commands == null) { - return Mono.just(McpSchema.CallToolResult.builder() - .addTextContent("未知文件类型: " + finalProgram.getName()) - .isError(true) - .build()); - } - - return Mono.fromCallable(() -> { - SystemExecHelper.Result execResult = SystemExecHelper.exec(commands); - McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder() - .isError(!execResult.isOk()); - List resultList = execResult.getResultList(); - if (resultList != null && !resultList.isEmpty()) { - builder.textContent(resultList); - builder.structuredContent(resultList); - } else { - builder.addTextContent(execResult.getTotal()); - builder.structuredContent(execResult.getTotal()); - } - return builder.build(); - }).subscribeOn(Schedulers.boundedElastic()); - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.InitLoader buildLoad() { - // 从该路径列出已存在的目录,每个目录对应不同的行动程序及描述文件,从描述文件加载程序信息 - return () -> { - Path root = ctx.root; - File file = root.toFile(); - if (file.isFile()) { - throw new ActionInitFailedException("未找到目录: " + root); - } - File[] files = file.listFiles(); - if (files == null) { - throw new ActionInitFailedException("未正常读取目录: " + root); - } - for (File dir : files) { - if (!normalPath(dir.toPath())) { - continue; - } - addAction(dir.getName(), dir.toPath()); - } - }; - } - - private McpStatelessServerFeatures.AsyncToolSpecification buildAsyncToolSpecification(MetaActionInfo - info, File program, String actionKey, String name) { - Map additional = Map.of("pre", info.getPreActions(), - "post", info.getPostActions(), - "strict_pre", info.isStrictDependencies(), - "io", info.isIo()); - McpSchema.Tool tool = McpSchema.Tool.builder() - .name(name) - .description(info.getDescription()) - .inputSchema(McpJsonMapper.getDefault(), JSONObject.toJSONString(info.getParams())) - .outputSchema(info.getResponseSchema()) - .title(actionKey) - .meta(additional) - .build(); - return McpStatelessServerFeatures.AsyncToolSpecification.builder() - .tool(tool) - .callHandler(buildToolHandler(program)) - .build(); - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildModify() { - return (thisDir, context) -> { - // 查看当前目录是否为空或者能否正常读取 - if (!normalPath(thisDir)) { - return; - } - // 对应本地程序或者描述文件的修改行为 - modify(thisDir, context); - }; - } - - private void modify(Path thisDir, Path context) { - String fileName = context.getFileName().toString(); - if (fileName.equals("desc.json")) { - handleMetaModify(thisDir); - } - if (fileName.startsWith("run.")) { - handleProgramModify(thisDir); - } - } - - private void handleProgramModify(Path thisDir) { - String name = thisDir.getFileName().toString(); - String actionKey = "local::" + name; - // 检查是否存在当前 program 对应的 Tool - if (existedMetaActions.containsKey(actionKey)) { - return; - } - if (!addAction(name, thisDir)) { - removeAction(name); - } - } - - private void handleMetaModify(Path thisDir) { - // 检查是否除了描述文件外还存在别的可执行文件 - String name = thisDir.getFileName().toString(); - if (!addAction(name, thisDir)) { - removeAction(name); - } - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildCreate() { - return (thisDir, context) -> { - if (thisDir.equals(ctx.root) && Files.isDirectory(context)) { - try { - context.register(ctx.watchService, ctx.kinds.toArray(WatchEvent.Kind[]::new)); - } catch (IOException e) { - log.error("监听目录注册失败: {}", context); - } - } - if (normalPath(thisDir)) { - modify(thisDir, context); - } - if (Files.isDirectory(context) && normalPath(context)) { - File[] files = context.toFile().listFiles(); - if (files == null) { - log.warn("目录无法访问: {}", context); - return; - } - for (File f : files) { - modify(context, f.toPath()); - } - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildDelete() { - return (thisDir, context) -> { - // 如果发生在 root,且context为目录,则需要移除对应的 action - if (thisDir.equals(ctx.root)) { - String name = context.getFileName().toString(); - Path candidate = ctx.root.resolve(name); - - // 如果 root 下仍然存在一个同名目录 - if (Files.isDirectory(candidate)) { - // 说明删的是同名文件,不是 action 目录 - return; - } - - removeAction(name); - - AtomicReference toRemove = new AtomicReference<>(); - ctx.watchKeys.forEach((key, path) -> { - if (path.getFileName().toString().equals(name)) { - key.cancel(); - toRemove.set(key); - } - }); - if (toRemove.get() != null) { - ctx.watchKeys.remove(toRemove.get()); - } - return; - } - - // 如果发生在非 root 目录内且 context 不符合 action 目录特征 - // 由于只会监听 root 目录与 action 目录 - // 所以此时则证明当前目录对应的行动已不可靠,需要移除 - if (!thisDir.equals(ctx.root) && !normalPath(thisDir)) { - // 未通过校验则删除对应的 action,并在 DynamicActonMcpServer 中删除对应的工具 - String name = thisDir.getFileName().toString(); - removeAction(name); - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildOverflow() { - return (thisDir, context) -> { - // 直接从 existedMetaActions 中拿取现有的 key,上游也从这里发现可用工具 - Set existed = existedMetaActions.keySet().stream().map(actionKey -> actionKey.split("::")[1]).collect(Collectors.toSet()); - Set currentDirs = new HashSet<>(); - // 按照预期 root 目录下有效 path 只包括各个 action 目录 - // 排除非目录 path - try (Stream stream = Files.list(ctx.root).filter(Files::isDirectory)) { - stream.forEach(path -> { - String name = path.getFileName().toString(); - currentDirs.add(name); - - boolean contains = existed.contains(name); - boolean normal = normalPath(path); - - // 如果该目录对应 action 已被记录,且符合 action 目录要求,则无处理 - // 如果已被记录,但不符合,则移除行动 - // 此时必定被监听 - if (contains && !normal) { - removeAction(name); - } - - // 如果 action 没有记录,但符合要求,由于此时必定尚未被监听,则注册监听且添加新 action - // 如果未被记录,且不符合要求,则只注册监听 - if (!contains) { - boolean alreadyWatching = ctx.watchKeys.values().stream() - .anyMatch(p -> p.equals(path)); - if (!alreadyWatching) { - try { - WatchKey watchKey = path.register(ctx.watchService, ctx.kinds.toArray(WatchEvent.Kind[]::new)); - ctx.watchKeys.put(watchKey, path); - } catch (IOException e) { - log.error("监听目录注册失败: {}", path); - } - } - - if (normal) { - addAction(name, path); - } - } - }); - } catch (IOException e) { - log.error("目录无法读取: {}", ctx.root); - return; - } - for (String existedName : existed) { - if (!currentDirs.contains(existedName)) { - removeAction(existedName); - } - } - }; - } - } - - private static final class Desc extends LocalWatchEventProcessor { - - private final McpStatelessAsyncServer mcpDescServer; - private final ConcurrentHashMap descCache = new ConcurrentHashMap<>(); - - private Desc(ConcurrentHashMap existedMetaActions, McpStatelessAsyncServer mcpDescServer, WatchContext ctx) { - super(existedMetaActions, ctx); - this.mcpDescServer = mcpDescServer; - } - - private McpStatelessServerFeatures.AsyncResourceSpecification buildAsyncResourceSpecification(String name, String uri) { - McpSchema.Resource resource = McpSchema.Resource.builder() - .name(name) - .title(name) - .description("Action descriptor for " + name) - .mimeType("application/json") - .uri(uri) - .build(); - BiFunction> readHandler = (context, request) -> { - String requestUri = request.uri(); - String result = descCache.get(requestUri); - if (result == null) { - return Mono.error(new NotFoundException("未找到 Resource: " + requestUri)); - } - return Mono.just(new McpSchema.ReadResourceResult(List.of(new McpSchema.TextResourceContents(requestUri, "application/json", result)))); - }; - return new McpStatelessServerFeatures.AsyncResourceSpecification(resource, readHandler); - } - - private boolean normal(String fileName) { - return fileName.endsWith(".desc.json") && fileName.contains("::"); - } - - private boolean normal(File file) { - String name = file.getName(); - return normal(name); - } - - private boolean normal(Path path) { - return normal(path.toFile()); - } - - private boolean addResource(File file) { - String name = file.getName(); - if (!normal(name)) { - return false; - } - // 读取并解析为 MetaActionInfo,存入 resources - try { - MetaActionInfo info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class); - String uri = ctx.root.resolve(name).toUri().toString(); - descCache.put(uri, JSONObject.toJSONString(info)); - mcpDescServer.addResource(buildAsyncResourceSpecification(name, uri)).block(); - String actionKey = name.replace(".desc.json", ""); - if (existedMetaActions.containsKey(actionKey)) { - existedMetaActions.put(actionKey, info); - } - } catch (Exception e) { - log.error("desc.json 解析失败: {}", file.getAbsolutePath()); - return false; - } - return true; - } - - private void removeResource(Path path) { - String uri = path.toUri().toString(); - String actionKey = path.getFileName().toString().replace(".desc.json", ""); - - descCache.remove(uri); - mcpDescServer.removeResource(uri).block(); - if (existedMetaActions.containsKey(actionKey)) { - resetMetaActionInfo(existedMetaActions.get(actionKey)); - } - } - - @Override - @NotNull - protected LocalWatchServiceBuild.InitLoader buildLoad() { - return () -> { - // DescMcp 的加载逻辑只负责读取已有的 *.desc.json 并注册为 resources - // 正常来讲 root 直接对应 MCP_DESC_PATH,先检查 root 是否为目录,否则拒绝启动 - val files = loadFiles(ctx.root); - for (File file : files) { - addResource(file); - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildModify() { - return (thisDir, context) -> { - // 排除目录事件、名称不符合要求的文件 - String fileName = context.getFileName().toString(); - if (!Files.isRegularFile(context) || !normal(fileName)) { - return; - } - - if (!addResource(context.toFile())) { - removeResource(context); - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildCreate() { - return buildModify(); - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildDelete() { - return (thisDir, context) -> { - // 排除被删除文件名称不符合要求的事件 - String fileName = context.getFileName().toString(); - if (!normal(fileName)) { - return; - } - - // DELETE 事件发生后,需要移除对应的 descCache 条目; - // 如果存在对应的 info,也需要将其中的额外信息进行重置,只保留 Tools 自身的信息 - removeResource(context); - }; - } - - private void resetMetaActionInfo(MetaActionInfo info) { - info.setIo(false); - info.getTags().clear(); - info.getPreActions().clear(); - info.getPostActions().clear(); - info.setStrictDependencies(false); - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildOverflow() { - return (thisDir, context) -> { - // 对于 OVERFLOW 事件,需要依据当前目录下的所有 *.desc.json 针对现有内容进行修复 - List files; - try (Stream stream = Files.list(ctx.root)) { - files = stream.filter(Files::isRegularFile).filter(this::normal).map(Path::toFile).toList(); - } catch (IOException e) { - log.error("目录无法访问: {}", ctx.root); - return; - } - Set currentUriStr = new HashSet<>(); - - for (File file : files) { - MetaActionInfo info = null; - try { - info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class); - } catch (Exception e) { - log.warn("desc.json 读取失败: {}", file.toPath()); - } - - boolean available = info != null; - - // 如果读取成功,则更新 descCache - // 若在 existedMetaActions 中存在,则更新对应 info - String uriStr = file.toURI().toString(); - currentUriStr.add(uriStr); - - if (available) { - // 由于涉及内容均为 map,所以额外判断没有必要,直接进行 add 行为即可 - addResource(file); - } else { - // 如果读取失败,则移除对应 descCache 条目 - // 若在 existedMetaActions 中存在,则重置对应 info - removeResource(file.toPath()); - } - - } - - List serverUris = mcpDescServer.listResources() - .map(McpSchema.Resource::uri) - .collectList() - .block(); - if (serverUris == null) { - log.error("无法获取 DescMcpServer 持有的资源列表"); - return; - } - for (String uri : serverUris) { - if (!currentUriStr.contains(uri)) { - removeResource(Paths.get(URI.create(uri))); - } - } - }; - } - } - - private static final class Common extends LocalWatchEventProcessor { - - private final Map mcpClients; - private final Map mcpConfigFileCache = new HashMap<>(); - private final McpSyncClient descClient; - - private Common(ConcurrentHashMap existedMetaActions, Map mcpClients, McpSyncClient descClient, WatchContext ctx) { - super(existedMetaActions, ctx); - this.mcpClients = mcpClients; - this.descClient = descClient; - } - - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - private boolean normalFile(File file) { - return file.exists() && file.isFile() && file.getName().endsWith(".json"); - } - - /** - * 该部分主要发生在扫描到新的MCP Server描述文件时出现的注册逻辑 - * - * @param id MCP Client 的 id - * @param mcpClientTransportParams MCP Server 的参数 - */ - private void registerMcpClient(String id, McpClientTransportParams mcpClientTransportParams) { - // 如果已存在同名 client,则需要先获取并关闭 - val old = mcpClients.get(id); - val clientTransport = createTransport(mcpClientTransportParams); - val timeout = mcpClientTransportParams.timeout; - val client = McpClient.sync(clientTransport) - .requestTimeout(Duration.ofSeconds(timeout)) - .clientInfo(new McpSchema.Implementation(id, "PARTNER")) - .build(); - - try { - for (McpSchema.Tool tool : client.listTools().tools()) { - val metaActionInfo = buildMetaActionInfo(id, tool); - existedMetaActions.put(id + "::" + tool.name(), metaActionInfo); - } - mcpClients.put(id, client); - - if (old != null) { - old.close(); - } - } catch (Exception e) { - log.warn("[{}] MCP client init failed, skipped (probably non-stdio-safe)", id, e); - client.close(); - } - - } - - /** - * 构建 MetaActionInfo - * - * @param id MCP Client id - * @param tool 待构建 MetaActionInfo 的 MCP Tool - * @return MetaActionInfo 信息 - */ - private @NotNull MetaActionInfo buildMetaActionInfo(String id, McpSchema.Tool tool) { - // 先检查是否存在对应的描述文件 - val toolName = tool.name(); //格式: mcpName::toolName - for (McpSchema.Resource resource : descClient.listResources().resources()) { - val resourceName = resource.name(); - if (resourceName.equals(toolName) && resourceName.split("::")[0].equals(id)) { - val resourceContent = descClient.readResource(resource).contents().getFirst(); - if (resourceContent instanceof McpSchema.TextResourceContents r) { - try { - return JSONObject.parseObject(r.text(), MetaActionInfo.class); - } catch (Exception e) { - log.error("序列化描述文件资源失败", e); - log.error("序列化描述文件资源失败, uri: {}", r.uri()); - log.error("序列化描述文件资源失败, text: {}", r.text()); - } - } - } - } - - MetaActionInfo info = new MetaActionInfo(); - info.setDescription(tool.description()); - Map outputSchema = tool.outputSchema(); - info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); - info.setParams(tool.inputSchema().properties()); - - val meta = tool.meta(); - if (meta != null) { - JSONObject metaJson = JSONObject.from(meta); - info.setIo(metaJson.getBoolean("io")); - info.setPreActions(metaJson.getList("pre", String.class)); - info.setPostActions(metaJson.getList("post", String.class)); - info.setStrictDependencies(metaJson.getBoolean("strict")); - info.setTags(metaJson.getList("tag", String.class)); - } - return info; - } - - private McpClientTransport createTransport(McpClientTransportParams mcpClientTransportParams) { - return switch (mcpClientTransportParams) { - case McpClientTransportParams.Stdio params -> { - val serverParameters = ServerParameters.builder(params.command).env(params.env).args(params.args).build(); - yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); - } - case McpClientTransportParams.Http params -> { - val customizer = new McpSyncHttpClientRequestCustomizer() { - @Override - public void customize(HttpRequest.Builder builder, String method, URI endpoint, String body, McpTransportContext context) { - params.headers.forEach(builder::setHeader); - } - }; - yield HttpClientSseClientTransport.builder(params.baseUri).httpRequestCustomizer(customizer).sseEndpoint(params.endpoint).build(); - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.InitLoader buildLoad() { - return () -> { - // For CommonMcp, we need to list all files in MCP_SERVER_PATH, - // and search for files with extend name .json, - // and then reading them as JSONObject to get McpClientTransportParams. - val files = loadFiles(ctx.root); - - for (File file : files) { - if (!normalFile(file)) { - continue; - } - loadAndRegisterMcpClientsFromFile(file); - } - }; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildModify() { - /* - 发现文件更改事件时,读取该文件中存放的mcp配置,与现有的 MCP 记录对比 - 根据其是否发生配置变动等,针对对应的 client 进行调整 - 如果额外维护一个 文件-clientIds 的映射,可以解决删除某一mcp的情况 - 但如果对于极端场景:从某一文件剪切并粘贴至另一文件,但后者先于前者保存 - 此时就会出现问题,重复client无法被注册 - 建议对于这种‘分布式’的配置存放方式,每个文件变更最好都触发全量加载 - */ - return (thisDir, context) -> checkAndReload(true); - } - - private cn.hutool.json.JSONObject readJson(File file) { - try { - return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8); - } catch (Exception ignored) { - return null; - } - } - - private cn.hutool.json.JSONObject readMcp(cn.hutool.json.JSONObject json, String id) { - try { - return json.getJSONObject(id); - } catch (Exception ignored) { - return null; - } - } - - @SuppressWarnings("unchecked") - private McpClientTransportParams readParams(cn.hutool.json.JSONObject mcp) { - val stdioKeys = Set.of("command", "args", "env"); - val httpKeys = Set.of("uri", "endpoint", "headers"); - val httpKey = Set.of("url"); - val keys = mcp.keySet(); - val timeout = mcp.getInt("timeout", 30); - - if (keys.equals(stdioKeys)) { - val command = mcp.getStr("command"); - val env = mcp.getBean("env", Map.class); - val args = mcp.getBeanList("args", String.class); - if (command == null || env == null || args == null) { - return null; - } - return new McpClientTransportParams.Stdio(timeout, command, env, args); - } - - if (keys.equals(httpKeys)) { - val uri = mcp.getStr("uri"); - val endpoint = mcp.getStr("endpoint"); - val headers = mcp.getBean("headers", Map.class); - if (uri == null || endpoint == null || headers == null) { - return null; - } - return new McpClientTransportParams.Http(timeout, uri, endpoint, headers); - } - - if (keys.equals(httpKey)) { - val url = mcp.getStr("url"); - if (url == null) { - return null; - } - return new McpClientTransportParams.Http(timeout, url, "", Map.of()); - } - - return null; - } - - private void checkAndReload(boolean trustCache) { - /* - for each file cannot present all mcp configurations, - we need to load all at once, and then compare them with existed records. - we will record existing mcp paramsCacheMap and id-params map for which is changed. - - recording changedMap only cannot figure out which mcp was deleted, - so existingMcpIdSet attr is required - */ - val changedMap = new HashMap(); - val existingMcpIdSet = new HashSet(); - - val files = loadFiles(ctx.root); - for (File file : files) { - if (!normalFile(file)) { - continue; - } - - // check if necessary stats changed, null record is seen as file changed - val fileRecord = mcpConfigFileCache.get(file); - boolean fileRecordExists = fileRecord != null; - if (fileRecordExists && !fileChanged(file, fileRecord) && trustCache) { - existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet()); - continue; - } - - // if changed, read file and load mcp configurations - val mcpConfigJson = readJson(file); - if (mcpConfigJson == null) { - // uses old records to avoid abnormal deletion - if (fileRecordExists) { - existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet()); - } - continue; - } - - val newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length(), new HashMap<>()); - for (String id : mcpConfigJson.keySet()) { - val mcp = readMcp(mcpConfigJson, id); - if (mcp == null) { - continue; - } - - val params = readParams(mcp); - if (params == null) { - continue; - } - - existingMcpIdSet.add(id); - newFileRecord.paramsCacheMap().put(id, params); - - if (fileRecordExists) { - val paramsCache = fileRecord.paramsCacheMap().get(id); - if (paramsCache != null && paramsCache.equals(params)) { - continue; - } - } - - changedMap.put(id, params); - } - mcpConfigFileCache.put(file, newFileRecord); - } - - updateMcpClients(changedMap, existingMcpIdSet); - } - - private void updateMcpClients(HashMap changedMap, HashSet existingMcpIdSet) { - // following attr changedMap, update or insert mcp clients - changedMap.forEach((id, params) -> { - // close outdated clients if exists - val oldClient = mcpClients.get(id); - if (oldClient != null) { - oldClient.close(); - } - // create new clients - registerMcpClient(id, params); - }); - - // following attr existingMcpIdSet, align mcp clients - // new mcp clients and outdated clients has been updated in above logic - // this part focus on removing non-existing mcp - mcpClients.keySet().removeIf(id -> !existingMcpIdSet.contains(id)); - // clear relevant tools' action info - existedMetaActions.keySet().removeIf(id -> !existingMcpIdSet.contains(id.split("::")[0])); - } - - private boolean fileChanged(File file, McpConfigFileRecord fileRecord) { - val lastModified = file.lastModified(); - val length = file.length(); - - return fileRecord.lastModified() != lastModified || fileRecord.length() != length; - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildOverflow() { - return (thisDir, context) -> checkAndReload(false); - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildCreate() { - return (thisDir, context) -> { - val file = context.toFile(); - if (!normalFile(file)) { - return; - } - - loadAndRegisterMcpClientsFromFile(file); - }; - } - - private void loadAndRegisterMcpClientsFromFile(File file) { - val mcpConfigJson = readJson(file); - if (mcpConfigJson == null) { - return; - } - - val newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length()); - for (String id : mcpConfigJson.keySet()) { - val mcp = readMcp(mcpConfigJson, id); - if (mcp == null) { - continue; - } - - val params = readParams(mcp); - if (params == null) { - continue; - } - - registerMcpClient(id, params); - newFileRecord.paramsCacheMap().put(id, params); - } - mcpConfigFileCache.put(file, newFileRecord); - } - - @Override - @NotNull - protected LocalWatchServiceBuild.EventHandler buildDelete() { - return (thisDir, context) -> { - val file = context.toFile(); - if (!file.getName().endsWith(".json")) { - return; - } - - val fileRecord = mcpConfigFileCache.remove(file); - if (fileRecord == null) { - return; - } - - // clear from existedMetaActions and mcpClients - // client id comes from fileRecord.paramsCache - // actionKey from `id::toolName` - val clientIdSet = fileRecord.paramsCacheMap().keySet(); - for (String clientId : clientIdSet) { - val client = mcpClients.remove(clientId); - if (client == null) { - continue; - } - - val tools = client.listTools().tools(); - for (McpSchema.Tool tool : tools) { - val actionKey = clientId + "::" + tool.name(); - existedMetaActions.remove(actionKey); - } - client.close(); - } - }; - } - - private record McpConfigFileRecord(long lastModified, long length, - Map paramsCacheMap) { - public McpConfigFileRecord(long lastModified, long length) { - this(lastModified, length, new HashMap<>()); - } - } - } - } - - private sealed abstract static class McpClientTransportParams permits McpClientTransportParams.Http, McpClientTransportParams.Stdio { - private final int timeout; - - private McpClientTransportParams(int timeout) { - this.timeout = timeout; - } - - private final static class Http extends McpClientTransportParams { - private final String baseUri; - private final String endpoint; - private final Map headers; - - private Http(int timeout, String baseUri, String endpoint, Map header) { - super(timeout); - this.baseUri = baseUri; - this.endpoint = endpoint; - this.headers = header; - } - } - - private final static class Stdio extends McpClientTransportParams { - private final String command; - private final Map env; - private final List args; - - private Stdio(int timeout, String command, Map env, List args) { - super(timeout); - this.command = command; - this.env = env; - this.args = args; - } - } - } - - private static class SystemExecHelper { - - //TODO 后续需在加载时、或者通过配置文件获取可用命令并注册匹配 - private static String[] buildCommands(String ext, Map params, String absolutePath) { - String command = switch (ext) { - case "py" -> "python"; - case "sh" -> "bash"; - default -> null; - }; - if (command == null) { - return null; - } - int paramSize = params == null ? 0 : params.size(); - String[] commands = new String[paramSize + 2]; - commands[0] = command; - commands[1] = absolutePath; - AtomicInteger paramCount = new AtomicInteger(2); - if (params != null) { - params.forEach((param, value) -> commands[paramCount.getAndIncrement()] = "--" + param + "=" + value); - } - return commands; - } - - private static Result exec(String... command) { - Result result = new Result(); - List output = new ArrayList<>(); - List error = new ArrayList<>(); - - try { - Process process = new ProcessBuilder(command) - .redirectErrorStream(false) // 分开读 - .start(); - - Thread stdoutThread = new Thread(() -> { - try (BufferedReader r = new BufferedReader(new InputStreamReader(process.getInputStream()))) { - String line; - while ((line = r.readLine()) != null) { - output.add(line); - } - } catch (Exception ignored) { - } - }); - - Thread stderrThread = new Thread(() -> { - try (BufferedReader r = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { - String line; - while ((line = r.readLine()) != null) { - error.add(line); - } - } catch (Exception ignored) { - } - }); - - stdoutThread.start(); - stderrThread.start(); - - int exitCode = process.waitFor(); - stdoutThread.join(); - stderrThread.join(); - - result.setOk(exitCode == 0); - result.setResultList(output.isEmpty() ? error : output); - result.setTotal(String.join("\n", output.isEmpty() ? error : output)); - - } catch (Exception e) { - result.setOk(false); - result.setTotal(e.getMessage()); - } - - return result; - } - - @Data - private static class Result { - private boolean ok; - private String total; - private List resultList; - } - } - } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpActionExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpActionExecutor.java new file mode 100644 index 00000000..09e4418b --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpActionExecutor.java @@ -0,0 +1,63 @@ +package work.slhaf.partner.core.action.runner; + +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.spec.McpSchema; +import work.slhaf.partner.core.action.entity.MetaAction; + +import java.util.List; +import java.util.stream.Collectors; + +class McpActionExecutor { + + private final McpClientRegistry mcpClientRegistry; + + McpActionExecutor(McpClientRegistry mcpClientRegistry) { + this.mcpClientRegistry = mcpClientRegistry; + } + + RunnerClient.RunnerResponse run(MetaAction metaAction) { + RunnerClient.RunnerResponse response = new RunnerClient.RunnerResponse(); + McpSyncClient mcpClient = mcpClientRegistry.get(metaAction.getLocation()); + if (mcpClient == null) { + response.setOk(false); + response.setData("MCP client not found: " + metaAction.getLocation()); + return response; + } + McpSchema.CallToolRequest callToolRequest = McpSchema.CallToolRequest.builder() + .name(metaAction.getName()) + .arguments(metaAction.getParams()) + .build(); + McpSchema.CallToolResult callToolResult = mcpClient.callTool(callToolRequest); + Boolean error = callToolResult.isError(); + response.setOk(error == null || !error); + response.setData(extractResponseData(callToolResult)); + return response; + } + + private String extractResponseData(McpSchema.CallToolResult callToolResult) { + Object structuredContent = callToolResult.structuredContent(); + if (structuredContent != null) { + return String.valueOf(structuredContent); + } + + List contents = callToolResult.content(); + if (contents != null && !contents.isEmpty()) { + String contentSummary = contents.stream() + .map(this::renderContent) + .filter(text -> text != null && !text.isBlank()) + .collect(Collectors.joining("\n")); + if (!contentSummary.isBlank()) { + return contentSummary; + } + } + + return callToolResult.toString(); + } + + private String renderContent(McpSchema.Content content) { + if (content instanceof McpSchema.TextContent textContent) { + return textContent.text(); + } + return String.valueOf(content); + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpClientRegistry.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpClientRegistry.java new file mode 100644 index 00000000..6e38a18a --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpClientRegistry.java @@ -0,0 +1,49 @@ +package work.slhaf.partner.core.action.runner; + +import io.modelcontextprotocol.client.McpSyncClient; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +class McpClientRegistry implements AutoCloseable { + + private final ConcurrentHashMap clients = new ConcurrentHashMap<>(); + + McpSyncClient get(String serverName) { + return clients.get(serverName); + } + + void register(String serverName, McpSyncClient client) { + McpSyncClient old = clients.put(serverName, client); + if (old != null && old != client) { + old.close(); + } + } + + McpSyncClient remove(String serverName) { + McpSyncClient client = detach(serverName); + if (client != null) { + client.close(); + } + return client; + } + + McpSyncClient detach(String serverName) { + return clients.remove(serverName); + } + + boolean contains(String serverName) { + return clients.containsKey(serverName); + } + + Set listIds() { + return new HashSet<>(clients.keySet()); + } + + @Override + public void close() { + clients.forEach((id, client) -> client.close()); + clients.clear(); + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpConfigWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpConfigWatcher.java new file mode 100644 index 00000000..f6b1d32a --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpConfigWatcher.java @@ -0,0 +1,295 @@ +package work.slhaf.partner.core.action.runner; + +import cn.hutool.json.JSONUtil; +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.spec.McpSchema; +import lombok.extern.slf4j.Slf4j; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +@Slf4j +class McpConfigWatcher implements AutoCloseable { + + private final Path root; + private final ConcurrentHashMap existedMetaActions; + private final McpClientRegistry mcpClientRegistry; + private final McpTransportFactory mcpTransportFactory; + private final McpMetaRegistry mcpMetaRegistry; + private final DirectoryWatchSupport watchSupport; + private final Map mcpConfigFileCache = new HashMap<>(); + + McpConfigWatcher(Path root, + ConcurrentHashMap existedMetaActions, + McpClientRegistry mcpClientRegistry, + McpTransportFactory mcpTransportFactory, + McpMetaRegistry mcpMetaRegistry, + ExecutorService executor) throws IOException { + this.root = root; + this.existedMetaActions = existedMetaActions; + this.mcpClientRegistry = mcpClientRegistry; + this.mcpTransportFactory = mcpTransportFactory; + this.mcpMetaRegistry = mcpMetaRegistry; + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, () -> loadInitial()) + .onCreate(this::handleCreate) + .onModify((thisDir, context) -> checkAndReload(true)) + .onDelete(this::handleDelete) + .onOverflow((thisDir, context) -> checkAndReload(false)); + } + + void start() { + watchSupport.start(); + log.info("CommonMcp 文件监听注册完毕"); + } + + private void loadInitial() { + File[] files = loadFiles(root); + if (files == null) { + return; + } + for (File file : files) { + if (!normalFile(file)) { + continue; + } + loadAndRegisterMcpClientsFromFile(file); + } + } + + private void handleCreate(Path thisDir, Path context) { + if (context == null) { + return; + } + File file = context.toFile(); + if (!normalFile(file)) { + return; + } + loadAndRegisterMcpClientsFromFile(file); + } + + private void handleDelete(Path thisDir, Path context) { + if (context == null) { + return; + } + File file = context.toFile(); + if (!file.getName().endsWith(".json")) { + return; + } + McpConfigFileRecord fileRecord = mcpConfigFileCache.remove(file); + if (fileRecord == null) { + return; + } + for (String clientId : fileRecord.paramsCacheMap().keySet()) { + McpSyncClient client = mcpClientRegistry.detach(clientId); + if (client == null) { + continue; + } + for (McpSchema.Tool tool : client.listTools().tools()) { + existedMetaActions.remove(clientId + "::" + tool.name()); + } + client.close(); + } + } + + private boolean normalFile(File file) { + return file.exists() && file.isFile() && file.getName().endsWith(".json"); + } + + private void registerMcpClient(String id, McpTransportConfig transportConfig) { + McpSyncClient client = McpClient.sync(mcpTransportFactory.create(transportConfig, null)) + .requestTimeout(Duration.ofSeconds(transportConfig.timeout())) + .clientInfo(new McpSchema.Implementation(id, "PARTNER")) + .build(); + try { + for (McpSchema.Tool tool : client.listTools().tools()) { + existedMetaActions.put(id + "::" + tool.name(), mcpMetaRegistry.buildMetaActionInfo(id, tool)); + } + mcpClientRegistry.register(id, client); + } catch (Exception e) { + log.warn("[{}] MCP client init failed, skipped (probably non-stdio-safe)", id, e); + client.close(); + } + } + + private cn.hutool.json.JSONObject readJson(File file) { + try { + return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8); + } catch (Exception ignored) { + return null; + } + } + + private cn.hutool.json.JSONObject readMcp(cn.hutool.json.JSONObject json, String id) { + try { + return json.getJSONObject(id); + } catch (Exception ignored) { + return null; + } + } + + @SuppressWarnings("unchecked") + private McpTransportConfig readParams(cn.hutool.json.JSONObject mcp) { + Set keys = mcp.keySet(); + int timeout = mcp.getInt("timeout", 30); + + if (matchesKeys(keys, Set.of("command", "args", "env"), Set.of("timeout"))) { + String command = mcp.getStr("command"); + Map env = mcp.getBean("env", Map.class); + java.util.List args = mcp.getBeanList("args", String.class); + if (command == null || env == null || args == null) { + return null; + } + return new McpTransportConfig.Stdio(timeout, command, env, args); + } + if (matchesKeys(keys, Set.of("uri", "endpoint", "headers"), Set.of("timeout"))) { + String uri = mcp.getStr("uri"); + String endpoint = mcp.getStr("endpoint"); + Map headers = mcp.getBean("headers", Map.class); + if (uri == null || endpoint == null || headers == null) { + return null; + } + return new McpTransportConfig.Http(timeout, uri, endpoint, headers); + } + if (matchesKeys(keys, Set.of("url"), Set.of("timeout"))) { + String url = mcp.getStr("url"); + if (url == null) { + return null; + } + return new McpTransportConfig.Http(timeout, url, "", Map.of()); + } + return null; + } + + private boolean matchesKeys(Set actualKeys, Set requiredKeys, Set optionalKeys) { + if (!actualKeys.containsAll(requiredKeys)) { + return false; + } + Set allowedKeys = new HashSet<>(requiredKeys); + allowedKeys.addAll(optionalKeys); + return allowedKeys.containsAll(actualKeys); + } + + private void checkAndReload(boolean trustCache) { + HashMap changedMap = new HashMap<>(); + HashSet existingMcpIdSet = new HashSet<>(); + + File[] files = loadFiles(root); + if (files == null) { + return; + } + for (File file : files) { + if (!normalFile(file)) { + continue; + } + McpConfigFileRecord fileRecord = mcpConfigFileCache.get(file); + boolean fileRecordExists = fileRecord != null; + if (fileRecordExists && !fileChanged(file, fileRecord) && trustCache) { + existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet()); + continue; + } + + cn.hutool.json.JSONObject mcpConfigJson = readJson(file); + if (mcpConfigJson == null) { + if (fileRecordExists) { + existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet()); + } + continue; + } + + McpConfigFileRecord newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length(), new HashMap<>()); + for (String id : mcpConfigJson.keySet()) { + cn.hutool.json.JSONObject mcp = readMcp(mcpConfigJson, id); + if (mcp == null) { + continue; + } + McpTransportConfig params = readParams(mcp); + if (params == null) { + continue; + } + existingMcpIdSet.add(id); + newFileRecord.paramsCacheMap().put(id, params); + if (fileRecordExists) { + McpTransportConfig paramsCache = fileRecord.paramsCacheMap().get(id); + if (paramsCache != null && paramsCache.equals(params)) { + continue; + } + } + changedMap.put(id, params); + } + mcpConfigFileCache.put(file, newFileRecord); + } + updateMcpClients(changedMap, existingMcpIdSet); + } + + private void updateMcpClients(HashMap changedMap, HashSet existingMcpIdSet) { + changedMap.forEach(this::registerMcpClient); + for (String clientId : mcpClientRegistry.listIds()) { + if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) { + continue; + } + if (!existingMcpIdSet.contains(clientId)) { + mcpClientRegistry.remove(clientId); + } + } + existedMetaActions.keySet().removeIf(actionKey -> { + String serverId = actionKey.split("::")[0]; + return !serverId.equals("local") + && !serverId.equals(LocalRunnerClient.MCP_NAME_DESC) + && !serverId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC) + && !existingMcpIdSet.contains(serverId); + }); + } + + private boolean fileChanged(File file, McpConfigFileRecord fileRecord) { + return fileRecord.lastModified() != file.lastModified() || fileRecord.length() != file.length(); + } + + private void loadAndRegisterMcpClientsFromFile(File file) { + cn.hutool.json.JSONObject mcpConfigJson = readJson(file); + if (mcpConfigJson == null) { + return; + } + McpConfigFileRecord newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length()); + for (String id : mcpConfigJson.keySet()) { + cn.hutool.json.JSONObject mcp = readMcp(mcpConfigJson, id); + if (mcp == null) { + continue; + } + McpTransportConfig params = readParams(mcp); + if (params == null) { + continue; + } + registerMcpClient(id, params); + newFileRecord.paramsCacheMap().put(id, params); + } + mcpConfigFileCache.put(file, newFileRecord); + } + + private File[] loadFiles(Path path) { + if (!path.toFile().isDirectory()) { + return null; + } + return path.toFile().listFiles(); + } + + @Override + public void close() throws Exception { + watchSupport.close(); + } + + private record McpConfigFileRecord(long lastModified, long length, Map paramsCacheMap) { + private McpConfigFileRecord(long lastModified, long length) { + this(lastModified, length, new HashMap<>()); + } + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpDescWatcher.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpDescWatcher.java new file mode 100644 index 00000000..c84e1db1 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpDescWatcher.java @@ -0,0 +1,52 @@ +package work.slhaf.partner.core.action.runner; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.ExecutorService; + +@Slf4j +class McpDescWatcher implements AutoCloseable { + + private final Path root; + private final McpMetaRegistry mcpMetaRegistry; + private final DirectoryWatchSupport watchSupport; + + McpDescWatcher(Path root, McpMetaRegistry mcpMetaRegistry, ExecutorService executor) throws IOException { + this.root = root; + this.mcpMetaRegistry = mcpMetaRegistry; + this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, () -> mcpMetaRegistry.loadDirectory(root)) + .onCreate(this::handleUpsert) + .onModify(this::handleUpsert) + .onDelete(this::handleDelete) + .onOverflow((thisDir, context) -> mcpMetaRegistry.reconcile(root)); + } + + void start() { + watchSupport.start(); + log.info("DescMcp 文件监听注册完毕"); + } + + private void handleUpsert(Path thisDir, Path context) { + if (context == null || Files.isDirectory(context) || !mcpMetaRegistry.isValidDescFile(context.getFileName().toString())) { + return; + } + if (!mcpMetaRegistry.addOrUpdate(context)) { + mcpMetaRegistry.remove(context); + } + } + + private void handleDelete(Path thisDir, Path context) { + if (context == null || !mcpMetaRegistry.isValidDescFile(context.getFileName().toString())) { + return; + } + mcpMetaRegistry.remove(context); + } + + @Override + public void close() throws Exception { + watchSupport.close(); + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpMetaRegistry.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpMetaRegistry.java new file mode 100644 index 00000000..21f31b6a --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpMetaRegistry.java @@ -0,0 +1,233 @@ +package work.slhaf.partner.core.action.runner; + +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.JSONObject; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.server.McpServer; +import io.modelcontextprotocol.server.McpStatelessAsyncServer; +import io.modelcontextprotocol.server.McpStatelessServerFeatures; +import io.modelcontextprotocol.spec.McpSchema; +import javassist.NotFoundException; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; +import work.slhaf.partner.common.mcp.InProcessMcpTransport; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; + +@Slf4j +class McpMetaRegistry implements AutoCloseable { + + private final ConcurrentHashMap existedMetaActions; + private final ConcurrentHashMap descCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap descInfoCache = new ConcurrentHashMap<>(); + private final ConcurrentHashMap originalInfoCache = new ConcurrentHashMap<>(); + private final McpStatelessAsyncServer descServer; + private final InProcessMcpTransport clientTransport; + + McpMetaRegistry(ConcurrentHashMap existedMetaActions) { + this.existedMetaActions = existedMetaActions; + InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair(); + this.clientTransport = pair.clientSide(); + McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() + .resources(true, true) + .build(); + this.descServer = McpServer.async(pair.serverSide()) + .capabilities(serverCapabilities) + .jsonMapper(McpJsonMapper.getDefault()) + .build(); + } + + McpTransportConfig.InProcess clientConfig(String serverName, int timeout) { + return new McpTransportConfig.InProcess(timeout, clientTransport); + } + + void loadDirectory(Path root) { + File[] files = loadFiles(root); + if (files == null) { + return; + } + for (File file : files) { + addOrUpdate(file); + } + } + + boolean addOrUpdate(Path path) { + return addOrUpdate(path.toFile()); + } + + boolean addOrUpdate(File file) { + String name = file.getName(); + if (!isValidDescFile(name)) { + return false; + } + try { + MetaActionInfo info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class); + String uri = file.toPath().toUri().toString(); + descCache.put(uri, JSONObject.toJSONString(info)); + String actionKey = name.replace(".desc.json", ""); + descInfoCache.put(actionKey, copyMetaActionInfo(info)); + descServer.addResource(buildAsyncResourceSpecification(name, uri)).block(); + if (existedMetaActions.containsKey(actionKey)) { + existedMetaActions.put(actionKey, mergeWithOriginal(actionKey, info)); + } + return true; + } catch (Exception e) { + log.error("desc.json 解析失败: {}", file.getAbsolutePath()); + return false; + } + } + + void remove(Path path) { + String uri = path.toUri().toString(); + String actionKey = path.getFileName().toString().replace(".desc.json", ""); + descCache.remove(uri); + descInfoCache.remove(actionKey); + descServer.removeResource(uri).block(); + MetaActionInfo originalInfo = originalInfoCache.get(actionKey); + if (originalInfo != null) { + existedMetaActions.put(actionKey, copyMetaActionInfo(originalInfo)); + return; + } + MetaActionInfo info = existedMetaActions.get(actionKey); + if (info != null) { + resetMetaActionInfo(info); + } + } + + void reconcile(Path root) { + File[] files = loadFiles(root); + if (files == null) { + return; + } + Set currentUris = ConcurrentHashMap.newKeySet(); + for (File file : files) { + if (!isValidDescFile(file.getName())) { + continue; + } + currentUris.add(file.toURI().toString()); + if (!addOrUpdate(file)) { + remove(file.toPath()); + } + } + List serverUris = descServer.listResources() + .map(McpSchema.Resource::uri) + .collectList() + .block(); + if (serverUris == null) { + log.error("无法获取 DescMcpServer 持有的资源列表"); + return; + } + for (String uri : serverUris) { + if (!currentUris.contains(uri)) { + remove(Path.of(java.net.URI.create(uri))); + } + } + } + + MetaActionInfo buildMetaActionInfo(String serverId, McpSchema.Tool tool) { + String actionKey = serverId + "::" + tool.name(); + MetaActionInfo baseInfo = buildToolMetaActionInfo(tool); + originalInfoCache.put(actionKey, copyMetaActionInfo(baseInfo)); + MetaActionInfo override = descInfoCache.get(actionKey); + return override == null ? baseInfo : mergeWithOriginal(actionKey, override); + } + + private McpStatelessServerFeatures.AsyncResourceSpecification buildAsyncResourceSpecification(String name, String uri) { + McpSchema.Resource resource = McpSchema.Resource.builder() + .name(name) + .title(name) + .description("Action descriptor for " + name) + .mimeType("application/json") + .uri(uri) + .build(); + BiFunction> readHandler = (context, request) -> { + String result = descCache.get(request.uri()); + if (result == null) { + return Mono.error(new NotFoundException("未找到 Resource: " + request.uri())); + } + return Mono.just(new McpSchema.ReadResourceResult(List.of( + new McpSchema.TextResourceContents(request.uri(), "application/json", result) + ))); + }; + return new McpStatelessServerFeatures.AsyncResourceSpecification(resource, readHandler); + } + + boolean isValidDescFile(String fileName) { + return fileName.endsWith(".desc.json") && fileName.contains("::"); + } + + private File[] loadFiles(Path root) { + if (!Files.isDirectory(root)) { + return null; + } + return root.toFile().listFiles(); + } + + private void resetMetaActionInfo(@NotNull MetaActionInfo info) { + info.setIo(false); + if (info.getTags() != null) { + info.getTags().clear(); + } + if (info.getPreActions() != null) { + info.getPreActions().clear(); + } + if (info.getPostActions() != null) { + info.getPostActions().clear(); + } + info.setStrictDependencies(false); + } + + @Override + public void close() { + descServer.close(); + } + + private MetaActionInfo buildToolMetaActionInfo(McpSchema.Tool tool) { + MetaActionInfo info = new MetaActionInfo(); + info.setDescription(tool.description()); + Map outputSchema = tool.outputSchema(); + info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema)); + info.setParams(tool.inputSchema().properties()); + + Map meta = tool.meta(); + if (meta != null) { + JSONObject metaJson = JSONObject.from(meta); + info.setIo(Boolean.TRUE.equals(metaJson.getBoolean("io"))); + info.setPreActions(metaJson.getList("pre", String.class)); + info.setPostActions(metaJson.getList("post", String.class)); + info.setStrictDependencies(Boolean.TRUE.equals(metaJson.getBoolean("strict"))); + info.setTags(metaJson.getList("tag", String.class)); + } + return info; + } + + private MetaActionInfo mergeWithOriginal(String actionKey, MetaActionInfo override) { + MetaActionInfo original = originalInfoCache.get(actionKey); + return override == null ? copyMetaActionInfo(original) : copyMetaActionInfo(override); + } + + private MetaActionInfo copyMetaActionInfo(MetaActionInfo source) { + if (source == null) { + return null; + } + MetaActionInfo copy = new MetaActionInfo(); + copy.setIo(source.isIo()); + copy.setParams(source.getParams() == null ? null : new HashMap<>(source.getParams())); + copy.setDescription(source.getDescription()); + copy.setTags(source.getTags() == null ? new ArrayList<>() : new ArrayList<>(source.getTags())); + copy.setPreActions(source.getPreActions() == null ? new ArrayList<>() : new ArrayList<>(source.getPreActions())); + copy.setPostActions(source.getPostActions() == null ? new ArrayList<>() : new ArrayList<>(source.getPostActions())); + copy.setStrictDependencies(source.isStrictDependencies()); + copy.setResponseSchema(source.getResponseSchema() == null ? JSONObject.of() : JSONObject.from(source.getResponseSchema())); + return copy; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportConfig.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportConfig.java new file mode 100644 index 00000000..1c8823bc --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportConfig.java @@ -0,0 +1,22 @@ +package work.slhaf.partner.core.action.runner; + +import work.slhaf.partner.common.mcp.InProcessMcpTransport; + +import java.util.List; +import java.util.Map; + +sealed interface McpTransportConfig permits McpTransportConfig.Http, McpTransportConfig.Stdio, McpTransportConfig.InProcess { + + int timeout(); + + record Http(int timeout, String baseUri, String endpoint, + Map headers) implements McpTransportConfig { + } + + record Stdio(int timeout, String command, Map env, + List args) implements McpTransportConfig { + } + + record InProcess(int timeout, InProcessMcpTransport clientTransport) implements McpTransportConfig { + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportFactory.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportFactory.java new file mode 100644 index 00000000..79a9328d --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/McpTransportFactory.java @@ -0,0 +1,40 @@ +package work.slhaf.partner.core.action.runner; + +import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport; +import io.modelcontextprotocol.client.transport.ServerParameters; +import io.modelcontextprotocol.client.transport.StdioClientTransport; +import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpClientTransport; + +import java.net.URI; +import java.net.http.HttpRequest; + +class McpTransportFactory { + + McpClientTransport create(McpTransportConfig config, RunnerExecutionPolicy policy) { + return switch (config) { + case McpTransportConfig.Stdio stdio -> { + ServerParameters serverParameters = ServerParameters.builder(stdio.command()) + .env(stdio.env()) + .args(stdio.args()) + .build(); + yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault()); + } + case McpTransportConfig.Http http -> { + McpSyncHttpClientRequestCustomizer customizer = new McpSyncHttpClientRequestCustomizer() { + @Override + public void customize(HttpRequest.Builder builder, String method, URI endpoint, String body, McpTransportContext context) { + http.headers().forEach(builder::setHeader); + } + }; + yield HttpClientSseClientTransport.builder(http.baseUri()) + .httpRequestCustomizer(customizer) + .sseEndpoint(http.endpoint()) + .build(); + } + case McpTransportConfig.InProcess inProcess -> inProcess.clientTransport(); + }; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/OriginExecutionService.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/OriginExecutionService.java new file mode 100644 index 00000000..f7920629 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/OriginExecutionService.java @@ -0,0 +1,36 @@ +package work.slhaf.partner.core.action.runner; + +import cn.hutool.core.io.FileUtil; +import work.slhaf.partner.core.action.entity.MetaAction; + +import java.io.File; + +class OriginExecutionService { + + private final CommandExecutionService commandExecutionService; + + OriginExecutionService(CommandExecutionService commandExecutionService) { + this.commandExecutionService = commandExecutionService; + } + + RunnerClient.RunnerResponse run(MetaAction metaAction) { + RunnerClient.RunnerResponse response = new RunnerClient.RunnerResponse(); + File file = new File(metaAction.getLocation()); + String ext = FileUtil.getSuffix(file); + if (ext == null || ext.isEmpty()) { + response.setOk(false); + response.setData("未知文件类型"); + return response; + } + String[] commands = commandExecutionService.buildCommands(ext, metaAction.getParams(), file.getAbsolutePath()); + if (commands == null || commands.length == 0) { + response.setOk(false); + response.setData("不支持的文件类型: " + file.getName()); + return response; + } + CommandExecutionService.Result execResult = commandExecutionService.exec(commands); + response.setOk(execResult.isOk()); + response.setData(execResult.getTotal()); + return response; + } +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerExecutionPolicy.java b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerExecutionPolicy.java new file mode 100644 index 00000000..4a481ea8 --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/action/runner/RunnerExecutionPolicy.java @@ -0,0 +1,4 @@ +package work.slhaf.partner.core.action.runner; + +interface RunnerExecutionPolicy { +} diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/RunnerStabilizationTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/RunnerStabilizationTest.java new file mode 100644 index 00000000..ce79338d --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/action/runner/RunnerStabilizationTest.java @@ -0,0 +1,160 @@ +package work.slhaf.partner.core.action.runner; + +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpSchema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; +import work.slhaf.partner.core.action.entity.MetaAction; +import work.slhaf.partner.core.action.entity.MetaActionInfo; + +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +class RunnerStabilizationTest { + + @Test + void actionSerializerUsesNormalizedCodeType(@TempDir Path tempDir) throws Exception { + ActionSerializer serializer = new ActionSerializer(tempDir.toString(), tempDir.toString()); + String builtPath = serializer.buildTmpPath("demo", "py"); + Assertions.assertTrue(builtPath.endsWith(".py")); + + MetaAction metaAction = new MetaAction("demo", false, MetaAction.Type.ORIGIN, builtPath); + serializer.tmpSerialize(metaAction, "print('ok')", ".py"); + + Assertions.assertTrue(Files.exists(Path.of(builtPath))); + Assertions.assertEquals("print('ok')", Files.readString(Path.of(builtPath))); + Assertions.assertThrows(Exception.class, () -> serializer.tmpSerialize(metaAction, "print('bad')", ".sh")); + } + + @Test + void mcpTransportConfigHasValueEquality() { + McpTransportConfig.Stdio left = new McpTransportConfig.Stdio(30, "npx", Map.of("A", "1"), List.of("-y", "demo")); + McpTransportConfig.Stdio right = new McpTransportConfig.Stdio(30, "npx", Map.of("A", "1"), List.of("-y", "demo")); + + Assertions.assertEquals(left, right); + Assertions.assertEquals(left.hashCode(), right.hashCode()); + } + + @Test + void mcpConfigWatcherReadParamsAcceptsTimeout(@TempDir Path tempDir) throws Exception { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + McpConfigWatcher watcher = new McpConfigWatcher( + tempDir, + existedMetaActions, + new McpClientRegistry(), + new McpTransportFactory(), + new McpMetaRegistry(existedMetaActions), + executor + ); + try { + Method readParams = McpConfigWatcher.class.getDeclaredMethod("readParams", cn.hutool.json.JSONObject.class); + readParams.setAccessible(true); + + cn.hutool.json.JSONObject stdioJson = cn.hutool.json.JSONUtil.parseObj(""" + { + "command": "npx", + "args": ["-y", "demo"], + "env": {}, + "timeout": 45 + } + """); + Object stdioConfig = readParams.invoke(watcher, stdioJson); + + Assertions.assertInstanceOf(McpTransportConfig.Stdio.class, stdioConfig); + Assertions.assertEquals(45, ((McpTransportConfig.Stdio) stdioConfig).timeout()); + } finally { + watcher.close(); + executor.shutdownNow(); + } + } + + @Test + void localRunnerClientCloseIsIdempotent(@TempDir Path tempDir) { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + try { + client.close(); + client.close(); + } finally { + executor.shutdownNow(); + } + } + + @Test + void mcpActionExecutorUsesStructuredContentThenTextContent() { + McpClientRegistry registry = new McpClientRegistry(); + McpSyncClient client = Mockito.mock(McpSyncClient.class); + registry.register("demo", client); + + McpActionExecutor executor = new McpActionExecutor(registry); + MetaAction metaAction = new MetaAction("tool", false, MetaAction.Type.MCP, "demo"); + + Mockito.when(client.callTool(Mockito.any())).thenReturn( + new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("hello")), null, null, Map.of()) + ); + RunnerClient.RunnerResponse textResponse = executor.run(metaAction); + Assertions.assertTrue(textResponse.isOk()); + Assertions.assertEquals("hello", textResponse.getData()); + + Mockito.when(client.callTool(Mockito.any())).thenReturn( + new McpSchema.CallToolResult(List.of(), Boolean.FALSE, Map.of("k", "v"), Map.of()) + ); + RunnerClient.RunnerResponse structuredResponse = executor.run(metaAction); + Assertions.assertTrue(structuredResponse.isOk()); + Assertions.assertEquals("{k=v}", structuredResponse.getData()); + } + + @Test + void mcpMetaRegistryFallsBackToOriginalToolMetaAfterDescRemoval(@TempDir Path tempDir) throws Exception { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + McpMetaRegistry registry = new McpMetaRegistry(existedMetaActions); + try { + McpSchema.Tool tool = McpSchema.Tool.builder() + .name("tool") + .description("tool description") + .inputSchema(McpJsonMapper.getDefault(), "{\"type\":\"object\",\"properties\":{}}") + .outputSchema(Map.of("type", "string")) + .meta(Map.of("io", true, "pre", List.of("pre"), "post", List.of("post"), "strict", true, "tag", List.of("tag"))) + .build(); + + MetaActionInfo baseInfo = registry.buildMetaActionInfo("demo", tool); + existedMetaActions.put("demo::tool", baseInfo); + + Path descFile = tempDir.resolve("demo::tool.desc.json"); + Files.writeString(descFile, """ + { + "io": false, + "params": {}, + "description": "desc override", + "tags": ["desc"], + "preActions": [], + "postActions": [], + "strictDependencies": false, + "responseSchema": {} + } + """); + + Assertions.assertTrue(registry.addOrUpdate(descFile)); + Assertions.assertEquals("desc override", existedMetaActions.get("demo::tool").getDescription()); + + registry.remove(descFile); + MetaActionInfo restoredInfo = existedMetaActions.get("demo::tool"); + Assertions.assertEquals("tool description", restoredInfo.getDescription()); + Assertions.assertTrue(restoredInfo.isIo()); + Assertions.assertEquals(List.of("tag"), restoredInfo.getTags()); + } finally { + registry.close(); + } + } +}