refactor(context): support wait a debounce delay before executing turn

This commit is contained in:
2026-04-15 16:36:23 +08:00
parent d8ff0b5ea4
commit f510dc5a42
2 changed files with 146 additions and 27 deletions

View File

@@ -5,6 +5,7 @@ import com.alibaba.fastjson2.annotation.JSONField
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import work.slhaf.partner.framework.agent.config.Config
import work.slhaf.partner.framework.agent.config.ConfigDoc
import work.slhaf.partner.framework.agent.config.ConfigRegistration
import work.slhaf.partner.framework.agent.config.Configurable
import work.slhaf.partner.framework.agent.exception.ExceptionReporterHandler
@@ -16,8 +17,9 @@ import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext
import work.slhaf.partner.framework.agent.support.Result
import java.nio.file.Path
import java.util.*
import kotlin.time.Duration.Companion.milliseconds
object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
object AgentRuntime : Configurable, ConfigRegistration<RuntimeConfig> {
private const val DEFAULT_LOG_CHANNEL = "log_channel"
@@ -52,6 +54,9 @@ object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
@Volatile
private var maskedModules: Set<String> = emptySet()
@Volatile
private var debounceWindow: Long = 0
@Volatile
private var currentExecutingSource: String? = null
@@ -123,17 +128,7 @@ object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
private suspend fun executeSource(source: String) {
while (true) {
val execution = synchronized(stateLock) {
val context = latestContextsBySource[source] ?: run {
sourceQueue.remove(source)
sourceVersions.remove(source)
return
}
currentExecutingSource = source
currentExecutingContext = context
context.status.interrupted = false
SourceExecution(context, sourceVersions[source] ?: 0L)
}
val execution = awaitDebouncedExecution(source) ?: return
val interrupted = executeTurn(execution.context)
@@ -165,6 +160,64 @@ object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
}
}
private suspend fun awaitDebouncedExecution(source: String): SourceExecution? {
if (debounceWindow <= 0) {
return synchronized(stateLock) { buildSourceExecutionLocked(source) }
}
var observedVersion = synchronized(stateLock) {
sourceVersions[source]
} ?: return cleanupSourceAndReturnNull(source)
while (true) {
delay(debounceWindow.milliseconds)
when (val result = synchronized(stateLock) {
val context = latestContextsBySource[source]
val latestVersion = sourceVersions[source]
when {
context == null || latestVersion == null -> DebounceResult.Missing
latestVersion != observedVersion -> DebounceResult.Retry(latestVersion)
else -> {
currentExecutingSource = source
currentExecutingContext = context
context.status.interrupted = false
DebounceResult.Ready(SourceExecution(context, latestVersion))
}
}
}) {
DebounceResult.Missing -> return cleanupSourceAndReturnNull(source)
is DebounceResult.Ready -> return result.execution
is DebounceResult.Retry -> observedVersion = result.latestVersion
}
}
}
private fun buildSourceExecutionLocked(source: String): SourceExecution? {
val context = latestContextsBySource[source] ?: run {
sourceQueue.remove(source)
sourceVersions.remove(source)
return null
}
val version = sourceVersions[source] ?: run {
latestContextsBySource.remove(source)
sourceQueue.remove(source)
return null
}
currentExecutingSource = source
currentExecutingContext = context
context.status.interrupted = false
return SourceExecution(context, version)
}
private fun cleanupSourceAndReturnNull(source: String): SourceExecution? {
synchronized(stateLock) {
latestContextsBySource.remove(source)
sourceQueue.remove(source)
sourceVersions.remove(source)
}
return null
}
private suspend fun executeTurn(runningFlowContext: RunningFlowContext): Boolean {
if (runningModules.isEmpty()) {
refreshRunningModules()
@@ -211,33 +264,34 @@ object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
}
override fun declare(): Map<Path, ConfigRegistration<out Config>> {
return mapOf(Path.of("masked_modules.json") to this)
return mapOf(Path.of("runtime.json") to this)
}
override fun type(): Class<ModuleMaskConfig> {
return ModuleMaskConfig::class.java
override fun type(): Class<RuntimeConfig> {
return RuntimeConfig::class.java
}
override fun init(
config: ModuleMaskConfig,
config: RuntimeConfig,
json: JSONObject?
) {
applyModuleMask(config)
applyConfig(config)
}
override fun onReload(
config: ModuleMaskConfig,
config: RuntimeConfig,
json: JSONObject?
) {
applyModuleMask(config)
applyConfig(config)
}
override fun defaultConfig(): ModuleMaskConfig {
return ModuleMaskConfig(setOf())
override fun defaultConfig(): RuntimeConfig {
return RuntimeConfig(setOf(), 300)
}
private fun applyModuleMask(config: ModuleMaskConfig) {
private fun applyConfig(config: RuntimeConfig) {
maskedModules = config.maskedModules
debounceWindow = config.debounceWindow
refreshRunningModules()
}
@@ -245,9 +299,25 @@ object AgentRuntime : Configurable, ConfigRegistration<ModuleMaskConfig> {
val context: RunningFlowContext,
val version: Long
)
private sealed interface DebounceResult {
data object Missing : DebounceResult
data class Retry(val latestVersion: Long) : DebounceResult
data class Ready(val execution: SourceExecution) : DebounceResult
}
}
data class ModuleMaskConfig(
data class RuntimeConfig(
@field:JSONField(name = "masked_modules")
val maskedModules: Set<String>
@field:ConfigDoc(
description = "运行时屏蔽的模块"
)
val maskedModules: Set<String>,
@field:JSONField(name = "debounce_window")
@field:ConfigDoc(
description = "输入后的等待窗口",
unit = "ms"
)
val debounceWindow: Long
) : Config()

View File

@@ -1,8 +1,7 @@
package work.slhaf.partner.framework.agent.interaction
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import work.slhaf.partner.framework.agent.factory.component.abstracts.AbstractAgentModule
@@ -41,19 +40,65 @@ class AgentRuntimeTest {
assertEquals(listOf("source-a", "source-b"), recorder.sources)
}
@Test
fun `agent runtime waits debounce before first execution`() {
setPrivateField("debounceWindow", 200)
val recorder = RecordingModule(order = 1, expectedExecutions = 1)
registerModule("debounce-recorder", recorder)
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "alpha"))
assertFalse(recorder.latch.await(100, TimeUnit.MILLISECONDS))
assertTrue(recorder.latch.await(500, TimeUnit.MILLISECONDS))
assertEquals(listOf(1), recorder.inputSizes)
}
@Test
fun `agent runtime resets debounce when same source receives new input`() {
setPrivateField("debounceWindow", 200)
val recorder = RecordingModule(order = 1, expectedExecutions = 1)
registerModule("debounce-merge-recorder", recorder)
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "first", 1_000L))
Thread.sleep(100)
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "second", 1_300L))
assertFalse(recorder.latch.await(120, TimeUnit.MILLISECONDS))
assertTrue(recorder.latch.await(500, TimeUnit.MILLISECONDS))
assertEquals(listOf(2), recorder.inputSizes)
assertEquals(listOf("first\nsecond"), recorder.historyInputs)
}
@Test
fun `agent runtime debounce keeps queue head exclusive`() {
setPrivateField("debounceWindow", 150)
val recorder = RecordingModule(order = 1, expectedExecutions = 2)
registerModule("debounce-queue-recorder", recorder)
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "alpha"))
Thread.sleep(50)
AgentRuntime.submit(TestRunningFlowContext.of("source-b", "beta"))
assertFalse(recorder.latch.await(100, TimeUnit.MILLISECONDS))
assertTrue(recorder.latch.await(800, TimeUnit.MILLISECONDS))
assertEquals(listOf("source-a", "source-b"), recorder.sources)
}
@Test
fun `agent runtime interrupts current source and reruns from chain head with merged context`() {
setPrivateField("debounceWindow", 150)
val blocking = BlockingModule()
val finalizer = RecordingModule(order = 2, expectedExecutions = 1)
registerModule("blocking-module", blocking)
registerModule("finalizer-module", finalizer)
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "first", 1_000L))
assertTrue(blocking.firstExecutionStarted.await(5, TimeUnit.SECONDS))
assertTrue(blocking.firstExecutionStarted.await(2, TimeUnit.SECONDS))
AgentRuntime.submit(TestRunningFlowContext.of("source-a", "second", 1_300L))
blocking.releaseFirstExecution.countDown()
assertFalse(blocking.secondExecutionStarted.await(100, TimeUnit.MILLISECONDS))
assertTrue(finalizer.latch.await(5, TimeUnit.SECONDS))
waitUntil { blocking.seenInputSizes.size >= 2 }
@@ -85,6 +130,7 @@ class AgentRuntimeTest {
private fun resetAgentRuntime() {
setPrivateField("runningModules", emptyMap<Int, List<AbstractAgentModule.Running<RunningFlowContext>>>())
setPrivateField("maskedModules", emptySet<String>())
setPrivateField("debounceWindow", 0)
setPrivateField("currentExecutingSource", null)
setPrivateField("currentExecutingContext", null)
getPrivateMutableMap<String, RunningFlowContext>("latestContextsBySource").clear()
@@ -149,6 +195,7 @@ class AgentRuntimeTest {
private class BlockingModule : AbstractAgentModule.Running<TestRunningFlowContext>() {
val seenInputSizes = CopyOnWriteArrayList<Int>()
val firstExecutionStarted = CountDownLatch(1)
val secondExecutionStarted = CountDownLatch(1)
val releaseFirstExecution = CountDownLatch(1)
private val invocationCount = AtomicInteger(0)
@@ -161,6 +208,8 @@ class AgentRuntimeTest {
if (invocationCount.getAndIncrement() == 0) {
firstExecutionStarted.countDown()
releaseFirstExecution.await(5, TimeUnit.SECONDS)
} else {
secondExecutionStarted.countDown()
}
}