refactor(LocalRunnerClient): introduce WatchContext and decouple build/processor state

This commit is contained in:
2025-12-31 23:11:15 +08:00
parent a6e33edc7a
commit 64b907707a

View File

@@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import static work.slhaf.partner.common.util.PathUtil.buildPathStr;
@@ -280,10 +281,6 @@ public class LocalRunnerClient extends RunnerClient {
}));
}
private LocalWatchServiceBuild registerWatchService(Path path) {
return new LocalWatchServiceBuild.BuildRegistry(path, watchService, executor);
}
private interface LocalWatchServiceBuild {
LocalWatchServiceBuild registerCreate(EventHandler handler);
@@ -295,28 +292,25 @@ public class LocalRunnerClient extends RunnerClient {
LocalWatchServiceBuild initialLoad(InitLoader loader);
void commit();
LocalWatchServiceBuild watchAll(boolean watchAll);
void commit(ExecutorService executor);
interface EventHandler {
void handle(Path thisDir, Path context);
}
interface InitLoader {
void load(Path path);
void load();
}
class BuildRegistry implements LocalWatchServiceBuild {
private final Map<WatchEvent.Kind<?>, EventHandler> handlers = new HashMap<>();
private final Path path;
private final WatchService watchService;
private final ExecutorService executor;
private InitLoader initLoader;
private BuildRegistry(Path path, WatchService watchService, ExecutorService executor) {
this.path = path;
this.watchService = watchService;
this.executor = executor;
private BuildRegistry(WatchContext ctx) {
this.ctx = ctx;
}
@Override
@@ -357,12 +351,12 @@ public class LocalRunnerClient extends RunnerClient {
private Runnable buildWatchTask() {
return () -> {
String pathStr = path.toString();
String pathStr = ctx.path.toString();
log.info("行动程序目录监听器已启动,监听目录: {}", pathStr);
while (true) {
WatchKey key;
try {
key = watchService.take();
key = ctx.watchService.take();
List<WatchEvent<?>> events = key.pollEvents();
for (WatchEvent<?> e : events) {
WatchEvent.Kind<?> kind = e.kind();
@@ -394,12 +388,20 @@ public class LocalRunnerClient extends RunnerClient {
}
}
private sealed static abstract class LocalWatchServiceHelper permits LocalWatchServiceHelper.Dynamic, LocalWatchServiceHelper.Desc, LocalWatchServiceHelper.Common {
private record WatchContext(Path path, WatchService watchService, Map<WatchKey, Path> watchKeys) {
private WatchContext(Path path, WatchService watchService) {
this(path, watchService, new HashMap<>());
}
}
private sealed static abstract class LocalWatchEventProcessor permits LocalWatchEventProcessor.Dynamic, LocalWatchEventProcessor.Desc, LocalWatchEventProcessor.Common {
protected final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
protected final WatchContext ctx;
private LocalWatchServiceHelper(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions) {
private LocalWatchEventProcessor(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, WatchContext ctx) {
this.existedMetaActions = existedMetaActions;
this.ctx = ctx;
}
protected abstract @NotNull LocalWatchServiceBuild.InitLoader buildLoad();
@@ -412,12 +414,12 @@ public class LocalRunnerClient extends RunnerClient {
protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildOverflow();
private static final class Dynamic extends LocalWatchServiceHelper {
private static final class Dynamic extends LocalWatchEventProcessor {
private final McpStatelessAsyncServer dynamicActionMcpServer;
private Dynamic(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, McpStatelessAsyncServer dynamicActionMcpServer) {
super(existedMetaActions);
private Dynamic(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, McpStatelessAsyncServer dynamicActionMcpServer, WatchContext ctx) {
super(existedMetaActions, ctx);
this.dynamicActionMcpServer = dynamicActionMcpServer;
}
@@ -494,33 +496,36 @@ public class LocalRunnerClient extends RunnerClient {
@Override
@NotNull
protected LocalWatchServiceBuild.InitLoader buildLoad() {
return path -> {
// 从该路径列出已存在的目录,每个目录对应不同的行动程序及描述文件,从描述文件加载程序信息
File file = path.toFile();
if (file.isFile()) {
throw new ActionInitFailedException("未找到目录: " + path);
}
File[] files = file.listFiles();
if (files == null) {
throw new ActionInitFailedException("未正常读取目录: " + path);
}
for (File dir : files) {
if (!normalPath(dir.toPath())) {
continue;
}
File meta = new File(dir, "desc.json");
File program = null;
//noinspection DataFlowIssue
for (File f : dir.listFiles()) {
if (f.getName().startsWith("run.")) {
program = f;
}
}
// 从该路径列出已存在的目录,每个目录对应不同的行动程序及描述文件,从描述文件加载程序信息
return this::load;
}
MetaActionInfo info = JSONUtil.readJSONObject(meta, StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
addAction(dir.getName(), info, program);
private void load() {
Path path = ctx.path;
File file = path.toFile();
if (file.isFile()) {
throw new ActionInitFailedException("未找到目录: " + path);
}
File[] files = file.listFiles();
if (files == null) {
throw new ActionInitFailedException("未正常读取目录: " + path);
}
for (File dir : files) {
if (!normalPath(dir.toPath())) {
continue;
}
};
File meta = new File(dir, "desc.json");
File program = null;
//noinspection DataFlowIssue
for (File f : dir.listFiles()) {
if (f.getName().startsWith("run.")) {
program = f;
}
}
MetaActionInfo info = JSONUtil.readJSONObject(meta, StandardCharsets.UTF_8).toBean(MetaActionInfo.class);
addAction(dir.getName(), info, program);
}
}
private McpStatelessServerFeatures.AsyncToolSpecification buildAsyncToolSpecification(MetaActionInfo info, File program, String actionKey, String name) {
@@ -626,12 +631,12 @@ public class LocalRunnerClient extends RunnerClient {
}
}
private static final class Desc extends LocalWatchServiceHelper {
private static final class Desc extends LocalWatchEventProcessor {
private final McpStatelessAsyncServer mcpDescServer;
private Desc(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, McpStatelessAsyncServer mcpDescServer) {
super(existedMetaActions);
private Desc(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, McpStatelessAsyncServer mcpDescServer, WatchContext ctx) {
super(existedMetaActions, ctx);
this.mcpDescServer = mcpDescServer;
}
@@ -666,12 +671,12 @@ public class LocalRunnerClient extends RunnerClient {
}
}
private static final class Common extends LocalWatchServiceHelper {
private static final class Common extends LocalWatchEventProcessor {
private final Map<String, McpSyncClient> mcpClients;
private Common(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, Map<String, McpSyncClient> mcpClients) {
super(existedMetaActions);
private Common(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, Map<String, McpSyncClient> mcpClients, WatchContext ctx) {
super(existedMetaActions, ctx);
this.mcpClients = mcpClients;
}