mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
refactor(LocalRunnerClient): replace ActionWatchService with unified watch service builder.
Context: ActionWatchService was used to support SCRIPT and PLUGIN type actions loading from local FileSystem, this refactor allows register different paths to watch.
This commit is contained in:
@@ -25,7 +25,6 @@ import work.slhaf.partner.core.action.entity.MetaAction;
|
|||||||
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
import work.slhaf.partner.core.action.entity.MetaActionType;
|
import work.slhaf.partner.core.action.entity.MetaActionType;
|
||||||
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
|
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
|
||||||
import work.slhaf.partner.core.action.exception.ActionLoadFailedException;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@@ -41,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
|
||||||
import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL;
|
import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -52,11 +50,15 @@ public class LocalRunnerClient extends RunnerClient {
|
|||||||
* 动态生成的行动程序都将挂载至该 McpServer
|
* 动态生成的行动程序都将挂载至该 McpServer
|
||||||
*/
|
*/
|
||||||
private McpStatelessAsyncServer dynamicActionMcpServer;
|
private McpStatelessAsyncServer dynamicActionMcpServer;
|
||||||
|
private final WatchService watchService;
|
||||||
|
|
||||||
public LocalRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) {
|
public LocalRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) {
|
||||||
super(existedMetaActions, executor);
|
super(existedMetaActions, executor);
|
||||||
ActionWatchService watchService = new ActionWatchService(actionWatchPath);
|
try {
|
||||||
watchService.launch();
|
watchService = FileSystems.getDefault().newWatchService();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ActionInitFailedException("目录监听器启动失败", e);
|
||||||
|
}
|
||||||
registerDynamicActionMcp();
|
registerDynamicActionMcp();
|
||||||
setupShutdownHook();
|
setupShutdownHook();
|
||||||
}
|
}
|
||||||
@@ -308,11 +310,127 @@ public class LocalRunnerClient extends RunnerClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void setupShutdownHook() {
|
private void setupShutdownHook() {
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||||
dynamicActionMcpServer.close();
|
dynamicActionMcpServer.close();
|
||||||
this.mcpClients.forEach((id, client) -> {
|
this.mcpClients.forEach((id, client) -> {
|
||||||
client.close();
|
client.close();
|
||||||
log.info("[{}] MCP-Client 已关闭", id);
|
log.info("[{}] MCP-Client 已关闭", id);
|
||||||
});
|
});
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
private WatchServiceBuild registerWatchService(Path path) {
|
||||||
|
return new LocalWatchServiceHelper(path, watchService, executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface WatchServiceBuild {
|
||||||
|
WatchServiceBuild registerCreate(WatchEventHandler handler);
|
||||||
|
|
||||||
|
WatchServiceBuild registerModify(WatchEventHandler handler);
|
||||||
|
|
||||||
|
WatchServiceBuild registerDelete(WatchEventHandler handler);
|
||||||
|
|
||||||
|
WatchServiceBuild registerOverflow(WatchEventHandler handler);
|
||||||
|
|
||||||
|
WatchServiceBuild initialLoad(WatchInitLoader loader);
|
||||||
|
|
||||||
|
void commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface WatchEventHandler {
|
||||||
|
void handle(Path thisDir, Path context);
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface WatchInitLoader {
|
||||||
|
void load(Path path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class LocalWatchServiceHelper implements WatchServiceBuild {
|
||||||
|
|
||||||
|
private final Map<WatchEvent.Kind<?>, WatchEventHandler> handlers = new HashMap<>();
|
||||||
|
private final Path path;
|
||||||
|
private final WatchService watchService;
|
||||||
|
private final ExecutorService executor;
|
||||||
|
private WatchInitLoader initLoader;
|
||||||
|
|
||||||
|
private LocalWatchServiceHelper(Path path, WatchService watchService, ExecutorService executor) {
|
||||||
|
this.path = path;
|
||||||
|
this.watchService = watchService;
|
||||||
|
this.executor = executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WatchServiceBuild registerCreate(WatchEventHandler handler) {
|
||||||
|
handlers.put(StandardWatchEventKinds.ENTRY_CREATE, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WatchServiceBuild registerModify(WatchEventHandler handler) {
|
||||||
|
handlers.put(StandardWatchEventKinds.ENTRY_MODIFY, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WatchServiceBuild registerDelete(WatchEventHandler handler) {
|
||||||
|
handlers.put(StandardWatchEventKinds.ENTRY_DELETE, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WatchServiceBuild registerOverflow(WatchEventHandler handler) {
|
||||||
|
handlers.put(StandardWatchEventKinds.OVERFLOW, handler);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WatchServiceBuild initialLoad(WatchInitLoader loader) {
|
||||||
|
initLoader = loader;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void commit() {
|
||||||
|
if (initLoader != null) initLoader.load(path);
|
||||||
|
executor.execute(buildWatchTask());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Runnable buildWatchTask() {
|
||||||
|
return () -> {
|
||||||
|
String pathStr = path.toString();
|
||||||
|
log.info("行动程序目录监听器已启动,监听目录: {}", pathStr);
|
||||||
|
while (true) {
|
||||||
|
WatchKey key;
|
||||||
|
try {
|
||||||
|
key = watchService.take();
|
||||||
|
List<WatchEvent<?>> events = key.pollEvents();
|
||||||
|
for (WatchEvent<?> e : events) {
|
||||||
|
WatchEvent.Kind<?> kind = e.kind();
|
||||||
|
Object context = e.context();
|
||||||
|
log.info("行动程序目录变更事件: {} - {} - {}", pathStr, kind.name(), context);
|
||||||
|
Path thisDir = (Path) key.watchable();
|
||||||
|
if (!thisDir.equals(path)) {
|
||||||
|
// 若事件所在目录不为为 path,忽略并步入下一轮循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
WatchEventHandler handler = handlers.get(kind);
|
||||||
|
if (handler == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
handler.handle(thisDir, context instanceof Path ? (Path) context : null);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("监听线程被中断,准备退出...");
|
||||||
|
Thread.currentThread().interrupt(); // 恢复中断标志
|
||||||
|
break;
|
||||||
|
} catch (ClosedWatchServiceException e) {
|
||||||
|
log.info("WatchService 已关闭,监听线程退出。");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private sealed abstract static class McpClientTransportParams permits HttpMcpClientTransportParams, StdioMcpClientTransportParams {
|
private sealed abstract static class McpClientTransportParams permits HttpMcpClientTransportParams, StdioMcpClientTransportParams {
|
||||||
@@ -355,182 +473,4 @@ public class LocalRunnerClient extends RunnerClient {
|
|||||||
private String total;
|
private String total;
|
||||||
private List<String> resultList;
|
private List<String> resultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO 逻辑待更新,用以适配 MCP 服务的及时发现与注册
|
|
||||||
private class ActionWatchService {
|
|
||||||
|
|
||||||
private final HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
|
|
||||||
private final String actionWatchPath;
|
|
||||||
|
|
||||||
private ActionWatchService(String actionWatchPath) {
|
|
||||||
this.actionWatchPath = actionWatchPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void launch() {
|
|
||||||
Path path = Path.of(actionWatchPath != null ? actionWatchPath : ACTION_PROGRAM);
|
|
||||||
scanActions(path.toFile());
|
|
||||||
launchActionDirectoryWatcher(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void launchActionDirectoryWatcher(Path path) {
|
|
||||||
WatchService watchService;
|
|
||||||
try {
|
|
||||||
watchService = FileSystems.getDefault().newWatchService();
|
|
||||||
setupShutdownHook(watchService);
|
|
||||||
registerParentToWatch(path, watchService);
|
|
||||||
registerSubToWatch(path, watchService);
|
|
||||||
executor.execute(registerWatchTask(path, watchService));
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ActionInitFailedException("行动程序目录监听器启动失败", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupShutdownHook(WatchService watchService) {
|
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
||||||
try {
|
|
||||||
watchService.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Runnable registerWatchTask(Path path, WatchService watchService) {
|
|
||||||
return () -> {
|
|
||||||
log.info("行动程序目录监听器已启动");
|
|
||||||
while (true) {
|
|
||||||
WatchKey key;
|
|
||||||
try {
|
|
||||||
key = watchService.take();
|
|
||||||
List<WatchEvent<?>> events = key.pollEvents();
|
|
||||||
for (WatchEvent<?> e : events) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
WatchEvent<Path> event = (WatchEvent<Path>) e;
|
|
||||||
WatchEvent.Kind<Path> kind = event.kind();
|
|
||||||
Path context = event.context();
|
|
||||||
log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString());
|
|
||||||
Path thisDir = (Path) key.watchable();
|
|
||||||
// 根据事件发生的目录进行分流,分为父目录事件和子程序事件
|
|
||||||
if (thisDir.equals(path)) {
|
|
||||||
handleParentDirEvent(kind, thisDir, context, watchService);
|
|
||||||
} else {
|
|
||||||
handleSubDirEvent(kind, thisDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.info("监听线程被中断,准备退出...");
|
|
||||||
Thread.currentThread().interrupt(); // 恢复中断标志
|
|
||||||
break;
|
|
||||||
} catch (ClosedWatchServiceException e) {
|
|
||||||
log.info("WatchService 已关闭,监听线程退出。");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleSubDirEvent(WatchEvent.Kind<Path> kind, Path thisDir) {
|
|
||||||
// path为触发本次行动的文件的路径(当前位于某个action目录下)
|
|
||||||
// 先判定发生的目录前缀是否匹配(action、desc),否则忽略
|
|
||||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
|
||||||
// CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。
|
|
||||||
boolean complete = checkComplete(thisDir);
|
|
||||||
if (!complete)
|
|
||||||
return;
|
|
||||||
try {
|
|
||||||
MetaActionInfo newActionInfo = new MetaActionInfo();
|
|
||||||
existedMetaActions.put(thisDir.toString(), newActionInfo);
|
|
||||||
} catch (ActionLoadFailedException e) {
|
|
||||||
log.warn("行动信息重新加载失败,触发行为: {}", kind.name());
|
|
||||||
}
|
|
||||||
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
|
||||||
// DELETE 事件将会把该 MetaActionInfo 从记录中移除
|
|
||||||
existedMetaActions.remove(thisDir.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean checkComplete(Path thisDir) {
|
|
||||||
File[] files = thisDir.toFile().listFiles();
|
|
||||||
if (files == null) {
|
|
||||||
log.error("当前目录无法访问: [{}]", thisDir);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
boolean existedAction = false;
|
|
||||||
boolean existedDesc = false;
|
|
||||||
for (File file : files) {
|
|
||||||
String fileName = file.getName();
|
|
||||||
String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.'));
|
|
||||||
if (nameWithoutExt.equals("action"))
|
|
||||||
existedAction = true;
|
|
||||||
else if (nameWithoutExt.equals("desc"))
|
|
||||||
existedDesc = true;
|
|
||||||
}
|
|
||||||
return existedAction && existedDesc;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleParentDirEvent(WatchEvent.Kind<Path> kind, Path thisDir, Path context,
|
|
||||||
WatchService watchService) {
|
|
||||||
Path path = Path.of(thisDir.toString(), context.toString());
|
|
||||||
// MODIFY 事件不进行处理
|
|
||||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
|
|
||||||
try {
|
|
||||||
path.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("新增行动程序目录监听失败: {}", path, e);
|
|
||||||
}
|
|
||||||
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
|
||||||
WatchKey remove = registeredPaths.remove(path);
|
|
||||||
remove.cancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerSubToWatch(Path path, WatchService watchService) throws IOException {
|
|
||||||
Files.walkFileTree(path, new SimpleFileVisitor<>() {
|
|
||||||
@Override
|
|
||||||
public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs)
|
|
||||||
throws IOException {
|
|
||||||
if (dir.getFileName().startsWith("."))
|
|
||||||
return FileVisitResult.CONTINUE;
|
|
||||||
WatchKey key = dir.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
registeredPaths.put(dir, key);
|
|
||||||
return FileVisitResult.CONTINUE;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerParentToWatch(Path path, WatchService watchService) throws IOException {
|
|
||||||
WatchKey key = path.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
registeredPaths.put(path, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void scanActions(File file) {
|
|
||||||
if (!file.exists() || file.isFile()) {
|
|
||||||
throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath());
|
|
||||||
}
|
|
||||||
File[] files = file.listFiles();
|
|
||||||
if (files == null) {
|
|
||||||
throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath());
|
|
||||||
}
|
|
||||||
for (File f : files) {
|
|
||||||
try {
|
|
||||||
MetaActionInfo actionInfo = new MetaActionInfo();
|
|
||||||
existedMetaActions.put(f.getName(), actionInfo);
|
|
||||||
// log.info("行动程序[{}]已加载", actionInfo.getKey());
|
|
||||||
} catch (ActionLoadFailedException e) {
|
|
||||||
log.warn("行动程序加载失败", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user