From 29d6546b078ff24291553c96d8fdd7c14d3ef530 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 1 Apr 2026 23:43:19 +0800 Subject: [PATCH] feat(config): support ConfigCenter file watching and registered json reloads --- .../api/agent/runtime/config/ConfigCenter.kt | 130 ++++++++- .../runtime/config/ConfigCenterTest.java | 255 ++++++++++++++++++ 2 files changed, 375 insertions(+), 10 deletions(-) create mode 100644 Partner-Framework/src/test/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenterTest.java diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenter.kt b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenter.kt index 3fcae216..245da4d6 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenter.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenter.kt @@ -1,21 +1,25 @@ package work.slhaf.partner.api.agent.runtime.config +import com.alibaba.fastjson2.JSON +import org.slf4j.LoggerFactory +import work.slhaf.partner.api.common.support.DirectoryWatchSupport +import java.io.IOException +import java.nio.charset.StandardCharsets +import java.nio.file.Files import java.nio.file.Path +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors -object ConfigCenter { +object ConfigCenter : AutoCloseable { + private val log = LoggerFactory.getLogger(ConfigCenter::class.java) val paths = resolvePaths() private val registrations = mutableMapOf>() + private var watchExecutor: ExecutorService? = null + private var watchSupport: DirectoryWatchSupport? = null + @Synchronized fun register(configurable: Configurable) { - - fun normalizeRelativePath(path: Path): Path { - require(!path.isAbsolute) { - "Config path must be relative: $path" - } - return path.normalize() - } - val declared = configurable.declare() val normalized = mutableMapOf>() @@ -36,6 +40,112 @@ object ConfigCenter { registrations.putAll(normalized) } + @Synchronized + fun startWatching() { + if (watchSupport != null) { + return + } + + val executor = Executors.newVirtualThreadPerTaskExecutor() + val support = DirectoryWatchSupport( + DirectoryWatchSupport.Context(paths.configDir), + executor, + -1 + ) { + reconcileAll() + }.onCreate(this::handleUpsert) + .onModify(this::handleUpsert) + .onDelete(this::handleDelete) + .onOverflow { _, _ -> reconcileAll() } + + watchExecutor = executor + watchSupport = support + support.start() + log.info("ConfigCenter 文件监听注册完毕: {}", paths.configDir) + } + + private fun handleUpsert(thisDir: Path, context: Path?) { + if (context == null || !Files.isRegularFile(context) || !isJsonFile(context)) { + return + } + reloadIfRegistered(context) + } + + private fun handleDelete(thisDir: Path, context: Path?) { + if (context == null || !isJsonFile(context)) { + return + } + val relativePath = toRelativeConfigPath(context) ?: return + if (!registrations.containsKey(relativePath)) { + return + } + log.info("Config deleted, skipped reload: {}", relativePath) + } + + private fun reconcileAll() { + val configDir = paths.configDir + if (!Files.isDirectory(configDir)) { + return + } + Files.walk(configDir).use { stream -> + stream.filter(Files::isRegularFile) + .filter(::isJsonFile) + .forEach(this::reloadIfRegistered) + } + } + + private fun reloadIfRegistered(file: Path) { + val relativePath = toRelativeConfigPath(file) ?: return + val registration = registrations[relativePath] ?: return + try { + val config = loadConfig(file, registration) + notifyReload(registration, config) + } catch (e: Exception) { + log.error("Config reload failed: {}", relativePath, e) + } + } + + private fun loadConfig(file: Path, registration: ConfigRegistration): Config { + return JSON.parseObject(Files.readString(file, StandardCharsets.UTF_8), registration.type()) as Config + } + + @Suppress("UNCHECKED_CAST") + private fun notifyReload(registration: ConfigRegistration, config: Config) { + (registration as ConfigRegistration).onReload(config) + } + + private fun toRelativeConfigPath(file: Path): Path? { + val normalizedFile = file.toAbsolutePath().normalize() + val normalizedConfigDir = paths.configDir.toAbsolutePath().normalize() + if (!normalizedFile.startsWith(normalizedConfigDir)) { + return null + } + return normalizedConfigDir.relativize(normalizedFile).normalize() + } + + private fun isJsonFile(path: Path): Boolean { + return path.fileName.toString().endsWith(".json") + } + + @Synchronized + override fun close() { + try { + watchSupport?.close() + } catch (e: IOException) { + log.warn("Failed to close ConfigCenter watch service", e) + } finally { + watchSupport = null + } + watchExecutor?.shutdownNow() + watchExecutor = null + } + + private fun normalizeRelativePath(path: Path): Path { + require(!path.isAbsolute) { + "Config path must be relative: $path" + } + return path.normalize() + } } abstract class Config @@ -51,4 +161,4 @@ interface ConfigRegistration { fun type(): Class fun init(config: T) fun onReload(config: T) {} -} \ No newline at end of file +} diff --git a/Partner-Framework/src/test/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenterTest.java b/Partner-Framework/src/test/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenterTest.java new file mode 100644 index 00000000..f61096f9 --- /dev/null +++ b/Partner-Framework/src/test/java/work/slhaf/partner/api/agent/runtime/config/ConfigCenterTest.java @@ -0,0 +1,255 @@ +package work.slhaf.partner.api.agent.runtime.config; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BooleanSupplier; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class ConfigCenterTest { + + private static final Path INITIAL_PATH = Path.of("root-initial.json"); + private static final Path NESTED_PATH = Path.of("nested", "child.json"); + private static final Path DELETE_PATH = Path.of("delete", "target.json"); + private static final Path INVALID_PATH = Path.of("invalid", "target.json"); + private static final Path IDEMPOTENT_PATH = Path.of("idempotent", "target.json"); + + private static String originalUserHome; + private static Path configDir; + private static TrackingRegistration initialRegistration; + private static TrackingRegistration nestedRegistration; + private static TrackingRegistration deleteRegistration; + private static TrackingRegistration invalidRegistration; + private static TrackingRegistration idempotentRegistration; + + @BeforeAll + static void beforeAll(@TempDir Path tempUserHome) throws Exception { + Assumptions.assumeTrue(System.getenv("PARTNER_HOME") == null, + "PARTNER_HOME is set; user.home based ConfigCenter test is not applicable."); + + originalUserHome = System.getProperty("user.home"); + System.setProperty("user.home", tempUserHome.toString()); + + initialRegistration = new TrackingRegistration(); + nestedRegistration = new TrackingRegistration(); + deleteRegistration = new TrackingRegistration(); + invalidRegistration = new TrackingRegistration(); + idempotentRegistration = new TrackingRegistration(); + + configDir = ConfigCenter.INSTANCE.getPaths().getConfigDir(); + Files.createDirectories(configDir); + Files.createDirectories(configDir.resolve(NESTED_PATH).getParent()); + Files.createDirectories(configDir.resolve(DELETE_PATH).getParent()); + Files.createDirectories(configDir.resolve(INVALID_PATH).getParent()); + Files.createDirectories(configDir.resolve(IDEMPOTENT_PATH).getParent()); + writeJson(configDir.resolve(INITIAL_PATH), "initial", 1); + + ConfigCenter.INSTANCE.register(() -> { + Map> declared = new LinkedHashMap<>(); + declared.put(INITIAL_PATH, initialRegistration); + declared.put(NESTED_PATH, nestedRegistration); + declared.put(DELETE_PATH, deleteRegistration); + declared.put(INVALID_PATH, invalidRegistration); + declared.put(IDEMPOTENT_PATH, idempotentRegistration); + return declared; + }); + ConfigCenter.INSTANCE.startWatching(); + } + + @AfterAll + static void afterAll() { + ConfigCenter.INSTANCE.close(); + if (originalUserHome == null) { + System.clearProperty("user.home"); + } else { + System.setProperty("user.home", originalUserHome); + } + } + + private static int totalReloadCount() { + return initialRegistration.reloadCount() + + nestedRegistration.reloadCount() + + deleteRegistration.reloadCount() + + invalidRegistration.reloadCount() + + idempotentRegistration.reloadCount(); + } + + private static void writeJson(Path file, String name, int version) throws IOException { + Files.createDirectories(file.getParent()); + Files.writeString(file, + "{\"name\":\"" + name + "\",\"version\":" + version + "}", + StandardCharsets.UTF_8); + } + + private static void waitForCount(TrackingRegistration registration, int expectedCount, long timeoutMs) + throws InterruptedException { + waitForCondition(() -> registration.reloadCount() >= expectedCount, timeoutMs); + } + + private static void waitForConfig(TrackingRegistration registration, String expectedName, int expectedVersion, + long timeoutMs) throws InterruptedException { + waitForCondition(() -> hasConfig(registration, expectedName, expectedVersion), timeoutMs); + } + + private static boolean hasConfig(TrackingRegistration registration, String expectedName, int expectedVersion) { + TestConfig config = registration.lastConfig(); + return config != null + && expectedName.equals(config.name) + && expectedVersion == config.version; + } + + private static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException { + long start = System.currentTimeMillis(); + while (!supplier.getAsBoolean()) { + if (System.currentTimeMillis() - start > timeoutMs) { + break; + } + Thread.sleep(50); + } + Assertions.assertTrue(supplier.getAsBoolean(), "Condition was not satisfied within " + timeoutMs + " ms"); + } + + @Test + @Order(1) + void testInitialReconcileReloadsRegisteredJson() throws Exception { + waitForCount(initialRegistration, 1, 3000); + + Assertions.assertEquals(1, initialRegistration.reloadCount()); + Assertions.assertEquals("initial", initialRegistration.lastConfig().name); + Assertions.assertEquals(1, initialRegistration.lastConfig().version); + } + + @Test + @Order(2) + void testNestedJsonCreateAndModifyTriggersReload() throws Exception { + Path file = configDir.resolve(NESTED_PATH); + + writeJson(file, "nested-create", 1); + waitForConfig(nestedRegistration, "nested-create", 1, 3000); + Assertions.assertEquals("nested-create", nestedRegistration.lastConfig().name); + Assertions.assertEquals(1, nestedRegistration.lastConfig().version); + + int baseline = nestedRegistration.reloadCount(); + writeJson(file, "nested-modify", 2); + waitForCondition(() -> nestedRegistration.reloadCount() > baseline + && hasConfig(nestedRegistration, "nested-modify", 2), 3000); + Assertions.assertEquals("nested-modify", nestedRegistration.lastConfig().name); + Assertions.assertEquals(2, nestedRegistration.lastConfig().version); + } + + @Test + @Order(3) + void testUnregisteredJsonDoesNotTriggerReload() throws Exception { + int totalBaseline = totalReloadCount(); + + writeJson(configDir.resolve("unregistered.json"), "ignored", 1); + Thread.sleep(300); + + Assertions.assertEquals(totalBaseline, totalReloadCount()); + } + + @Test + @Order(4) + void testNonJsonDoesNotTriggerReload() throws Exception { + int totalBaseline = totalReloadCount(); + + Path file = configDir.resolve("nested").resolve("ignored.txt"); + Files.createDirectories(file.getParent()); + Files.writeString(file, "ignored", StandardCharsets.UTF_8); + Thread.sleep(300); + + Assertions.assertEquals(totalBaseline, totalReloadCount()); + } + + @Test + @Order(5) + void testDeleteDoesNotTriggerReload() throws Exception { + Path file = configDir.resolve(DELETE_PATH); + writeJson(file, "delete-target", 1); + waitForCount(deleteRegistration, 1, 3000); + + int baseline = deleteRegistration.reloadCount(); + Files.delete(file); + Thread.sleep(300); + + Assertions.assertEquals(baseline, deleteRegistration.reloadCount()); + } + + @Test + @Order(6) + void testInvalidJsonDoesNotReloadButRecoveryStillWorks() throws Exception { + Path file = configDir.resolve(INVALID_PATH); + writeJson(file, "valid-before-invalid", 1); + waitForCount(invalidRegistration, 1, 3000); + + int baseline = invalidRegistration.reloadCount(); + Files.writeString(file, "{\"name\":", StandardCharsets.UTF_8); + Thread.sleep(300); + Assertions.assertEquals(baseline, invalidRegistration.reloadCount()); + + writeJson(file, "valid-after-invalid", 2); + waitForCount(invalidRegistration, baseline + 1, 3000); + Assertions.assertEquals("valid-after-invalid", invalidRegistration.lastConfig().name); + Assertions.assertEquals(2, invalidRegistration.lastConfig().version); + } + + @Test + @Order(7) + void testStartWatchingIsIdempotent() throws Exception { + Path file = configDir.resolve(IDEMPOTENT_PATH); + writeJson(file, "before-idempotent", 1); + waitForCount(idempotentRegistration, 1, 3000); + + ConfigCenter.INSTANCE.startWatching(); + + int baseline = idempotentRegistration.reloadCount(); + writeJson(file, "after-idempotent", 2); + waitForCount(idempotentRegistration, baseline + 1, 3000); + Thread.sleep(300); + + Assertions.assertEquals(baseline + 1, idempotentRegistration.reloadCount()); + Assertions.assertEquals("after-idempotent", idempotentRegistration.lastConfig().name); + Assertions.assertEquals(2, idempotentRegistration.lastConfig().version); + } + + public static class TestConfig extends Config { + public String name; + public int version; + } + + private static class TrackingRegistration implements ConfigRegistration { + private final AtomicInteger reloadCount = new AtomicInteger(); + private final AtomicReference lastConfig = new AtomicReference<>(); + + @Override + public Class type() { + return TestConfig.class; + } + + @Override + public void init(TestConfig config) { + } + + @Override + public void onReload(TestConfig config) { + lastConfig.set(config); + reloadCount.incrementAndGet(); + } + + int reloadCount() { + return reloadCount.get(); + } + + TestConfig lastConfig() { + return lastConfig.get(); + } + } +}