From 837a4c92d10589f07c3b89d844a1b75fd2038ce6 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Mon, 12 Jan 2026 21:46:34 +0800 Subject: [PATCH] fix(LocalRunnerClient): treat missing action dir as invalid path during DELETE in DynamicMcp Context: Action directories may already be removed when DELETE events are handled. Return null from loadFiles to signal invalid paths and lock behavior with DynamicAction watch tests. --- .../core/action/runner/LocalRunnerClient.java | 11 +- .../action/runner/LocalRunnerClientTest.java | 355 +++++++++++++----- 2 files changed, 269 insertions(+), 97 deletions(-) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java index c947e2f6..0ae633ff 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/action/runner/LocalRunnerClient.java @@ -556,14 +556,11 @@ public class LocalRunnerClient extends RunnerClient { protected abstract @NotNull LocalWatchServiceBuild.EventHandler buildOverflow(); protected File[] loadFiles(Path root) { + // 在批量删除场景下,在接收到事件时目录等内容可能已被删除,此时不应该报错,而是返回一个‘异常值’ if (!Files.isDirectory(root)) { - throw new ActionInitFailedException("未找到目录: " + root); + return null; } - val files = root.toFile().listFiles(); - if (files == null) { - throw new ActionInitFailedException("目录无法正常读取: " + root); - } - return files; + return root.toFile().listFiles(); } @SuppressWarnings("LoggingSimilarMessage") @@ -579,7 +576,7 @@ public class LocalRunnerClient extends RunnerClient { @SuppressWarnings("BooleanMethodIsAlwaysInverted") private boolean normalPath(Path path) { val files = loadFiles(path); - if (files.length < 2) { + if (files == null || files.length < 2) { return false; } boolean desc = false; diff --git a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java index 2a0dc383..4a77ff09 100644 --- a/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java +++ b/Partner-Main/src/test/java/work/slhaf/partner/core/action/runner/LocalRunnerClientTest.java @@ -1,126 +1,301 @@ package work.slhaf.partner.core.action.runner; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import work.slhaf.partner.core.action.entity.MetaAction; import work.slhaf.partner.core.action.entity.MetaActionInfo; -import work.slhaf.partner.core.action.entity.MetaActionType; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Map; -import java.util.Scanner; +import java.nio.file.StandardCopyOption; +import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.BooleanSupplier; -@SuppressWarnings("LoggingSimilarMessage") +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Await.waitForCondition; +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Common.getMetaActionInfo; +import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*; + @Slf4j public class LocalRunnerClientTest { - private static void writeRunFile(Path actionDir) throws IOException { - Path runPath = actionDir.resolve("run.py"); - log.debug("写入路径: {}", runPath); - Files.writeString(runPath, "print('ok')\n"); - } + @SuppressWarnings("LoggingSimilarMessage") + static class Fs { + static void writeRunFile(Path actionDir) throws IOException { + Path runPath = actionDir.resolve("run.py"); + log.debug("写入路径: {}", runPath); + Files.writeString(runPath, "print('ok')\n"); + } - private static @NotNull MetaAction buildTmpMetaAction() { - MetaAction metaAction = new MetaAction(); - metaAction.setIo(false); - metaAction.setName("hello_world"); - metaAction.setParams(Map.of("name", "origin_run")); - metaAction.setType(MetaActionType.ORIGIN); - metaAction.setLocation("/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action/tmp/hello_world.py"); - return metaAction; - } + static void writeInvalidDescJson(Path actionDir) throws IOException { + Path descPath = actionDir.resolve("desc.json"); + log.debug("写入路径: {}", descPath); + Files.writeString(descPath, "{ invalid json"); + } - private static void writeDescJson(Path actionDir, String description) throws IOException { - Path descPath = actionDir.resolve("desc.json"); - log.debug("写入路径: {}", descPath); - String json = "{\n" - + " \"io\": false,\n" - + " \"params\": {},\n" - + " \"description\": \"" + description + "\",\n" - + " \"tags\": [],\n" - + " \"preActions\": [],\n" - + " \"postActions\": [],\n" - + " \"strictDependencies\": false,\n" - + " \"responseSchema\": {}\n" - + "}\n"; - Files.writeString(descPath, json); - } + static void writeDescJson(Path actionDir, String description) throws IOException { + Path descPath = actionDir.resolve("desc.json"); + log.debug("写入路径: {}", descPath); + String json = "{\n" + + " \"io\": false,\n" + + " \"params\": {},\n" + + " \"description\": \"" + description + "\",\n" + + " \"tags\": [],\n" + + " \"preActions\": [],\n" + + " \"postActions\": [],\n" + + " \"strictDependencies\": false,\n" + + " \"responseSchema\": {}\n" + + "}\n"; + Files.writeString(descPath, json); + } - private static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException { - long start = System.currentTimeMillis(); - while (!supplier.getAsBoolean()) { - if (System.currentTimeMillis() - start > timeoutMs) { - break; + @SuppressWarnings("SameParameterValue") + static void writeDescJsonAtomic(Path actionDir, String description) throws IOException { + Path descPath = actionDir.resolve("desc.json"); + Path tmpPath = actionDir.resolve("desc.json.tmp"); + String json = "{\n" + + " \"io\": false,\n" + + " \"params\": {},\n" + + " \"description\": \"" + description + "\",\n" + + " \"tags\": [],\n" + + " \"preActions\": [],\n" + + " \"postActions\": [],\n" + + " \"strictDependencies\": false,\n" + + " \"responseSchema\": {}\n" + + "}\n"; + Files.writeString(tmpPath, json); + Files.move(tmpPath, descPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } + + static void deleteDirectory(Path dir) throws IOException { + if (!Files.exists(dir)) { + return; + } + try (var stream = Files.walk(dir)) { + stream.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException ignored) { + } + }); + } + } + + } + + @SuppressWarnings("BusyWait") + static class Await { + static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException { + long start = System.currentTimeMillis(); + while (!supplier.getAsBoolean()) { + if (System.currentTimeMillis() - start > timeoutMs) { + break; + } + Thread.sleep(50); } - Thread.sleep(50); } } - @Test - void testRunOrigin(@TempDir Path tempDir) { - LocalRunnerClient runnerClient = - new LocalRunnerClient( - new ConcurrentHashMap<>(), - Executors.newVirtualThreadPerTaskExecutor(), - tempDir.toString() - ); - - MetaAction metaAction = buildTmpMetaAction(); - RunnerClient.RunnerResponse runnerResponse = runnerClient.doRun(metaAction); - System.out.println(runnerResponse.getData()); + static class Common { + static MetaActionInfo getMetaActionInfo(ConcurrentHashMap existedMetaActions, + String actionKey) { + return existedMetaActions.get(actionKey); + } } - @Test - void testWatch(@TempDir Path tempDir) { - LocalRunnerClient runnerClient = - new LocalRunnerClient( - new ConcurrentHashMap<>(), - Executors.newVirtualThreadPerTaskExecutor(), - tempDir.toString() - ); - // 直接等待输入然后尝试触发各种文件监听事件即可 - System.out.println("Press any key to continue..."); - Scanner scanner = new Scanner(System.in); - scanner.next(); - } + @Nested + class DynamicMcpTest { - @Test - void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException { - ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); - ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); - LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + @Test + void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); - try { - Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); - Path actionDir = dynamicRoot.resolve("demo_action"); - Files.createDirectories(actionDir); + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action"); + Files.createDirectories(actionDir); + + Fs.writeRunFile(actionDir); + writeDescJson(actionDir, "demo action"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); + + // 触发一次 modify,确保监听线程能够捕捉到完整的 action 结构 + writeDescJson(actionDir, "demo action updated"); + + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); + + Files.deleteIfExists(actionDir.resolve("run.py")); + waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action")); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDynamicWatchOutOfOrderEvents(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + + Path actionDir = dynamicRoot.resolve("demo_action_order"); + Files.createDirectories(actionDir); + writeDescJson(actionDir, "desc first"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 500); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_order")); + + writeRunFile(actionDir); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_order")); + + Path descOnlyDir = dynamicRoot.resolve("demo_action_desc_only"); + Files.createDirectories(descOnlyDir); + writeDescJson(descOnlyDir, "desc only"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_desc_only"), 500); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_desc_only")); + + Path runOnlyDir = dynamicRoot.resolve("demo_action_run_only"); + Files.createDirectories(runOnlyDir); + writeRunFile(runOnlyDir); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_run_only"), 500); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_run_only")); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDynamicWatchAtomicDescOverwrite(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action_atomic"); + Files.createDirectories(actionDir); + + writeRunFile(actionDir); + writeDescJson(actionDir, "before"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_atomic"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_atomic")); + + writeDescJsonAtomic(actionDir, "after"); + waitForCondition(() -> { + MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic"); + return info != null && "after".equals(info.getDescription()); + }, 2000); + + MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic"); + Assertions.assertNotNull(info); + Assertions.assertEquals("after", info.getDescription()); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDynamicWatchRapidDescModify(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action_rapid"); + Files.createDirectories(actionDir); + + writeRunFile(actionDir); + writeDescJson(actionDir, "v0"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_rapid"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_rapid")); + + String last = "v5"; + for (int i = 1; i <= 5; i++) { + writeDescJson(actionDir, "v" + i); + } + + waitForCondition(() -> { + MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid"); + return info != null && last.equals(info.getDescription()); + }, 2000); + + MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid"); + Assertions.assertNotNull(info); + Assertions.assertEquals(last, info.getDescription()); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); + + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action_delete"); + Files.createDirectories(actionDir); + Thread.sleep(100); + writeRunFile(actionDir); + writeDescJson(actionDir, "delete test"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete")); + + Files.deleteIfExists(actionDir.resolve("run.py")); + waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete")); + + writeRunFile(actionDir); + writeDescJson(actionDir, "delete test restore"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete")); + + deleteDirectory(actionDir); + waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete")); + } finally { + executor.shutdownNow(); + } + } + + @Test + void testDynamicWatchInvalidDescRecovery(@TempDir Path tempDir) throws IOException, InterruptedException { + ConcurrentHashMap existedMetaActions = new ConcurrentHashMap<>(); + ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); + new LocalRunnerClient(existedMetaActions, executor, tempDir.toString()); Thread.sleep(100); + try { + Path dynamicRoot = tempDir.resolve("action").resolve("dynamic"); + Path actionDir = dynamicRoot.resolve("demo_action_invalid"); + Files.createDirectories(actionDir); - writeRunFile(actionDir); - writeDescJson(actionDir, "demo action"); - waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); - Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); + writeRunFile(actionDir); + writeInvalidDescJson(actionDir); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 500); + Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_invalid")); - // 触发一次 modify,确保监听线程能够捕捉到完整的 action 结构 - writeDescJson(actionDir, "demo action updated"); - - waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000); - Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action")); - - Files.deleteIfExists(actionDir.resolve("run.py")); - waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000); - Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action")); - } finally { - executor.shutdownNow(); + writeDescJson(actionDir, "fixed"); + waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 2000); + Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_invalid")); + } finally { + executor.shutdownNow(); + } } } + }