From f510dc5a4239f5e89237e41c6302a3d508ec95c3 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Wed, 15 Apr 2026 16:36:23 +0800 Subject: [PATCH] refactor(context): support wait a debounce delay before executing turn --- .../agent/interaction/AgentRuntime.kt | 118 ++++++++++++++---- .../agent/interaction/AgentRuntimeTest.kt | 55 +++++++- 2 files changed, 146 insertions(+), 27 deletions(-) diff --git a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/interaction/AgentRuntime.kt b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/interaction/AgentRuntime.kt index c3e36798..627246cf 100644 --- a/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/interaction/AgentRuntime.kt +++ b/Partner-Framework/src/main/java/work/slhaf/partner/framework/agent/interaction/AgentRuntime.kt @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.annotation.JSONField import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import work.slhaf.partner.framework.agent.config.Config +import work.slhaf.partner.framework.agent.config.ConfigDoc import work.slhaf.partner.framework.agent.config.ConfigRegistration import work.slhaf.partner.framework.agent.config.Configurable import work.slhaf.partner.framework.agent.exception.ExceptionReporterHandler @@ -16,8 +17,9 @@ import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext import work.slhaf.partner.framework.agent.support.Result import java.nio.file.Path import java.util.* +import kotlin.time.Duration.Companion.milliseconds -object AgentRuntime : Configurable, ConfigRegistration { +object AgentRuntime : Configurable, ConfigRegistration { private const val DEFAULT_LOG_CHANNEL = "log_channel" @@ -52,6 +54,9 @@ object AgentRuntime : Configurable, ConfigRegistration { @Volatile private var maskedModules: Set = emptySet() + @Volatile + private var debounceWindow: Long = 0 + @Volatile private var currentExecutingSource: String? = null @@ -123,17 +128,7 @@ object AgentRuntime : Configurable, ConfigRegistration { private suspend fun executeSource(source: String) { while (true) { - val execution = synchronized(stateLock) { - val context = latestContextsBySource[source] ?: run { - sourceQueue.remove(source) - sourceVersions.remove(source) - return - } - currentExecutingSource = source - currentExecutingContext = context - context.status.interrupted = false - SourceExecution(context, sourceVersions[source] ?: 0L) - } + val execution = awaitDebouncedExecution(source) ?: return val interrupted = executeTurn(execution.context) @@ -165,6 +160,64 @@ object AgentRuntime : Configurable, ConfigRegistration { } } + private suspend fun awaitDebouncedExecution(source: String): SourceExecution? { + if (debounceWindow <= 0) { + return synchronized(stateLock) { buildSourceExecutionLocked(source) } + } + + var observedVersion = synchronized(stateLock) { + sourceVersions[source] + } ?: return cleanupSourceAndReturnNull(source) + + while (true) { + delay(debounceWindow.milliseconds) + when (val result = synchronized(stateLock) { + val context = latestContextsBySource[source] + val latestVersion = sourceVersions[source] + when { + context == null || latestVersion == null -> DebounceResult.Missing + latestVersion != observedVersion -> DebounceResult.Retry(latestVersion) + else -> { + currentExecutingSource = source + currentExecutingContext = context + context.status.interrupted = false + DebounceResult.Ready(SourceExecution(context, latestVersion)) + } + } + }) { + DebounceResult.Missing -> return cleanupSourceAndReturnNull(source) + is DebounceResult.Ready -> return result.execution + is DebounceResult.Retry -> observedVersion = result.latestVersion + } + } + } + + private fun buildSourceExecutionLocked(source: String): SourceExecution? { + val context = latestContextsBySource[source] ?: run { + sourceQueue.remove(source) + sourceVersions.remove(source) + return null + } + val version = sourceVersions[source] ?: run { + latestContextsBySource.remove(source) + sourceQueue.remove(source) + return null + } + currentExecutingSource = source + currentExecutingContext = context + context.status.interrupted = false + return SourceExecution(context, version) + } + + private fun cleanupSourceAndReturnNull(source: String): SourceExecution? { + synchronized(stateLock) { + latestContextsBySource.remove(source) + sourceQueue.remove(source) + sourceVersions.remove(source) + } + return null + } + private suspend fun executeTurn(runningFlowContext: RunningFlowContext): Boolean { if (runningModules.isEmpty()) { refreshRunningModules() @@ -211,33 +264,34 @@ object AgentRuntime : Configurable, ConfigRegistration { } override fun declare(): Map> { - return mapOf(Path.of("masked_modules.json") to this) + return mapOf(Path.of("runtime.json") to this) } - override fun type(): Class { - return ModuleMaskConfig::class.java + override fun type(): Class { + return RuntimeConfig::class.java } override fun init( - config: ModuleMaskConfig, + config: RuntimeConfig, json: JSONObject? ) { - applyModuleMask(config) + applyConfig(config) } override fun onReload( - config: ModuleMaskConfig, + config: RuntimeConfig, json: JSONObject? ) { - applyModuleMask(config) + applyConfig(config) } - override fun defaultConfig(): ModuleMaskConfig { - return ModuleMaskConfig(setOf()) + override fun defaultConfig(): RuntimeConfig { + return RuntimeConfig(setOf(), 300) } - private fun applyModuleMask(config: ModuleMaskConfig) { + private fun applyConfig(config: RuntimeConfig) { maskedModules = config.maskedModules + debounceWindow = config.debounceWindow refreshRunningModules() } @@ -245,9 +299,25 @@ object AgentRuntime : Configurable, ConfigRegistration { val context: RunningFlowContext, val version: Long ) + + private sealed interface DebounceResult { + data object Missing : DebounceResult + data class Retry(val latestVersion: Long) : DebounceResult + data class Ready(val execution: SourceExecution) : DebounceResult + } } -data class ModuleMaskConfig( +data class RuntimeConfig( @field:JSONField(name = "masked_modules") - val maskedModules: Set + @field:ConfigDoc( + description = "运行时屏蔽的模块" + ) + val maskedModules: Set, + + @field:JSONField(name = "debounce_window") + @field:ConfigDoc( + description = "输入后的等待窗口", + unit = "ms" + ) + val debounceWindow: Long ) : Config() diff --git a/Partner-Framework/src/test/java/work/slhaf/partner/framework/agent/interaction/AgentRuntimeTest.kt b/Partner-Framework/src/test/java/work/slhaf/partner/framework/agent/interaction/AgentRuntimeTest.kt index 71bc9a3e..0b4725b5 100644 --- a/Partner-Framework/src/test/java/work/slhaf/partner/framework/agent/interaction/AgentRuntimeTest.kt +++ b/Partner-Framework/src/test/java/work/slhaf/partner/framework/agent/interaction/AgentRuntimeTest.kt @@ -1,8 +1,7 @@ package work.slhaf.partner.framework.agent.interaction import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import work.slhaf.partner.framework.agent.factory.component.abstracts.AbstractAgentModule @@ -41,19 +40,65 @@ class AgentRuntimeTest { assertEquals(listOf("source-a", "source-b"), recorder.sources) } + @Test + fun `agent runtime waits debounce before first execution`() { + setPrivateField("debounceWindow", 200) + val recorder = RecordingModule(order = 1, expectedExecutions = 1) + registerModule("debounce-recorder", recorder) + + AgentRuntime.submit(TestRunningFlowContext.of("source-a", "alpha")) + + assertFalse(recorder.latch.await(100, TimeUnit.MILLISECONDS)) + assertTrue(recorder.latch.await(500, TimeUnit.MILLISECONDS)) + assertEquals(listOf(1), recorder.inputSizes) + } + + @Test + fun `agent runtime resets debounce when same source receives new input`() { + setPrivateField("debounceWindow", 200) + val recorder = RecordingModule(order = 1, expectedExecutions = 1) + registerModule("debounce-merge-recorder", recorder) + + AgentRuntime.submit(TestRunningFlowContext.of("source-a", "first", 1_000L)) + Thread.sleep(100) + AgentRuntime.submit(TestRunningFlowContext.of("source-a", "second", 1_300L)) + + assertFalse(recorder.latch.await(120, TimeUnit.MILLISECONDS)) + assertTrue(recorder.latch.await(500, TimeUnit.MILLISECONDS)) + assertEquals(listOf(2), recorder.inputSizes) + assertEquals(listOf("first\nsecond"), recorder.historyInputs) + } + + @Test + fun `agent runtime debounce keeps queue head exclusive`() { + setPrivateField("debounceWindow", 150) + val recorder = RecordingModule(order = 1, expectedExecutions = 2) + registerModule("debounce-queue-recorder", recorder) + + AgentRuntime.submit(TestRunningFlowContext.of("source-a", "alpha")) + Thread.sleep(50) + AgentRuntime.submit(TestRunningFlowContext.of("source-b", "beta")) + + assertFalse(recorder.latch.await(100, TimeUnit.MILLISECONDS)) + assertTrue(recorder.latch.await(800, TimeUnit.MILLISECONDS)) + assertEquals(listOf("source-a", "source-b"), recorder.sources) + } + @Test fun `agent runtime interrupts current source and reruns from chain head with merged context`() { + setPrivateField("debounceWindow", 150) val blocking = BlockingModule() val finalizer = RecordingModule(order = 2, expectedExecutions = 1) registerModule("blocking-module", blocking) registerModule("finalizer-module", finalizer) AgentRuntime.submit(TestRunningFlowContext.of("source-a", "first", 1_000L)) - assertTrue(blocking.firstExecutionStarted.await(5, TimeUnit.SECONDS)) + assertTrue(blocking.firstExecutionStarted.await(2, TimeUnit.SECONDS)) AgentRuntime.submit(TestRunningFlowContext.of("source-a", "second", 1_300L)) blocking.releaseFirstExecution.countDown() + assertFalse(blocking.secondExecutionStarted.await(100, TimeUnit.MILLISECONDS)) assertTrue(finalizer.latch.await(5, TimeUnit.SECONDS)) waitUntil { blocking.seenInputSizes.size >= 2 } @@ -85,6 +130,7 @@ class AgentRuntimeTest { private fun resetAgentRuntime() { setPrivateField("runningModules", emptyMap>>()) setPrivateField("maskedModules", emptySet()) + setPrivateField("debounceWindow", 0) setPrivateField("currentExecutingSource", null) setPrivateField("currentExecutingContext", null) getPrivateMutableMap("latestContextsBySource").clear() @@ -149,6 +195,7 @@ class AgentRuntimeTest { private class BlockingModule : AbstractAgentModule.Running() { val seenInputSizes = CopyOnWriteArrayList() val firstExecutionStarted = CountDownLatch(1) + val secondExecutionStarted = CountDownLatch(1) val releaseFirstExecution = CountDownLatch(1) private val invocationCount = AtomicInteger(0) @@ -161,6 +208,8 @@ class AgentRuntimeTest { if (invocationCount.getAndIncrement() == 0) { firstExecutionStarted.countDown() releaseFirstExecution.await(5, TimeUnit.SECONDS) + } else { + secondExecutionStarted.countDown() } }