fix(LocalRunnerClient): treat missing action dir as invalid path during DELETE in DynamicMcp

Context:
Action directories may already be removed when DELETE events are handled.
Return null from loadFiles to signal invalid paths and lock behavior with DynamicAction watch tests.
This commit is contained in:
2026-01-12 21:46:34 +08:00
parent ddd999d47b
commit 837a4c92d1
2 changed files with 269 additions and 97 deletions

View File

@@ -556,14 +556,11 @@ public class LocalRunnerClient extends RunnerClient {
protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildOverflow();
protected File[] loadFiles(Path root) {
// 在批量删除场景下,在接收到事件时目录等内容可能已被删除,此时不应该报错,而是返回一个‘异常值’
if (!Files.isDirectory(root)) {
throw new ActionInitFailedException("未找到目录: " + root);
return null;
}
val files = root.toFile().listFiles();
if (files == null) {
throw new ActionInitFailedException("目录无法正常读取: " + root);
}
return files;
return root.toFile().listFiles();
}
@SuppressWarnings("LoggingSimilarMessage")
@@ -579,7 +576,7 @@ public class LocalRunnerClient extends RunnerClient {
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
private boolean normalPath(Path path) {
val files = loadFiles(path);
if (files.length < 2) {
if (files == null || files.length < 2) {
return false;
}
boolean desc = false;

View File

@@ -1,126 +1,301 @@
package work.slhaf.partner.core.action.runner;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.MetaActionType;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Scanner;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
@SuppressWarnings("LoggingSimilarMessage")
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Await.waitForCondition;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Common.getMetaActionInfo;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*;
@Slf4j
public class LocalRunnerClientTest {
private static void writeRunFile(Path actionDir) throws IOException {
Path runPath = actionDir.resolve("run.py");
log.debug("写入路径: {}", runPath);
Files.writeString(runPath, "print('ok')\n");
}
@SuppressWarnings("LoggingSimilarMessage")
static class Fs {
static void writeRunFile(Path actionDir) throws IOException {
Path runPath = actionDir.resolve("run.py");
log.debug("写入路径: {}", runPath);
Files.writeString(runPath, "print('ok')\n");
}
private static @NotNull MetaAction buildTmpMetaAction() {
MetaAction metaAction = new MetaAction();
metaAction.setIo(false);
metaAction.setName("hello_world");
metaAction.setParams(Map.of("name", "origin_run"));
metaAction.setType(MetaActionType.ORIGIN);
metaAction.setLocation("/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action/tmp/hello_world.py");
return metaAction;
}
static void writeInvalidDescJson(Path actionDir) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
Files.writeString(descPath, "{ invalid json");
}
private static void writeDescJson(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(descPath, json);
}
static void writeDescJson(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(descPath, json);
}
private static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException {
long start = System.currentTimeMillis();
while (!supplier.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
break;
@SuppressWarnings("SameParameterValue")
static void writeDescJsonAtomic(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
Path tmpPath = actionDir.resolve("desc.json.tmp");
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(tmpPath, json);
Files.move(tmpPath, descPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
static void deleteDirectory(Path dir) throws IOException {
if (!Files.exists(dir)) {
return;
}
try (var stream = Files.walk(dir)) {
stream.sorted(Comparator.reverseOrder()).forEach(path -> {
try {
Files.deleteIfExists(path);
} catch (IOException ignored) {
}
});
}
}
}
@SuppressWarnings("BusyWait")
static class Await {
static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException {
long start = System.currentTimeMillis();
while (!supplier.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
break;
}
Thread.sleep(50);
}
Thread.sleep(50);
}
}
@Test
void testRunOrigin(@TempDir Path tempDir) {
LocalRunnerClient runnerClient =
new LocalRunnerClient(
new ConcurrentHashMap<>(),
Executors.newVirtualThreadPerTaskExecutor(),
tempDir.toString()
);
MetaAction metaAction = buildTmpMetaAction();
RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction);
System.out.println(runnerResponse.getData());
static class Common {
static MetaActionInfo getMetaActionInfo(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions,
String actionKey) {
return existedMetaActions.get(actionKey);
}
}
@Test
void testWatch(@TempDir Path tempDir) {
LocalRunnerClient runnerClient =
new LocalRunnerClient(
new ConcurrentHashMap<>(),
Executors.newVirtualThreadPerTaskExecutor(),
tempDir.toString()
);
// 直接等待输入然后尝试触发各种文件监听事件即可
System.out.println("Press any key to continue...");
Scanner scanner = new Scanner(System.in);
scanner.next();
}
@Nested
class DynamicMcpTest {
@Test
void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
@Test
void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action");
Files.createDirectories(actionDir);
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action");
Files.createDirectories(actionDir);
Fs.writeRunFile(actionDir);
writeDescJson(actionDir, "demo action");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
// 触发一次 modify确保监听线程能够捕捉到完整的 action 结构
writeDescJson(actionDir, "demo action updated");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchOutOfOrderEvents(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_order");
Files.createDirectories(actionDir);
writeDescJson(actionDir, "desc first");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_order"));
writeRunFile(actionDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_order"));
Path descOnlyDir = dynamicRoot.resolve("demo_action_desc_only");
Files.createDirectories(descOnlyDir);
writeDescJson(descOnlyDir, "desc only");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_desc_only"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_desc_only"));
Path runOnlyDir = dynamicRoot.resolve("demo_action_run_only");
Files.createDirectories(runOnlyDir);
writeRunFile(runOnlyDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_run_only"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_run_only"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchAtomicDescOverwrite(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_atomic");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeDescJson(actionDir, "before");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_atomic"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_atomic"));
writeDescJsonAtomic(actionDir, "after");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic");
return info != null && "after".equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic");
Assertions.assertNotNull(info);
Assertions.assertEquals("after", info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchRapidDescModify(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_rapid");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeDescJson(actionDir, "v0");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_rapid"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_rapid"));
String last = "v5";
for (int i = 1; i <= 5; i++) {
writeDescJson(actionDir, "v" + i);
}
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid");
return info != null && last.equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid");
Assertions.assertNotNull(info);
Assertions.assertEquals(last, info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_delete");
Files.createDirectories(actionDir);
Thread.sleep(100);
writeRunFile(actionDir);
writeDescJson(actionDir, "delete test");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete"));
writeRunFile(actionDir);
writeDescJson(actionDir, "delete test restore");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete"));
deleteDirectory(actionDir);
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchInvalidDescRecovery(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
Thread.sleep(100);
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_invalid");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeDescJson(actionDir, "demo action");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
writeRunFile(actionDir);
writeInvalidDescJson(actionDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_invalid"));
// 触发一次 modify确保监听线程能够捕捉到完整的 action 结构
writeDescJson(actionDir, "demo action updated");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action"));
} finally {
executor.shutdownNow();
writeDescJson(actionDir, "fixed");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_invalid"));
} finally {
executor.shutdownNow();
}
}
}
}