feat(LocalRunnerClient): support repairing description data while OVERFLOW event happened in DescMcpServer

This commit is contained in:
2026-01-02 21:29:18 +08:00
parent 04c98c7856
commit 4ea8926363

View File

@@ -36,6 +36,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.Duration;
@@ -45,8 +46,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -801,7 +800,7 @@ public class LocalRunnerClient extends RunnerClient {
private static final class Desc extends LocalWatchEventProcessor {
private final McpStatelessAsyncServer mcpDescServer;
private final HashMap<String, String> descCache = new HashMap<>();
private final ConcurrentHashMap<String, String> descCache = new ConcurrentHashMap<>();
private Desc(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, McpStatelessAsyncServer mcpDescServer, WatchContext ctx) {
super(existedMetaActions, ctx);
@@ -828,14 +827,7 @@ public class LocalRunnerClient extends RunnerClient {
}
private boolean normal(String fileName) {
String pattern = "[a-z][A-Z]+::[a-z][A-Z]+.desk.json";
Pattern p = Pattern.compile(pattern);
Matcher matcher = p.matcher(fileName);
if (!matcher.find()) {
log.error("文件名称不符合要求: {}", fileName);
return false;
}
return true;
return fileName.endsWith(".desc.json") && fileName.contains("::");
}
private boolean normal(File file) {
@@ -847,7 +839,6 @@ public class LocalRunnerClient extends RunnerClient {
return normal(path.toFile());
}
@SuppressWarnings("UnusedReturnValue")
private boolean addResource(File file) {
String name = file.getName();
if (!normal(name)) {
@@ -858,7 +849,11 @@ public class LocalRunnerClient extends RunnerClient {
MetaActionInfo info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
String uri = ctx.root.resolve(name).toUri().toString();
descCache.put(uri, JSONObject.toJSONString(info));
mcpDescServer.addResource(buildAsyncResourceSpecification(name, uri)).subscribe();
mcpDescServer.addResource(buildAsyncResourceSpecification(name, uri)).block();
String actionKey = name.replace(".desc.json", "");
if (existedMetaActions.containsKey(actionKey)) {
existedMetaActions.put(actionKey, info);
}
} catch (Exception e) {
log.error("desc.json 解析失败: {}", file.getAbsolutePath());
return false;
@@ -866,6 +861,17 @@ public class LocalRunnerClient extends RunnerClient {
return true;
}
private void removeResource(Path path) {
String uri = path.toUri().toString();
String actionKey = path.getFileName().toString().replace(".desc.json", "");
descCache.remove(uri);
mcpDescServer.removeResource(uri).block();
if (existedMetaActions.containsKey(actionKey)) {
resetMetaActionInfo(existedMetaActions.get(actionKey));
}
}
@Override
@NotNull
protected LocalWatchServiceBuild.InitLoader buildLoad() {
@@ -892,30 +898,13 @@ public class LocalRunnerClient extends RunnerClient {
return (thisDir, context) -> {
// 排除目录事件、名称不符合要求的文件
String fileName = context.getFileName().toString();
if (!Files.isDirectory(context) || !normal(fileName)) {
if (!Files.isRegularFile(context) || !normal(fileName)) {
return;
}
// 先尝试能否正常读取,再决定是否步入更新逻辑
MetaActionInfo info;
try {
info = JSONUtil.readJSONObject(context.toFile(), StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
} catch (Exception e) {
// 加载失败也需要移除对应的 cache
descCache.remove(context.toUri().toString());
log.warn("desc.json 加载失败: {}", context);
return;
if (!addResource(context.toFile())) {
removeResource(context);
}
// 要处理的 MODIFY 上下文只有一种
// *.desc.json 发生变更时,检查是否存在于 existedMetaActions 内部
// 如果存在,则读取并更新对应的 info同时更新 descCache
// 如果不存在,则只更新 descCache
String actionKey = fileName.replace(".desc.json", "");
if (existedMetaActions.containsKey(actionKey)) {
existedMetaActions.put(actionKey, info);
}
descCache.put(context.toUri().toString(), JSONObject.toJSONString(info));
};
}
@@ -937,11 +926,7 @@ public class LocalRunnerClient extends RunnerClient {
// DELETE 事件发生后,需要移除对应的 descCache 条目;
// 如果存在对应的 info,也需要将其中的额外信息进行重置,只保留 Tools 自身的信息
descCache.remove(context.toUri().toString());
String actionKey = fileName.replace(".desc.json", "");
if (existedMetaActions.containsKey(actionKey)) {
resetMetaActionInfo(existedMetaActions.get(actionKey));
}
removeResource(context);
};
}
@@ -956,7 +941,57 @@ public class LocalRunnerClient extends RunnerClient {
@Override
@NotNull
protected LocalWatchServiceBuild.EventHandler buildOverflow() {
return null;
return (thisDir, context) -> {
// 对于 OVERFLOW 事件,需要依据当前目录下的所有 *.desc.json 针对现有内容进行修复
List<File> files;
try (Stream<Path> stream = Files.list(ctx.root)) {
files = stream.filter(Files::isRegularFile).filter(this::normal).map(Path::toFile).toList();
} catch (IOException e) {
log.error("目录无法访问: {}", ctx.root);
return;
}
Set<String> currentUriStr = new HashSet<>();
for (File file : files) {
MetaActionInfo info = null;
try {
info = JSONUtil.readJSONObject(file, StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
} catch (Exception e) {
log.warn("desc.json 读取失败: {}", file.toPath());
}
boolean available = info != null;
// 如果读取成功,则更新 descCache
// 若在 existedMetaActions 中存在,则更新对应 info
String uriStr = file.toURI().toString();
currentUriStr.add(uriStr);
if (available) {
// 由于涉及内容均为 map所以额外判断没有必要直接进行 add 行为即可
addResource(file);
} else {
// 如果读取失败,则移除对应 descCache 条目
// 若在 existedMetaActions 中存在,则重置对应 info
removeResource(file.toPath());
}
}
List<String> serverUris = mcpDescServer.listResources()
.map(McpSchema.Resource::uri)
.collectList()
.block();
if (serverUris == null) {
log.error("无法获取 DescMcpServer 持有的资源列表");
return;
}
for (String uri : serverUris) {
if (!currentUriStr.contains(uri)) {
removeResource(Paths.get(URI.create(uri)));
}
}
};
}
}