mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(runner): separate logic of different domain in LocalRunnerClient into different class
This commit is contained in:
@@ -0,0 +1,111 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import lombok.val;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import work.slhaf.partner.core.action.entity.ActionFileMetaData;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
import work.slhaf.partner.core.action.exception.ActionSerializeFailedException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.FileAlreadyExistsException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardCopyOption;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class ActionSerializer {
|
||||||
|
|
||||||
|
private final String tmpActionPath;
|
||||||
|
private final String dynamicActionPath;
|
||||||
|
|
||||||
|
ActionSerializer(String tmpActionPath, String dynamicActionPath) {
|
||||||
|
this.tmpActionPath = tmpActionPath;
|
||||||
|
this.dynamicActionPath = dynamicActionPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
static String normalizeCodeType(String codeType) {
|
||||||
|
if (codeType == null || codeType.isBlank()) {
|
||||||
|
throw new IllegalArgumentException("codeType 不能为空");
|
||||||
|
}
|
||||||
|
return codeType.startsWith(".") ? codeType : "." + codeType;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static @NotNull Path createActionDir(String baseName, Path baseDir) {
|
||||||
|
for (int i = 0; ; i++) {
|
||||||
|
String dirName = i == 0 ? baseName : baseName + "(" + i + ")";
|
||||||
|
Path candidate = baseDir.resolve(dirName);
|
||||||
|
try {
|
||||||
|
Files.createDirectory(candidate);
|
||||||
|
return candidate;
|
||||||
|
} catch (FileAlreadyExistsException ignored) {
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ActionSerializeFailedException("无法创建行动目录: " + candidate.toAbsolutePath(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String buildTmpPath(String actionKey, String codeType) {
|
||||||
|
return Path.of(tmpActionPath, System.currentTimeMillis() + "-" + actionKey + normalizeCodeType(codeType)).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException {
|
||||||
|
log.debug("行动程序临时序列化: {}", tempAction);
|
||||||
|
Path path = Path.of(tempAction.getLocation());
|
||||||
|
validateTmpLocation(path, codeType);
|
||||||
|
File file = path.toFile();
|
||||||
|
file.createNewFile();
|
||||||
|
Files.writeString(path, code);
|
||||||
|
log.debug("临时序列化完毕");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateTmpLocation(Path path, String codeType) throws IOException {
|
||||||
|
String normalizedCodeType = normalizeCodeType(codeType);
|
||||||
|
String fileName = path.getFileName().toString();
|
||||||
|
if (!fileName.endsWith(normalizedCodeType)) {
|
||||||
|
throw new IOException("临时文件路径与 codeType 不匹配: " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData) {
|
||||||
|
log.debug("行动程序持久序列化: {}", metaActionInfo);
|
||||||
|
val baseDir = Path.of(dynamicActionPath);
|
||||||
|
|
||||||
|
if (!Files.isDirectory(baseDir)) {
|
||||||
|
throw new ActionSerializeFailedException("目录不存在或不可用: " + baseDir.toAbsolutePath());
|
||||||
|
}
|
||||||
|
|
||||||
|
val actionDir = createActionDir(fileMetaData.getName(), baseDir);
|
||||||
|
val runTmp = actionDir.resolve("run." + fileMetaData.getExt() + ".tmp");
|
||||||
|
val descTmp = actionDir.resolve("desc.json.tmp");
|
||||||
|
val runFinal = actionDir.resolve("run." + fileMetaData.getExt());
|
||||||
|
val descFinal = actionDir.resolve("desc.json");
|
||||||
|
|
||||||
|
try {
|
||||||
|
Files.writeString(runTmp, fileMetaData.getContent());
|
||||||
|
Files.writeString(descTmp, JSONObject.toJSONString(metaActionInfo));
|
||||||
|
Files.move(runTmp, runFinal, StandardCopyOption.ATOMIC_MOVE);
|
||||||
|
Files.move(descTmp, descFinal, StandardCopyOption.ATOMIC_MOVE);
|
||||||
|
} catch (IOException e) {
|
||||||
|
safeDelete(runTmp);
|
||||||
|
safeDelete(descTmp);
|
||||||
|
safeDelete(runFinal);
|
||||||
|
safeDelete(descFinal);
|
||||||
|
safeDelete(actionDir);
|
||||||
|
throw new ActionSerializeFailedException("行动文件写入失败", e);
|
||||||
|
}
|
||||||
|
log.debug("持久序列化结束");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void safeDelete(Path path) {
|
||||||
|
try {
|
||||||
|
if (Files.exists(path)) {
|
||||||
|
Files.delete(path);
|
||||||
|
}
|
||||||
|
} catch (IOException ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
interface CommandExecutionService {
|
||||||
|
|
||||||
|
String[] buildCommands(String ext, Map<String, Object> params, String absolutePath);
|
||||||
|
|
||||||
|
Result exec(String... command);
|
||||||
|
|
||||||
|
@Data
|
||||||
|
class Result {
|
||||||
|
private boolean ok;
|
||||||
|
private String total;
|
||||||
|
private List<String> resultList;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,164 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.*;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static java.nio.file.StandardWatchEventKinds.*;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class DirectoryWatchSupport implements Closeable {
|
||||||
|
|
||||||
|
private final Context ctx;
|
||||||
|
private final Map<WatchEvent.Kind<?>, EventHandler> handlers = new HashMap<>();
|
||||||
|
private final ExecutorService executor;
|
||||||
|
private final boolean watchAll;
|
||||||
|
private final InitLoader initLoader;
|
||||||
|
DirectoryWatchSupport(Context ctx, ExecutorService executor, boolean watchAll, InitLoader initLoader) {
|
||||||
|
this.ctx = ctx;
|
||||||
|
this.executor = executor;
|
||||||
|
this.watchAll = watchAll;
|
||||||
|
this.initLoader = initLoader;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirectoryWatchSupport onCreate(EventHandler handler) {
|
||||||
|
ctx.kinds().add(ENTRY_CREATE);
|
||||||
|
handlers.put(ENTRY_CREATE, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirectoryWatchSupport onModify(EventHandler handler) {
|
||||||
|
ctx.kinds().add(ENTRY_MODIFY);
|
||||||
|
handlers.put(ENTRY_MODIFY, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirectoryWatchSupport onDelete(EventHandler handler) {
|
||||||
|
ctx.kinds().add(ENTRY_DELETE);
|
||||||
|
handlers.put(ENTRY_DELETE, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirectoryWatchSupport onOverflow(EventHandler handler) {
|
||||||
|
ctx.kinds().add(OVERFLOW);
|
||||||
|
handlers.put(OVERFLOW, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
registerPath();
|
||||||
|
if (initLoader != null) {
|
||||||
|
initLoader.load();
|
||||||
|
}
|
||||||
|
executor.execute(buildWatchTask());
|
||||||
|
}
|
||||||
|
|
||||||
|
Context context() {
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isWatching(Path dir) {
|
||||||
|
return ctx.watchKeys().values().stream().anyMatch(dir::equals);
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerDirectory(Path dir) throws IOException {
|
||||||
|
if (!java.nio.file.Files.isDirectory(dir) || isWatching(dir)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
WatchEvent.Kind<?>[] kindsArray = ctx.kinds().toArray(WatchEvent.Kind[]::new);
|
||||||
|
WatchKey key = dir.register(ctx.watchService(), kindsArray);
|
||||||
|
ctx.watchKeys().put(key, dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerPath() {
|
||||||
|
try {
|
||||||
|
registerDirectory(ctx.root());
|
||||||
|
if (!watchAll) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try (Stream<Path> walk = Files.list(ctx.root()).filter(Files::isDirectory)) {
|
||||||
|
for (Path dir : walk.toList()) {
|
||||||
|
registerDirectory(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("监听目录注册失败: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Runnable buildWatchTask() {
|
||||||
|
return () -> {
|
||||||
|
String rootStr = ctx.root().toString();
|
||||||
|
log.info("行动程序目录监听器已启动,监听目录: {}", rootStr);
|
||||||
|
while (true) {
|
||||||
|
WatchKey key = null;
|
||||||
|
try {
|
||||||
|
key = ctx.watchService().take();
|
||||||
|
List<WatchEvent<?>> events = key.pollEvents();
|
||||||
|
for (WatchEvent<?> event : events) {
|
||||||
|
WatchEvent.Kind<?> kind = event.kind();
|
||||||
|
Object context = event.context();
|
||||||
|
log.debug("文件目录监听事件: {} - {} - {}", rootStr, kind.name(), context);
|
||||||
|
Path thisDir = (Path) key.watchable();
|
||||||
|
EventHandler handler = handlers.get(kind);
|
||||||
|
if (handler == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
handler.handle(thisDir, context instanceof Path path ? thisDir.resolve(path) : null);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("监听线程被中断,准备退出...");
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
} catch (ClosedWatchServiceException e) {
|
||||||
|
log.info("WatchService 已关闭,监听线程退出。");
|
||||||
|
break;
|
||||||
|
} finally {
|
||||||
|
if (key != null) {
|
||||||
|
boolean valid = key.reset();
|
||||||
|
if (!valid) {
|
||||||
|
log.info("WatchKey 已失效,停止监听该目录: {}", key.watchable());
|
||||||
|
ctx.watchKeys().remove(key);
|
||||||
|
if (key.watchable().equals(ctx.root())) {
|
||||||
|
try {
|
||||||
|
Files.createDirectories(ctx.root());
|
||||||
|
registerPath();
|
||||||
|
if (initLoader != null) {
|
||||||
|
initLoader.load();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("重建根目录并重新注册监听失败: {}", ctx.root(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
ctx.watchService().close();
|
||||||
|
ctx.watchKeys().clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
interface EventHandler {
|
||||||
|
void handle(Path thisDir, Path context);
|
||||||
|
}
|
||||||
|
|
||||||
|
interface InitLoader {
|
||||||
|
void load();
|
||||||
|
}
|
||||||
|
|
||||||
|
record Context(Path root, WatchService watchService, Map<WatchKey, Path> watchKeys, Set<WatchEvent.Kind<?>> kinds) {
|
||||||
|
Context(Path root) throws IOException {
|
||||||
|
this(root, FileSystems.getDefault().newWatchService(), new HashMap<>(), new LinkedHashSet<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,349 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import cn.hutool.core.io.FileUtil;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import io.modelcontextprotocol.common.McpTransportContext;
|
||||||
|
import io.modelcontextprotocol.json.McpJsonMapper;
|
||||||
|
import io.modelcontextprotocol.server.McpServer;
|
||||||
|
import io.modelcontextprotocol.server.McpStatelessAsyncServer;
|
||||||
|
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
|
||||||
|
import io.modelcontextprotocol.spec.McpSchema;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
import work.slhaf.partner.common.mcp.InProcessMcpTransport;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class DynamicActionMcpManager implements AutoCloseable {
|
||||||
|
|
||||||
|
private final Path root;
|
||||||
|
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
|
||||||
|
private final CommandExecutionService commandExecutionService;
|
||||||
|
private final McpStatelessAsyncServer server;
|
||||||
|
private final InProcessMcpTransport clientTransport;
|
||||||
|
private final DirectoryWatchSupport watchSupport;
|
||||||
|
|
||||||
|
DynamicActionMcpManager(Path root,
|
||||||
|
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions,
|
||||||
|
ExecutorService executor,
|
||||||
|
CommandExecutionService commandExecutionService) throws IOException {
|
||||||
|
this.root = root;
|
||||||
|
this.existedMetaActions = existedMetaActions;
|
||||||
|
this.commandExecutionService = commandExecutionService;
|
||||||
|
InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair();
|
||||||
|
this.clientTransport = pair.clientSide();
|
||||||
|
McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder()
|
||||||
|
.tools(true)
|
||||||
|
.build();
|
||||||
|
this.server = McpServer.async(pair.serverSide())
|
||||||
|
.capabilities(serverCapabilities)
|
||||||
|
.jsonMapper(McpJsonMapper.getDefault())
|
||||||
|
.build();
|
||||||
|
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, this::loadExisting)
|
||||||
|
.onCreate(this::handleCreate)
|
||||||
|
.onModify(this::handleModify)
|
||||||
|
.onDelete(this::handleDelete)
|
||||||
|
.onOverflow((thisDir, context) -> reconcile());
|
||||||
|
}
|
||||||
|
|
||||||
|
McpTransportConfig.InProcess clientConfig(int timeout) {
|
||||||
|
return new McpTransportConfig.InProcess(timeout, clientTransport);
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
watchSupport.start();
|
||||||
|
log.info("DynamicActionMcp 文件监听注册完毕");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadExisting() {
|
||||||
|
File file = root.toFile();
|
||||||
|
if (file.isFile()) {
|
||||||
|
throw new ActionInitFailedException("未找到目录: " + root);
|
||||||
|
}
|
||||||
|
File[] files = file.listFiles();
|
||||||
|
if (files == null) {
|
||||||
|
throw new ActionInitFailedException("未正常读取目录: " + root);
|
||||||
|
}
|
||||||
|
for (File dir : files) {
|
||||||
|
if (!normalPath(dir.toPath())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
addAction(dir.getName(), dir.toPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isTmp(Path context) {
|
||||||
|
return context.getFileName().endsWith(".tmp");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleModify(Path thisDir, Path context) {
|
||||||
|
if (context == null || isTmp(context) || !normalPath(thisDir)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
modify(thisDir, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleCreate(Path thisDir, Path context) {
|
||||||
|
if (context == null || isTmp(context)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (thisDir.equals(root) && Files.isDirectory(context)) {
|
||||||
|
try {
|
||||||
|
watchSupport.registerDirectory(context);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("监听目录注册失败: {}", context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (normalPath(thisDir)) {
|
||||||
|
modify(thisDir, context);
|
||||||
|
}
|
||||||
|
if (Files.isDirectory(context) && normalPath(context)) {
|
||||||
|
File[] files = context.toFile().listFiles();
|
||||||
|
if (files == null) {
|
||||||
|
log.warn("目录无法访问: {}", context);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (File file : files) {
|
||||||
|
modify(context, file.toPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleDelete(Path thisDir, Path context) {
|
||||||
|
if (context == null || isTmp(context)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (thisDir.equals(root)) {
|
||||||
|
String name = context.getFileName().toString();
|
||||||
|
Path candidate = root.resolve(name);
|
||||||
|
if (Files.isDirectory(candidate)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
removeAction(name);
|
||||||
|
AtomicReference<java.nio.file.WatchKey> toRemove = new AtomicReference<>();
|
||||||
|
watchSupport.context().watchKeys().forEach((key, path) -> {
|
||||||
|
if (path.getFileName().toString().equals(name)) {
|
||||||
|
key.cancel();
|
||||||
|
toRemove.set(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (toRemove.get() != null) {
|
||||||
|
watchSupport.context().watchKeys().remove(toRemove.get());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!thisDir.equals(root) && !normalPath(thisDir)) {
|
||||||
|
removeAction(thisDir.getFileName().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reconcile() {
|
||||||
|
Set<String> existed = existedMetaActions.keySet().stream()
|
||||||
|
.filter(actionKey -> actionKey.startsWith("local::"))
|
||||||
|
.map(actionKey -> actionKey.split("::")[1])
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
Set<String> currentDirs = new HashSet<>();
|
||||||
|
try (Stream<Path> stream = Files.list(root).filter(Files::isDirectory)) {
|
||||||
|
stream.forEach(path -> {
|
||||||
|
String name = path.getFileName().toString();
|
||||||
|
currentDirs.add(name);
|
||||||
|
boolean contains = existed.contains(name);
|
||||||
|
boolean normal = normalPath(path);
|
||||||
|
if (contains && !normal) {
|
||||||
|
removeAction(name);
|
||||||
|
}
|
||||||
|
if (!contains) {
|
||||||
|
boolean alreadyWatching = watchSupport.isWatching(path);
|
||||||
|
if (!alreadyWatching) {
|
||||||
|
try {
|
||||||
|
watchSupport.registerDirectory(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("监听目录注册失败: {}", path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (normal) {
|
||||||
|
addAction(name, path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("目录无法读取: {}", root);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (String existedName : existed) {
|
||||||
|
if (!currentDirs.contains(existedName)) {
|
||||||
|
removeAction(existedName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modify(Path thisDir, Path context) {
|
||||||
|
String fileName = context.getFileName().toString();
|
||||||
|
if (fileName.equals("desc.json")) {
|
||||||
|
handleMetaModify(thisDir);
|
||||||
|
}
|
||||||
|
if (fileName.startsWith("run.")) {
|
||||||
|
handleProgramModify(thisDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleProgramModify(Path thisDir) {
|
||||||
|
String name = thisDir.getFileName().toString();
|
||||||
|
String actionKey = "local::" + name;
|
||||||
|
if (existedMetaActions.containsKey(actionKey)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!addAction(name, thisDir)) {
|
||||||
|
removeAction(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleMetaModify(Path thisDir) {
|
||||||
|
String name = thisDir.getFileName().toString();
|
||||||
|
if (!addAction(name, thisDir)) {
|
||||||
|
removeAction(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean addAction(String name, Path dir) {
|
||||||
|
File program = null;
|
||||||
|
try (Stream<Path> stream = Files.list(dir)) {
|
||||||
|
for (Path path : stream.toList()) {
|
||||||
|
if (isTmp(path)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (path.getFileName().toString().startsWith("run.")) {
|
||||||
|
program = path.toFile();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("添加 action 失败", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
MetaActionInfo info;
|
||||||
|
try {
|
||||||
|
info = JSONUtil.readJSONObject(dir.resolve("desc.json").toFile(), StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("desc.json 加载失败: {}", dir);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String actionKey = "local::" + name;
|
||||||
|
existedMetaActions.put(actionKey, info);
|
||||||
|
server.addTool(buildAsyncToolSpecification(info, program, actionKey, name)).subscribe();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeAction(String name) {
|
||||||
|
existedMetaActions.remove("local::" + name);
|
||||||
|
server.removeTool(name).subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean normalPath(Path path) {
|
||||||
|
File[] files = loadFiles(path);
|
||||||
|
if (files == null || files.length < 2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
boolean desc = false;
|
||||||
|
int run = 0;
|
||||||
|
for (File file : files) {
|
||||||
|
String fileName = file.getName();
|
||||||
|
if (fileName.endsWith(".tmp")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (fileName.equals("desc.json")) {
|
||||||
|
desc = true;
|
||||||
|
}
|
||||||
|
if (fileName.startsWith("run.")) {
|
||||||
|
run++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return run == 1 && desc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private File[] loadFiles(Path path) {
|
||||||
|
if (!Files.isDirectory(path)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return path.toFile().listFiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
private McpStatelessServerFeatures.AsyncToolSpecification buildAsyncToolSpecification(MetaActionInfo info, File program, String actionKey, String name) {
|
||||||
|
Map<String, Object> additional = Map.of(
|
||||||
|
"pre", info.getPreActions(),
|
||||||
|
"post", info.getPostActions(),
|
||||||
|
"strict_pre", info.isStrictDependencies(),
|
||||||
|
"io", info.isIo()
|
||||||
|
);
|
||||||
|
McpSchema.Tool tool = McpSchema.Tool.builder()
|
||||||
|
.name(name)
|
||||||
|
.description(info.getDescription())
|
||||||
|
.inputSchema(McpJsonMapper.getDefault(), JSONObject.toJSONString(info.getParams()))
|
||||||
|
.outputSchema(info.getResponseSchema())
|
||||||
|
.title(actionKey)
|
||||||
|
.meta(additional)
|
||||||
|
.build();
|
||||||
|
return McpStatelessServerFeatures.AsyncToolSpecification.builder()
|
||||||
|
.tool(tool)
|
||||||
|
.callHandler(buildToolHandler(program))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> buildToolHandler(File program) {
|
||||||
|
return (mcpTransportContext, callToolRequest) -> {
|
||||||
|
Map<String, Object> arguments = callToolRequest.arguments();
|
||||||
|
if (arguments == null) {
|
||||||
|
arguments = Map.of();
|
||||||
|
}
|
||||||
|
String ext = FileUtil.getSuffix(program);
|
||||||
|
String[] commands = commandExecutionService.buildCommands(ext, arguments, program.getAbsolutePath());
|
||||||
|
if (commands == null) {
|
||||||
|
return Mono.just(McpSchema.CallToolResult.builder()
|
||||||
|
.addTextContent("未知文件类型: " + program.getName())
|
||||||
|
.isError(true)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
return Mono.fromCallable(() -> {
|
||||||
|
CommandExecutionService.Result execResult = commandExecutionService.exec(commands);
|
||||||
|
McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder()
|
||||||
|
.isError(!execResult.isOk());
|
||||||
|
List<String> resultList = execResult.getResultList();
|
||||||
|
if (resultList != null && !resultList.isEmpty()) {
|
||||||
|
builder.textContent(resultList);
|
||||||
|
builder.structuredContent(resultList);
|
||||||
|
} else {
|
||||||
|
builder.addTextContent(execResult.getTotal());
|
||||||
|
builder.structuredContent(execResult.getTotal());
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}).subscribeOn(Schedulers.boundedElastic());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
try {
|
||||||
|
watchSupport.close();
|
||||||
|
} catch (IOException ignored) {
|
||||||
|
}
|
||||||
|
server.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
class LocalProcessCommandExecutionService implements CommandExecutionService {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] buildCommands(String ext, Map<String, Object> params, String absolutePath) {
|
||||||
|
String command = switch (ext) {
|
||||||
|
case "py" -> "python";
|
||||||
|
case "sh" -> "bash";
|
||||||
|
default -> null;
|
||||||
|
};
|
||||||
|
if (command == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
int paramSize = params == null ? 0 : params.size();
|
||||||
|
String[] commands = new String[paramSize + 2];
|
||||||
|
commands[0] = command;
|
||||||
|
commands[1] = absolutePath;
|
||||||
|
AtomicInteger paramCount = new AtomicInteger(2);
|
||||||
|
if (params != null) {
|
||||||
|
params.forEach((param, value) -> commands[paramCount.getAndIncrement()] = "--" + param + "=" + value);
|
||||||
|
}
|
||||||
|
return commands;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result exec(String... command) {
|
||||||
|
Result result = new Result();
|
||||||
|
List<String> output = new ArrayList<>();
|
||||||
|
List<String> error = new ArrayList<>();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Process process = new ProcessBuilder(command)
|
||||||
|
.redirectErrorStream(false)
|
||||||
|
.start();
|
||||||
|
|
||||||
|
Thread stdoutThread = new Thread(() -> {
|
||||||
|
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||||
|
String line;
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
output.add(line);
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Thread stderrThread = new Thread(() -> {
|
||||||
|
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
||||||
|
String line;
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
error.add(line);
|
||||||
|
}
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
stdoutThread.start();
|
||||||
|
stderrThread.start();
|
||||||
|
|
||||||
|
int exitCode = process.waitFor();
|
||||||
|
stdoutThread.join();
|
||||||
|
stderrThread.join();
|
||||||
|
|
||||||
|
result.setOk(exitCode == 0);
|
||||||
|
result.setResultList(output.isEmpty() ? error : output);
|
||||||
|
result.setTotal(String.join("\n", output.isEmpty() ? error : output));
|
||||||
|
} catch (Exception e) {
|
||||||
|
result.setOk(false);
|
||||||
|
result.setTotal(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,63 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import io.modelcontextprotocol.client.McpSyncClient;
|
||||||
|
import io.modelcontextprotocol.spec.McpSchema;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
class McpActionExecutor {
|
||||||
|
|
||||||
|
private final McpClientRegistry mcpClientRegistry;
|
||||||
|
|
||||||
|
McpActionExecutor(McpClientRegistry mcpClientRegistry) {
|
||||||
|
this.mcpClientRegistry = mcpClientRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
|
RunnerClient.RunnerResponse run(MetaAction metaAction) {
|
||||||
|
RunnerClient.RunnerResponse response = new RunnerClient.RunnerResponse();
|
||||||
|
McpSyncClient mcpClient = mcpClientRegistry.get(metaAction.getLocation());
|
||||||
|
if (mcpClient == null) {
|
||||||
|
response.setOk(false);
|
||||||
|
response.setData("MCP client not found: " + metaAction.getLocation());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
McpSchema.CallToolRequest callToolRequest = McpSchema.CallToolRequest.builder()
|
||||||
|
.name(metaAction.getName())
|
||||||
|
.arguments(metaAction.getParams())
|
||||||
|
.build();
|
||||||
|
McpSchema.CallToolResult callToolResult = mcpClient.callTool(callToolRequest);
|
||||||
|
Boolean error = callToolResult.isError();
|
||||||
|
response.setOk(error == null || !error);
|
||||||
|
response.setData(extractResponseData(callToolResult));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String extractResponseData(McpSchema.CallToolResult callToolResult) {
|
||||||
|
Object structuredContent = callToolResult.structuredContent();
|
||||||
|
if (structuredContent != null) {
|
||||||
|
return String.valueOf(structuredContent);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<McpSchema.Content> contents = callToolResult.content();
|
||||||
|
if (contents != null && !contents.isEmpty()) {
|
||||||
|
String contentSummary = contents.stream()
|
||||||
|
.map(this::renderContent)
|
||||||
|
.filter(text -> text != null && !text.isBlank())
|
||||||
|
.collect(Collectors.joining("\n"));
|
||||||
|
if (!contentSummary.isBlank()) {
|
||||||
|
return contentSummary;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return callToolResult.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String renderContent(McpSchema.Content content) {
|
||||||
|
if (content instanceof McpSchema.TextContent textContent) {
|
||||||
|
return textContent.text();
|
||||||
|
}
|
||||||
|
return String.valueOf(content);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import io.modelcontextprotocol.client.McpSyncClient;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
class McpClientRegistry implements AutoCloseable {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, McpSyncClient> clients = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
McpSyncClient get(String serverName) {
|
||||||
|
return clients.get(serverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
void register(String serverName, McpSyncClient client) {
|
||||||
|
McpSyncClient old = clients.put(serverName, client);
|
||||||
|
if (old != null && old != client) {
|
||||||
|
old.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
McpSyncClient remove(String serverName) {
|
||||||
|
McpSyncClient client = detach(serverName);
|
||||||
|
if (client != null) {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
McpSyncClient detach(String serverName) {
|
||||||
|
return clients.remove(serverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean contains(String serverName) {
|
||||||
|
return clients.containsKey(serverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> listIds() {
|
||||||
|
return new HashSet<>(clients.keySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
clients.forEach((id, client) -> client.close());
|
||||||
|
clients.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,295 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import io.modelcontextprotocol.client.McpClient;
|
||||||
|
import io.modelcontextprotocol.client.McpSyncClient;
|
||||||
|
import io.modelcontextprotocol.spec.McpSchema;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class McpConfigWatcher implements AutoCloseable {
|
||||||
|
|
||||||
|
private final Path root;
|
||||||
|
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
|
||||||
|
private final McpClientRegistry mcpClientRegistry;
|
||||||
|
private final McpTransportFactory mcpTransportFactory;
|
||||||
|
private final McpMetaRegistry mcpMetaRegistry;
|
||||||
|
private final DirectoryWatchSupport watchSupport;
|
||||||
|
private final Map<File, McpConfigFileRecord> mcpConfigFileCache = new HashMap<>();
|
||||||
|
|
||||||
|
McpConfigWatcher(Path root,
|
||||||
|
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions,
|
||||||
|
McpClientRegistry mcpClientRegistry,
|
||||||
|
McpTransportFactory mcpTransportFactory,
|
||||||
|
McpMetaRegistry mcpMetaRegistry,
|
||||||
|
ExecutorService executor) throws IOException {
|
||||||
|
this.root = root;
|
||||||
|
this.existedMetaActions = existedMetaActions;
|
||||||
|
this.mcpClientRegistry = mcpClientRegistry;
|
||||||
|
this.mcpTransportFactory = mcpTransportFactory;
|
||||||
|
this.mcpMetaRegistry = mcpMetaRegistry;
|
||||||
|
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, false, () -> loadInitial())
|
||||||
|
.onCreate(this::handleCreate)
|
||||||
|
.onModify((thisDir, context) -> checkAndReload(true))
|
||||||
|
.onDelete(this::handleDelete)
|
||||||
|
.onOverflow((thisDir, context) -> checkAndReload(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
watchSupport.start();
|
||||||
|
log.info("CommonMcp 文件监听注册完毕");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadInitial() {
|
||||||
|
File[] files = loadFiles(root);
|
||||||
|
if (files == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (File file : files) {
|
||||||
|
if (!normalFile(file)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
loadAndRegisterMcpClientsFromFile(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleCreate(Path thisDir, Path context) {
|
||||||
|
if (context == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
File file = context.toFile();
|
||||||
|
if (!normalFile(file)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
loadAndRegisterMcpClientsFromFile(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleDelete(Path thisDir, Path context) {
|
||||||
|
if (context == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
File file = context.toFile();
|
||||||
|
if (!file.getName().endsWith(".json")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
McpConfigFileRecord fileRecord = mcpConfigFileCache.remove(file);
|
||||||
|
if (fileRecord == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (String clientId : fileRecord.paramsCacheMap().keySet()) {
|
||||||
|
McpSyncClient client = mcpClientRegistry.detach(clientId);
|
||||||
|
if (client == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (McpSchema.Tool tool : client.listTools().tools()) {
|
||||||
|
existedMetaActions.remove(clientId + "::" + tool.name());
|
||||||
|
}
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean normalFile(File file) {
|
||||||
|
return file.exists() && file.isFile() && file.getName().endsWith(".json");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerMcpClient(String id, McpTransportConfig transportConfig) {
|
||||||
|
McpSyncClient client = McpClient.sync(mcpTransportFactory.create(transportConfig, null))
|
||||||
|
.requestTimeout(Duration.ofSeconds(transportConfig.timeout()))
|
||||||
|
.clientInfo(new McpSchema.Implementation(id, "PARTNER"))
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
for (McpSchema.Tool tool : client.listTools().tools()) {
|
||||||
|
existedMetaActions.put(id + "::" + tool.name(), mcpMetaRegistry.buildMetaActionInfo(id, tool));
|
||||||
|
}
|
||||||
|
mcpClientRegistry.register(id, client);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[{}] MCP client init failed, skipped (probably non-stdio-safe)", id, e);
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private cn.hutool.json.JSONObject readJson(File file) {
|
||||||
|
try {
|
||||||
|
return JSONUtil.readJSONObject(file, StandardCharsets.UTF_8);
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private cn.hutool.json.JSONObject readMcp(cn.hutool.json.JSONObject json, String id) {
|
||||||
|
try {
|
||||||
|
return json.getJSONObject(id);
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private McpTransportConfig readParams(cn.hutool.json.JSONObject mcp) {
|
||||||
|
Set<String> keys = mcp.keySet();
|
||||||
|
int timeout = mcp.getInt("timeout", 30);
|
||||||
|
|
||||||
|
if (matchesKeys(keys, Set.of("command", "args", "env"), Set.of("timeout"))) {
|
||||||
|
String command = mcp.getStr("command");
|
||||||
|
Map<String, String> env = mcp.getBean("env", Map.class);
|
||||||
|
java.util.List<String> args = mcp.getBeanList("args", String.class);
|
||||||
|
if (command == null || env == null || args == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new McpTransportConfig.Stdio(timeout, command, env, args);
|
||||||
|
}
|
||||||
|
if (matchesKeys(keys, Set.of("uri", "endpoint", "headers"), Set.of("timeout"))) {
|
||||||
|
String uri = mcp.getStr("uri");
|
||||||
|
String endpoint = mcp.getStr("endpoint");
|
||||||
|
Map<String, String> headers = mcp.getBean("headers", Map.class);
|
||||||
|
if (uri == null || endpoint == null || headers == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new McpTransportConfig.Http(timeout, uri, endpoint, headers);
|
||||||
|
}
|
||||||
|
if (matchesKeys(keys, Set.of("url"), Set.of("timeout"))) {
|
||||||
|
String url = mcp.getStr("url");
|
||||||
|
if (url == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new McpTransportConfig.Http(timeout, url, "", Map.of());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean matchesKeys(Set<String> actualKeys, Set<String> requiredKeys, Set<String> optionalKeys) {
|
||||||
|
if (!actualKeys.containsAll(requiredKeys)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Set<String> allowedKeys = new HashSet<>(requiredKeys);
|
||||||
|
allowedKeys.addAll(optionalKeys);
|
||||||
|
return allowedKeys.containsAll(actualKeys);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkAndReload(boolean trustCache) {
|
||||||
|
HashMap<String, McpTransportConfig> changedMap = new HashMap<>();
|
||||||
|
HashSet<String> existingMcpIdSet = new HashSet<>();
|
||||||
|
|
||||||
|
File[] files = loadFiles(root);
|
||||||
|
if (files == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (File file : files) {
|
||||||
|
if (!normalFile(file)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
McpConfigFileRecord fileRecord = mcpConfigFileCache.get(file);
|
||||||
|
boolean fileRecordExists = fileRecord != null;
|
||||||
|
if (fileRecordExists && !fileChanged(file, fileRecord) && trustCache) {
|
||||||
|
existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
cn.hutool.json.JSONObject mcpConfigJson = readJson(file);
|
||||||
|
if (mcpConfigJson == null) {
|
||||||
|
if (fileRecordExists) {
|
||||||
|
existingMcpIdSet.addAll(fileRecord.paramsCacheMap().keySet());
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
McpConfigFileRecord newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length(), new HashMap<>());
|
||||||
|
for (String id : mcpConfigJson.keySet()) {
|
||||||
|
cn.hutool.json.JSONObject mcp = readMcp(mcpConfigJson, id);
|
||||||
|
if (mcp == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
McpTransportConfig params = readParams(mcp);
|
||||||
|
if (params == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
existingMcpIdSet.add(id);
|
||||||
|
newFileRecord.paramsCacheMap().put(id, params);
|
||||||
|
if (fileRecordExists) {
|
||||||
|
McpTransportConfig paramsCache = fileRecord.paramsCacheMap().get(id);
|
||||||
|
if (paramsCache != null && paramsCache.equals(params)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changedMap.put(id, params);
|
||||||
|
}
|
||||||
|
mcpConfigFileCache.put(file, newFileRecord);
|
||||||
|
}
|
||||||
|
updateMcpClients(changedMap, existingMcpIdSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateMcpClients(HashMap<String, McpTransportConfig> changedMap, HashSet<String> existingMcpIdSet) {
|
||||||
|
changedMap.forEach(this::registerMcpClient);
|
||||||
|
for (String clientId : mcpClientRegistry.listIds()) {
|
||||||
|
if (clientId.equals(LocalRunnerClient.MCP_NAME_DESC) || clientId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!existingMcpIdSet.contains(clientId)) {
|
||||||
|
mcpClientRegistry.remove(clientId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
existedMetaActions.keySet().removeIf(actionKey -> {
|
||||||
|
String serverId = actionKey.split("::")[0];
|
||||||
|
return !serverId.equals("local")
|
||||||
|
&& !serverId.equals(LocalRunnerClient.MCP_NAME_DESC)
|
||||||
|
&& !serverId.equals(LocalRunnerClient.MCP_NAME_DYNAMIC)
|
||||||
|
&& !existingMcpIdSet.contains(serverId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean fileChanged(File file, McpConfigFileRecord fileRecord) {
|
||||||
|
return fileRecord.lastModified() != file.lastModified() || fileRecord.length() != file.length();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadAndRegisterMcpClientsFromFile(File file) {
|
||||||
|
cn.hutool.json.JSONObject mcpConfigJson = readJson(file);
|
||||||
|
if (mcpConfigJson == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
McpConfigFileRecord newFileRecord = new McpConfigFileRecord(file.lastModified(), file.length());
|
||||||
|
for (String id : mcpConfigJson.keySet()) {
|
||||||
|
cn.hutool.json.JSONObject mcp = readMcp(mcpConfigJson, id);
|
||||||
|
if (mcp == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
McpTransportConfig params = readParams(mcp);
|
||||||
|
if (params == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
registerMcpClient(id, params);
|
||||||
|
newFileRecord.paramsCacheMap().put(id, params);
|
||||||
|
}
|
||||||
|
mcpConfigFileCache.put(file, newFileRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
private File[] loadFiles(Path path) {
|
||||||
|
if (!path.toFile().isDirectory()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return path.toFile().listFiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws Exception {
|
||||||
|
watchSupport.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private record McpConfigFileRecord(long lastModified, long length, Map<String, McpTransportConfig> paramsCacheMap) {
|
||||||
|
private McpConfigFileRecord(long lastModified, long length) {
|
||||||
|
this(lastModified, length, new HashMap<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class McpDescWatcher implements AutoCloseable {
|
||||||
|
|
||||||
|
private final Path root;
|
||||||
|
private final McpMetaRegistry mcpMetaRegistry;
|
||||||
|
private final DirectoryWatchSupport watchSupport;
|
||||||
|
|
||||||
|
McpDescWatcher(Path root, McpMetaRegistry mcpMetaRegistry, ExecutorService executor) throws IOException {
|
||||||
|
this.root = root;
|
||||||
|
this.mcpMetaRegistry = mcpMetaRegistry;
|
||||||
|
this.watchSupport = new DirectoryWatchSupport(new DirectoryWatchSupport.Context(root), executor, true, () -> mcpMetaRegistry.loadDirectory(root))
|
||||||
|
.onCreate(this::handleUpsert)
|
||||||
|
.onModify(this::handleUpsert)
|
||||||
|
.onDelete(this::handleDelete)
|
||||||
|
.onOverflow((thisDir, context) -> mcpMetaRegistry.reconcile(root));
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
watchSupport.start();
|
||||||
|
log.info("DescMcp 文件监听注册完毕");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleUpsert(Path thisDir, Path context) {
|
||||||
|
if (context == null || Files.isDirectory(context) || !mcpMetaRegistry.isValidDescFile(context.getFileName().toString())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!mcpMetaRegistry.addOrUpdate(context)) {
|
||||||
|
mcpMetaRegistry.remove(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleDelete(Path thisDir, Path context) {
|
||||||
|
if (context == null || !mcpMetaRegistry.isValidDescFile(context.getFileName().toString())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
mcpMetaRegistry.remove(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws Exception {
|
||||||
|
watchSupport.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,233 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import io.modelcontextprotocol.common.McpTransportContext;
|
||||||
|
import io.modelcontextprotocol.json.McpJsonMapper;
|
||||||
|
import io.modelcontextprotocol.server.McpServer;
|
||||||
|
import io.modelcontextprotocol.server.McpStatelessAsyncServer;
|
||||||
|
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
|
||||||
|
import io.modelcontextprotocol.spec.McpSchema;
|
||||||
|
import javassist.NotFoundException;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import work.slhaf.partner.common.mcp.InProcessMcpTransport;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
class McpMetaRegistry implements AutoCloseable {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
|
||||||
|
private final ConcurrentHashMap<String, String> descCache = new ConcurrentHashMap<>();
|
||||||
|
private final ConcurrentHashMap<String, MetaActionInfo> descInfoCache = new ConcurrentHashMap<>();
|
||||||
|
private final ConcurrentHashMap<String, MetaActionInfo> originalInfoCache = new ConcurrentHashMap<>();
|
||||||
|
private final McpStatelessAsyncServer descServer;
|
||||||
|
private final InProcessMcpTransport clientTransport;
|
||||||
|
|
||||||
|
McpMetaRegistry(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions) {
|
||||||
|
this.existedMetaActions = existedMetaActions;
|
||||||
|
InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair();
|
||||||
|
this.clientTransport = pair.clientSide();
|
||||||
|
McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder()
|
||||||
|
.resources(true, true)
|
||||||
|
.build();
|
||||||
|
this.descServer = McpServer.async(pair.serverSide())
|
||||||
|
.capabilities(serverCapabilities)
|
||||||
|
.jsonMapper(McpJsonMapper.getDefault())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
McpTransportConfig.InProcess clientConfig(String serverName, int timeout) {
|
||||||
|
return new McpTransportConfig.InProcess(timeout, clientTransport);
|
||||||
|
}
|
||||||
|
|
||||||
|
void loadDirectory(Path root) {
|
||||||
|
File[] files = loadFiles(root);
|
||||||
|
if (files == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (File file : files) {
|
||||||
|
addOrUpdate(file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean addOrUpdate(Path path) {
|
||||||
|
return addOrUpdate(path.toFile());
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean addOrUpdate(File file) {
|
||||||
|
String name = file.getName();
|
||||||
|
if (!isValidDescFile(name)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
MetaActionInfo info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
|
||||||
|
String uri = file.toPath().toUri().toString();
|
||||||
|
descCache.put(uri, JSONObject.toJSONString(info));
|
||||||
|
String actionKey = name.replace(".desc.json", "");
|
||||||
|
descInfoCache.put(actionKey, copyMetaActionInfo(info));
|
||||||
|
descServer.addResource(buildAsyncResourceSpecification(name, uri)).block();
|
||||||
|
if (existedMetaActions.containsKey(actionKey)) {
|
||||||
|
existedMetaActions.put(actionKey, mergeWithOriginal(actionKey, info));
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("desc.json 解析失败: {}", file.getAbsolutePath());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void remove(Path path) {
|
||||||
|
String uri = path.toUri().toString();
|
||||||
|
String actionKey = path.getFileName().toString().replace(".desc.json", "");
|
||||||
|
descCache.remove(uri);
|
||||||
|
descInfoCache.remove(actionKey);
|
||||||
|
descServer.removeResource(uri).block();
|
||||||
|
MetaActionInfo originalInfo = originalInfoCache.get(actionKey);
|
||||||
|
if (originalInfo != null) {
|
||||||
|
existedMetaActions.put(actionKey, copyMetaActionInfo(originalInfo));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
MetaActionInfo info = existedMetaActions.get(actionKey);
|
||||||
|
if (info != null) {
|
||||||
|
resetMetaActionInfo(info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void reconcile(Path root) {
|
||||||
|
File[] files = loadFiles(root);
|
||||||
|
if (files == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Set<String> currentUris = ConcurrentHashMap.newKeySet();
|
||||||
|
for (File file : files) {
|
||||||
|
if (!isValidDescFile(file.getName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
currentUris.add(file.toURI().toString());
|
||||||
|
if (!addOrUpdate(file)) {
|
||||||
|
remove(file.toPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
List<String> serverUris = descServer.listResources()
|
||||||
|
.map(McpSchema.Resource::uri)
|
||||||
|
.collectList()
|
||||||
|
.block();
|
||||||
|
if (serverUris == null) {
|
||||||
|
log.error("无法获取 DescMcpServer 持有的资源列表");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (String uri : serverUris) {
|
||||||
|
if (!currentUris.contains(uri)) {
|
||||||
|
remove(Path.of(java.net.URI.create(uri)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MetaActionInfo buildMetaActionInfo(String serverId, McpSchema.Tool tool) {
|
||||||
|
String actionKey = serverId + "::" + tool.name();
|
||||||
|
MetaActionInfo baseInfo = buildToolMetaActionInfo(tool);
|
||||||
|
originalInfoCache.put(actionKey, copyMetaActionInfo(baseInfo));
|
||||||
|
MetaActionInfo override = descInfoCache.get(actionKey);
|
||||||
|
return override == null ? baseInfo : mergeWithOriginal(actionKey, override);
|
||||||
|
}
|
||||||
|
|
||||||
|
private McpStatelessServerFeatures.AsyncResourceSpecification buildAsyncResourceSpecification(String name, String uri) {
|
||||||
|
McpSchema.Resource resource = McpSchema.Resource.builder()
|
||||||
|
.name(name)
|
||||||
|
.title(name)
|
||||||
|
.description("Action descriptor for " + name)
|
||||||
|
.mimeType("application/json")
|
||||||
|
.uri(uri)
|
||||||
|
.build();
|
||||||
|
BiFunction<McpTransportContext, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler = (context, request) -> {
|
||||||
|
String result = descCache.get(request.uri());
|
||||||
|
if (result == null) {
|
||||||
|
return Mono.error(new NotFoundException("未找到 Resource: " + request.uri()));
|
||||||
|
}
|
||||||
|
return Mono.just(new McpSchema.ReadResourceResult(List.of(
|
||||||
|
new McpSchema.TextResourceContents(request.uri(), "application/json", result)
|
||||||
|
)));
|
||||||
|
};
|
||||||
|
return new McpStatelessServerFeatures.AsyncResourceSpecification(resource, readHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isValidDescFile(String fileName) {
|
||||||
|
return fileName.endsWith(".desc.json") && fileName.contains("::");
|
||||||
|
}
|
||||||
|
|
||||||
|
private File[] loadFiles(Path root) {
|
||||||
|
if (!Files.isDirectory(root)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return root.toFile().listFiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetMetaActionInfo(@NotNull MetaActionInfo info) {
|
||||||
|
info.setIo(false);
|
||||||
|
if (info.getTags() != null) {
|
||||||
|
info.getTags().clear();
|
||||||
|
}
|
||||||
|
if (info.getPreActions() != null) {
|
||||||
|
info.getPreActions().clear();
|
||||||
|
}
|
||||||
|
if (info.getPostActions() != null) {
|
||||||
|
info.getPostActions().clear();
|
||||||
|
}
|
||||||
|
info.setStrictDependencies(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
descServer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private MetaActionInfo buildToolMetaActionInfo(McpSchema.Tool tool) {
|
||||||
|
MetaActionInfo info = new MetaActionInfo();
|
||||||
|
info.setDescription(tool.description());
|
||||||
|
Map<String, Object> outputSchema = tool.outputSchema();
|
||||||
|
info.setResponseSchema(outputSchema == null ? JSONObject.of() : JSONObject.from(outputSchema));
|
||||||
|
info.setParams(tool.inputSchema().properties());
|
||||||
|
|
||||||
|
Map<String, Object> meta = tool.meta();
|
||||||
|
if (meta != null) {
|
||||||
|
JSONObject metaJson = JSONObject.from(meta);
|
||||||
|
info.setIo(Boolean.TRUE.equals(metaJson.getBoolean("io")));
|
||||||
|
info.setPreActions(metaJson.getList("pre", String.class));
|
||||||
|
info.setPostActions(metaJson.getList("post", String.class));
|
||||||
|
info.setStrictDependencies(Boolean.TRUE.equals(metaJson.getBoolean("strict")));
|
||||||
|
info.setTags(metaJson.getList("tag", String.class));
|
||||||
|
}
|
||||||
|
return info;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MetaActionInfo mergeWithOriginal(String actionKey, MetaActionInfo override) {
|
||||||
|
MetaActionInfo original = originalInfoCache.get(actionKey);
|
||||||
|
return override == null ? copyMetaActionInfo(original) : copyMetaActionInfo(override);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MetaActionInfo copyMetaActionInfo(MetaActionInfo source) {
|
||||||
|
if (source == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
MetaActionInfo copy = new MetaActionInfo();
|
||||||
|
copy.setIo(source.isIo());
|
||||||
|
copy.setParams(source.getParams() == null ? null : new HashMap<>(source.getParams()));
|
||||||
|
copy.setDescription(source.getDescription());
|
||||||
|
copy.setTags(source.getTags() == null ? new ArrayList<>() : new ArrayList<>(source.getTags()));
|
||||||
|
copy.setPreActions(source.getPreActions() == null ? new ArrayList<>() : new ArrayList<>(source.getPreActions()));
|
||||||
|
copy.setPostActions(source.getPostActions() == null ? new ArrayList<>() : new ArrayList<>(source.getPostActions()));
|
||||||
|
copy.setStrictDependencies(source.isStrictDependencies());
|
||||||
|
copy.setResponseSchema(source.getResponseSchema() == null ? JSONObject.of() : JSONObject.from(source.getResponseSchema()));
|
||||||
|
return copy;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,22 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import work.slhaf.partner.common.mcp.InProcessMcpTransport;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
sealed interface McpTransportConfig permits McpTransportConfig.Http, McpTransportConfig.Stdio, McpTransportConfig.InProcess {
|
||||||
|
|
||||||
|
int timeout();
|
||||||
|
|
||||||
|
record Http(int timeout, String baseUri, String endpoint,
|
||||||
|
Map<String, String> headers) implements McpTransportConfig {
|
||||||
|
}
|
||||||
|
|
||||||
|
record Stdio(int timeout, String command, Map<String, String> env,
|
||||||
|
List<String> args) implements McpTransportConfig {
|
||||||
|
}
|
||||||
|
|
||||||
|
record InProcess(int timeout, InProcessMcpTransport clientTransport) implements McpTransportConfig {
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,40 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
|
||||||
|
import io.modelcontextprotocol.client.transport.ServerParameters;
|
||||||
|
import io.modelcontextprotocol.client.transport.StdioClientTransport;
|
||||||
|
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
|
||||||
|
import io.modelcontextprotocol.common.McpTransportContext;
|
||||||
|
import io.modelcontextprotocol.json.McpJsonMapper;
|
||||||
|
import io.modelcontextprotocol.spec.McpClientTransport;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.http.HttpRequest;
|
||||||
|
|
||||||
|
class McpTransportFactory {
|
||||||
|
|
||||||
|
McpClientTransport create(McpTransportConfig config, RunnerExecutionPolicy policy) {
|
||||||
|
return switch (config) {
|
||||||
|
case McpTransportConfig.Stdio stdio -> {
|
||||||
|
ServerParameters serverParameters = ServerParameters.builder(stdio.command())
|
||||||
|
.env(stdio.env())
|
||||||
|
.args(stdio.args())
|
||||||
|
.build();
|
||||||
|
yield new StdioClientTransport(serverParameters, McpJsonMapper.getDefault());
|
||||||
|
}
|
||||||
|
case McpTransportConfig.Http http -> {
|
||||||
|
McpSyncHttpClientRequestCustomizer customizer = new McpSyncHttpClientRequestCustomizer() {
|
||||||
|
@Override
|
||||||
|
public void customize(HttpRequest.Builder builder, String method, URI endpoint, String body, McpTransportContext context) {
|
||||||
|
http.headers().forEach(builder::setHeader);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
yield HttpClientSseClientTransport.builder(http.baseUri())
|
||||||
|
.httpRequestCustomizer(customizer)
|
||||||
|
.sseEndpoint(http.endpoint())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
case McpTransportConfig.InProcess inProcess -> inProcess.clientTransport();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import cn.hutool.core.io.FileUtil;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
class OriginExecutionService {
|
||||||
|
|
||||||
|
private final CommandExecutionService commandExecutionService;
|
||||||
|
|
||||||
|
OriginExecutionService(CommandExecutionService commandExecutionService) {
|
||||||
|
this.commandExecutionService = commandExecutionService;
|
||||||
|
}
|
||||||
|
|
||||||
|
RunnerClient.RunnerResponse run(MetaAction metaAction) {
|
||||||
|
RunnerClient.RunnerResponse response = new RunnerClient.RunnerResponse();
|
||||||
|
File file = new File(metaAction.getLocation());
|
||||||
|
String ext = FileUtil.getSuffix(file);
|
||||||
|
if (ext == null || ext.isEmpty()) {
|
||||||
|
response.setOk(false);
|
||||||
|
response.setData("未知文件类型");
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
String[] commands = commandExecutionService.buildCommands(ext, metaAction.getParams(), file.getAbsolutePath());
|
||||||
|
if (commands == null || commands.length == 0) {
|
||||||
|
response.setOk(false);
|
||||||
|
response.setData("不支持的文件类型: " + file.getName());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
CommandExecutionService.Result execResult = commandExecutionService.exec(commands);
|
||||||
|
response.setOk(execResult.isOk());
|
||||||
|
response.setData(execResult.getTotal());
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
interface RunnerExecutionPolicy {
|
||||||
|
}
|
||||||
@@ -0,0 +1,160 @@
|
|||||||
|
package work.slhaf.partner.core.action.runner;
|
||||||
|
|
||||||
|
import io.modelcontextprotocol.client.McpSyncClient;
|
||||||
|
import io.modelcontextprotocol.json.McpJsonMapper;
|
||||||
|
import io.modelcontextprotocol.spec.McpSchema;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
class RunnerStabilizationTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void actionSerializerUsesNormalizedCodeType(@TempDir Path tempDir) throws Exception {
|
||||||
|
ActionSerializer serializer = new ActionSerializer(tempDir.toString(), tempDir.toString());
|
||||||
|
String builtPath = serializer.buildTmpPath("demo", "py");
|
||||||
|
Assertions.assertTrue(builtPath.endsWith(".py"));
|
||||||
|
|
||||||
|
MetaAction metaAction = new MetaAction("demo", false, MetaAction.Type.ORIGIN, builtPath);
|
||||||
|
serializer.tmpSerialize(metaAction, "print('ok')", ".py");
|
||||||
|
|
||||||
|
Assertions.assertTrue(Files.exists(Path.of(builtPath)));
|
||||||
|
Assertions.assertEquals("print('ok')", Files.readString(Path.of(builtPath)));
|
||||||
|
Assertions.assertThrows(Exception.class, () -> serializer.tmpSerialize(metaAction, "print('bad')", ".sh"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void mcpTransportConfigHasValueEquality() {
|
||||||
|
McpTransportConfig.Stdio left = new McpTransportConfig.Stdio(30, "npx", Map.of("A", "1"), List.of("-y", "demo"));
|
||||||
|
McpTransportConfig.Stdio right = new McpTransportConfig.Stdio(30, "npx", Map.of("A", "1"), List.of("-y", "demo"));
|
||||||
|
|
||||||
|
Assertions.assertEquals(left, right);
|
||||||
|
Assertions.assertEquals(left.hashCode(), right.hashCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void mcpConfigWatcherReadParamsAcceptsTimeout(@TempDir Path tempDir) throws Exception {
|
||||||
|
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
|
||||||
|
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
|
||||||
|
McpConfigWatcher watcher = new McpConfigWatcher(
|
||||||
|
tempDir,
|
||||||
|
existedMetaActions,
|
||||||
|
new McpClientRegistry(),
|
||||||
|
new McpTransportFactory(),
|
||||||
|
new McpMetaRegistry(existedMetaActions),
|
||||||
|
executor
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
Method readParams = McpConfigWatcher.class.getDeclaredMethod("readParams", cn.hutool.json.JSONObject.class);
|
||||||
|
readParams.setAccessible(true);
|
||||||
|
|
||||||
|
cn.hutool.json.JSONObject stdioJson = cn.hutool.json.JSONUtil.parseObj("""
|
||||||
|
{
|
||||||
|
"command": "npx",
|
||||||
|
"args": ["-y", "demo"],
|
||||||
|
"env": {},
|
||||||
|
"timeout": 45
|
||||||
|
}
|
||||||
|
""");
|
||||||
|
Object stdioConfig = readParams.invoke(watcher, stdioJson);
|
||||||
|
|
||||||
|
Assertions.assertInstanceOf(McpTransportConfig.Stdio.class, stdioConfig);
|
||||||
|
Assertions.assertEquals(45, ((McpTransportConfig.Stdio) stdioConfig).timeout());
|
||||||
|
} finally {
|
||||||
|
watcher.close();
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void localRunnerClientCloseIsIdempotent(@TempDir Path tempDir) {
|
||||||
|
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
|
||||||
|
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
|
||||||
|
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
client.close();
|
||||||
|
} finally {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void mcpActionExecutorUsesStructuredContentThenTextContent() {
|
||||||
|
McpClientRegistry registry = new McpClientRegistry();
|
||||||
|
McpSyncClient client = Mockito.mock(McpSyncClient.class);
|
||||||
|
registry.register("demo", client);
|
||||||
|
|
||||||
|
McpActionExecutor executor = new McpActionExecutor(registry);
|
||||||
|
MetaAction metaAction = new MetaAction("tool", false, MetaAction.Type.MCP, "demo");
|
||||||
|
|
||||||
|
Mockito.when(client.callTool(Mockito.any())).thenReturn(
|
||||||
|
new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("hello")), null, null, Map.of())
|
||||||
|
);
|
||||||
|
RunnerClient.RunnerResponse textResponse = executor.run(metaAction);
|
||||||
|
Assertions.assertTrue(textResponse.isOk());
|
||||||
|
Assertions.assertEquals("hello", textResponse.getData());
|
||||||
|
|
||||||
|
Mockito.when(client.callTool(Mockito.any())).thenReturn(
|
||||||
|
new McpSchema.CallToolResult(List.of(), Boolean.FALSE, Map.of("k", "v"), Map.of())
|
||||||
|
);
|
||||||
|
RunnerClient.RunnerResponse structuredResponse = executor.run(metaAction);
|
||||||
|
Assertions.assertTrue(structuredResponse.isOk());
|
||||||
|
Assertions.assertEquals("{k=v}", structuredResponse.getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void mcpMetaRegistryFallsBackToOriginalToolMetaAfterDescRemoval(@TempDir Path tempDir) throws Exception {
|
||||||
|
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
|
||||||
|
McpMetaRegistry registry = new McpMetaRegistry(existedMetaActions);
|
||||||
|
try {
|
||||||
|
McpSchema.Tool tool = McpSchema.Tool.builder()
|
||||||
|
.name("tool")
|
||||||
|
.description("tool description")
|
||||||
|
.inputSchema(McpJsonMapper.getDefault(), "{\"type\":\"object\",\"properties\":{}}")
|
||||||
|
.outputSchema(Map.of("type", "string"))
|
||||||
|
.meta(Map.of("io", true, "pre", List.of("pre"), "post", List.of("post"), "strict", true, "tag", List.of("tag")))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
MetaActionInfo baseInfo = registry.buildMetaActionInfo("demo", tool);
|
||||||
|
existedMetaActions.put("demo::tool", baseInfo);
|
||||||
|
|
||||||
|
Path descFile = tempDir.resolve("demo::tool.desc.json");
|
||||||
|
Files.writeString(descFile, """
|
||||||
|
{
|
||||||
|
"io": false,
|
||||||
|
"params": {},
|
||||||
|
"description": "desc override",
|
||||||
|
"tags": ["desc"],
|
||||||
|
"preActions": [],
|
||||||
|
"postActions": [],
|
||||||
|
"strictDependencies": false,
|
||||||
|
"responseSchema": {}
|
||||||
|
}
|
||||||
|
""");
|
||||||
|
|
||||||
|
Assertions.assertTrue(registry.addOrUpdate(descFile));
|
||||||
|
Assertions.assertEquals("desc override", existedMetaActions.get("demo::tool").getDescription());
|
||||||
|
|
||||||
|
registry.remove(descFile);
|
||||||
|
MetaActionInfo restoredInfo = existedMetaActions.get("demo::tool");
|
||||||
|
Assertions.assertEquals("tool description", restoredInfo.getDescription());
|
||||||
|
Assertions.assertTrue(restoredInfo.isIo());
|
||||||
|
Assertions.assertEquals(List.of("tag"), restoredInfo.getTags());
|
||||||
|
} finally {
|
||||||
|
registry.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user