refactor(context): simplify constructor and inputs encoding of running flow context

This commit is contained in:
2026-04-15 15:45:42 +08:00
parent dc147000ba
commit d8ff0b5ea4
5 changed files with 44 additions and 119 deletions

View File

@@ -69,7 +69,7 @@ public class ActionPlanner extends AbstractAgentModule.Running<PartnerRunningFlo
@Override @Override
protected void doExecute(@NotNull PartnerRunningFlowContext context) { protected void doExecute(@NotNull PartnerRunningFlowContext context) {
String input = context.encodeInputsXml(); String input = context.encodeInputsBlock().encodeToXmlString();
Result<ExtractorResult> result = actionExtractor.execute(input) Result<ExtractorResult> result = actionExtractor.execute(input)
.onFailure(exp -> { .onFailure(exp -> {
ExceptionReporterHandler.INSTANCE.report(exp, ContextExceptionReporter.REPORTER_NAME); ExceptionReporterHandler.INSTANCE.report(exp, ContextExceptionReporter.REPORTER_NAME);

View File

@@ -129,7 +129,7 @@ public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRu
Element root = document.createElement("input"); Element root = document.createElement("input");
document.appendChild(root); document.appendChild(root);
runningFlowContext.appendInputsXml(document, root); document.appendChild(document.importNode(runningFlowContext.encodeInputsBlock().encodeToXml(), true));
appendTextElement(document, root, "source", runningFlowContext.getSource()); appendTextElement(document, root, "source", runningFlowContext.getSource());
for (Map.Entry<String, String> entry : runningFlowContext.getAdditionalUserInfo().entrySet()) { for (Map.Entry<String, String> entry : runningFlowContext.getAdditionalUserInfo().entrySet()) {
appendTextElement(document, root, sanitizeTagName(entry.getKey()), entry.getValue()); appendTextElement(document, root, sanitizeTagName(entry.getKey()), entry.getValue());

View File

@@ -1,15 +1,15 @@
package work.slhaf.partner.runtime package work.slhaf.partner.runtime
import org.w3c.dom.Document
import org.w3c.dom.Element
import work.slhaf.partner.common.base.Block
import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext
class PartnerRunningFlowContext private constructor( class PartnerRunningFlowContext private constructor(
override val source: String, override val source: String,
inputs: List<InputEntry>, inputs: List<InputEntry>,
firstInputEpochMillis: Long, firstInputEpochMillis: Long
additionalUserInfo: Map<String, String> = emptyMap(), ) : RunningFlowContext(inputs, firstInputEpochMillis) {
skippedModules: Set<String> = emptySet(),
target: String = source
) : RunningFlowContext(inputs, firstInputEpochMillis, additionalUserInfo, skippedModules, target) {
companion object { companion object {
@@ -31,6 +31,7 @@ class PartnerRunningFlowContext private constructor(
} }
@JvmStatic @JvmStatic
@JvmOverloads
fun fromUser(userId: String, input: String, receivedAtMillis: Long = System.currentTimeMillis()) = fun fromUser(userId: String, input: String, receivedAtMillis: Long = System.currentTimeMillis()) =
PartnerRunningFlowContext( PartnerRunningFlowContext(
SourceTag.buildUserSource(userId), SourceTag.buildUserSource(userId),
@@ -39,6 +40,7 @@ class PartnerRunningFlowContext private constructor(
) )
@JvmStatic @JvmStatic
@JvmOverloads
fun fromSelf(input: String, receivedAtMillis: Long = System.currentTimeMillis()) = fun fromSelf(input: String, receivedAtMillis: Long = System.currentTimeMillis()) =
PartnerRunningFlowContext( PartnerRunningFlowContext(
SourceTag.buildAgentSource(), SourceTag.buildAgentSource(),
@@ -50,20 +52,21 @@ class PartnerRunningFlowContext private constructor(
} }
} }
override fun copyWith( override fun recreate(inputs: List<InputEntry>): RunningFlowContext {
inputs: List<InputEntry>,
firstInputEpochMillis: Long,
additionalUserInfo: Map<String, String>,
skippedModules: Set<String>,
target: String
): RunningFlowContext {
return PartnerRunningFlowContext( return PartnerRunningFlowContext(
source = source, source = source,
inputs = inputs, inputs = inputs,
firstInputEpochMillis = firstInputEpochMillis, firstInputEpochMillis = System.currentTimeMillis()
additionalUserInfo = additionalUserInfo,
skippedModules = skippedModules,
target = target
) )
} }
fun encodeInputsBlock(): Block = object : Block("inputs") {
override fun fillXml(document: Document, root: Element) {
appendRepeatedElements(document, root, "input", inputs) {
this.setAttribute("interval-to-first", it.offsetMillis.toString())
this.textContent = it.content
}
}
}
} }

View File

@@ -1,8 +1,6 @@
package work.slhaf.partner.framework.agent.interaction.flow package work.slhaf.partner.framework.agent.interaction.flow
import com.alibaba.fastjson2.JSONObject import com.alibaba.fastjson2.JSONObject
import org.w3c.dom.Document
import org.w3c.dom.Element
import java.time.Instant import java.time.Instant
import java.time.LocalDateTime import java.time.LocalDateTime
import java.time.ZoneId import java.time.ZoneId
@@ -14,10 +12,7 @@ import kotlin.math.min
*/ */
abstract class RunningFlowContext protected constructor( abstract class RunningFlowContext protected constructor(
inputs: List<InputEntry>, inputs: List<InputEntry>,
val firstInputEpochMillis: Long, private var firstInputEpochMillis: Long
additionalUserInfo: Map<String, String> = emptyMap(),
skippedModules: Set<String> = emptySet(),
target: String = ""
) { ) {
/** /**
* 消息来源: 由谁发出 * 消息来源: 由谁发出
@@ -38,13 +33,18 @@ abstract class RunningFlowContext protected constructor(
/** /**
* 消息回应对象,默认与 source 一致 * 消息回应对象,默认与 source 一致
*/ */
var target: String = target private var _target: String? = null
var target: String
get() = _target ?: source
set(value) {
_target = value
}
private val _additionalUserInfo = additionalUserInfo.toMutableMap() private val _additionalUserInfo = mutableMapOf<String, String>()
val additionalUserInfo: Map<String, String> val additionalUserInfo: Map<String, String>
get() = _additionalUserInfo get() = _additionalUserInfo
private val _skippedModules = skippedModules.toMutableSet() private val _skippedModules = mutableSetOf<String>()
val skippedModules: Set<String> val skippedModules: Set<String>
get() = _skippedModules get() = _skippedModules
@@ -73,37 +73,6 @@ abstract class RunningFlowContext protected constructor(
fun formatInputsForHistory(): String = inputs.joinToString("\n") { it.content } fun formatInputsForHistory(): String = inputs.joinToString("\n") { it.content }
@JvmOverloads
fun appendInputsXml(
document: Document,
parent: Element,
containerTagName: String = "inputs",
inputTagName: String = "input",
intervalAttributeName: String = "interval-to-first"
) {
val inputsElement = document.createElement(containerTagName)
parent.appendChild(inputsElement)
inputs.forEach { entry ->
val inputElement = document.createElement(inputTagName)
inputElement.setAttribute(intervalAttributeName, entry.offsetMillis.toString())
inputElement.textContent = entry.content
inputsElement.appendChild(inputElement)
}
}
fun encodeInputsXml(): String {
val builder = StringBuilder()
builder.append("<inputs>")
inputs.forEach { entry ->
builder.append("<input interval-to-first=\"")
.append(escapeXml(entry.offsetMillis.toString()))
.append("\">")
.append(escapeXml(entry.content))
.append("</input>")
}
builder.append("</inputs>")
return builder.toString()
}
fun mergedWith(other: RunningFlowContext): RunningFlowContext { fun mergedWith(other: RunningFlowContext): RunningFlowContext {
require(source == other.source) { require(source == other.source) {
@@ -115,28 +84,17 @@ abstract class RunningFlowContext protected constructor(
addAll(normalizeInputs(other, mergedFirstEpochMillis)) addAll(normalizeInputs(other, mergedFirstEpochMillis))
}.sortedBy { it.offsetMillis } }.sortedBy { it.offsetMillis }
val mergedAdditionalUserInfo = LinkedHashMap<String, String>(_additionalUserInfo) val mergedContext = recreate(mergedInputs)
mergedAdditionalUserInfo.putAll(other.additionalUserInfo) mergedContext.firstInputEpochMillis = mergedFirstEpochMillis
mergedContext.target = other.target.ifBlank { target }
val mergedSkippedModules = LinkedHashSet<String>(_skippedModules) mergedContext._additionalUserInfo.putAll(_additionalUserInfo)
mergedSkippedModules.addAll(other.skippedModules) mergedContext._additionalUserInfo.putAll(other.additionalUserInfo)
mergedContext._skippedModules.addAll(_skippedModules)
return copyWith( mergedContext._skippedModules.addAll(other.skippedModules)
inputs = mergedInputs, return mergedContext
firstInputEpochMillis = mergedFirstEpochMillis,
additionalUserInfo = mergedAdditionalUserInfo,
skippedModules = mergedSkippedModules,
target = other.target.ifBlank { target }
)
} }
protected abstract fun copyWith( protected abstract fun recreate(inputs: List<InputEntry>): RunningFlowContext
inputs: List<InputEntry>,
firstInputEpochMillis: Long,
additionalUserInfo: Map<String, String>,
skippedModules: Set<String>,
target: String
): RunningFlowContext
private fun normalizeInputs(context: RunningFlowContext, firstEpochMillis: Long): List<InputEntry> { private fun normalizeInputs(context: RunningFlowContext, firstEpochMillis: Long): List<InputEntry> {
return context.inputs.map { entry -> return context.inputs.map { entry ->
@@ -147,15 +105,6 @@ abstract class RunningFlowContext protected constructor(
} }
} }
private fun escapeXml(value: String): String {
return value
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace("\"", "&quot;")
.replace("'", "&apos;")
}
data class InputEntry( data class InputEntry(
val offsetMillis: Long, val offsetMillis: Long,
val content: String val content: String
@@ -163,7 +112,6 @@ abstract class RunningFlowContext protected constructor(
class Info { class Info {
val uuid = UUID.randomUUID().toString() val uuid = UUID.randomUUID().toString()
val dateTime: LocalDateTime = LocalDateTime.now()
} }
class Status { class Status {

View File

@@ -29,21 +29,6 @@ class AgentRuntimeTest {
clearModules() clearModules()
} }
@Test
fun `running flow context preserves offsets and xml encoding`() {
val first = TestRunningFlowContext.of("source-a", "first", 1_000L)
val second = TestRunningFlowContext.of("source-a", "second", 1_250L)
val merged = first.mergedWith(second)
assertEquals(listOf(0L, 250L), merged.inputs.map { it.offsetMillis })
assertEquals("first\nsecond", merged.input)
assertEquals(
"<inputs><input interval-to-first=\"0\">first</input><input interval-to-first=\"250\">second</input></inputs>",
merged.encodeInputsXml()
)
}
@Test @Test
fun `agent runtime keeps source queue in first arrival order`() { fun `agent runtime keeps source queue in first arrival order`() {
val recorder = RecordingModule(order = 1, expectedExecutions = 2) val recorder = RecordingModule(order = 1, expectedExecutions = 2)
@@ -185,9 +170,8 @@ class AgentRuntimeTest {
private class TestRunningFlowContext private constructor( private class TestRunningFlowContext private constructor(
override val source: String, override val source: String,
inputs: List<InputEntry>, inputs: List<InputEntry>,
firstInputEpochMillis: Long, firstInputEpochMillis: Long
target: String = source ) : RunningFlowContext(inputs, firstInputEpochMillis) {
) : RunningFlowContext(inputs, firstInputEpochMillis, target = target) {
companion object { companion object {
fun of( fun of(
@@ -203,22 +187,12 @@ class AgentRuntimeTest {
} }
} }
override fun copyWith( override fun recreate(inputs: List<InputEntry>): RunningFlowContext {
inputs: List<InputEntry>,
firstInputEpochMillis: Long,
additionalUserInfo: Map<String, String>,
skippedModules: Set<String>,
target: String
): RunningFlowContext {
return TestRunningFlowContext( return TestRunningFlowContext(
source = source, source = source,
inputs = inputs, inputs = inputs,
firstInputEpochMillis = firstInputEpochMillis, firstInputEpochMillis = System.currentTimeMillis()
target = target )
).apply {
additionalUserInfo.forEach(::putUserInfo)
skippedModules.forEach(::addSkippedModule)
}
} }
} }
} }