diff --git a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java index 102bc960..9ec10174 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/core/cognition/CognitionCore.java @@ -2,6 +2,7 @@ package work.slhaf.partner.core.cognition; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import kotlin.Unit; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.w3c.dom.Document; @@ -27,6 +28,17 @@ import java.util.concurrent.locks.ReentrantLock; @CapabilityCore(value = "cognition") public class CognitionCore implements StateSerializable { + private static final String RECENT_CHAT_MESSAGE_NOTES = """ + 消息格式: + - user 消息当前写入格式: [[USER]: ]: <正文> 或 [[AGENT]: self]: <正文> + - assistant 消息直接记录回复正文;若以 [NOT_REPLIED]: <正文> 形式出现,表示该结果未直接发送给用户 + + 标记含义: + - [USER]: 外部用户来源 + - [AGENT]: 系统内部来源 + - [NOT_REPLIED]: 仅保留在历史中的未直接回复结果 + """; + private final ReentrantLock messageLock = new ReentrantLock(); /** @@ -105,7 +117,14 @@ public class CognitionCore implements StateSerializable { new BlockContent("recent_chat_messages", "communication_producer") { @Override protected void fillXml(@NotNull Document document, @NotNull Element root) { - appendRepeatedElements(document, root, "chat_message", resolveRecentChatMessages()); + appendTextElement(document, root, "message_tag_notes", RECENT_CHAT_MESSAGE_NOTES); + Element chatMessagesElement = document.createElement("chat_messages"); + root.appendChild(chatMessagesElement); + appendRepeatedElements(document, chatMessagesElement, "chat_message", resolveRecentChatMessages(), (messageElement, message) -> { + messageElement.setAttribute("role", message.roleValue()); + messageElement.setTextContent(message.getContent()); + return Unit.INSTANCE; + }); } }, Set.of(ContextBlock.FocusedDomain.COMMUNICATION), diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java index ab94eb4c..3d323031 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/CommunicationProducer.java @@ -1,7 +1,5 @@ package work.slhaf.partner.module.communication; -import lombok.Data; -import lombok.EqualsAndHashCode; import org.jetbrains.annotations.NotNull; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -27,20 +25,37 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -@EqualsAndHashCode(callSuper = true) -@Data public class CommunicationProducer extends AbstractAgentModule.Running implements ActivateModel { private static final String INTERRUPTED_MARKER = " [response interrupted due to internal exception]"; + private static final String NO_REPLY_MARKER = "NO_REPLY"; + private static final String NOT_REPLIED_MARKER = "NOT_REPLIED"; + private static final String NOT_REPLIED_PREFIX = "[" + NOT_REPLIED_MARKER + "]: "; private static final String MODULE_PROMPT = """ - 你是 Partner 的表达模块。 - 你接下来收到的消息固定分为三个区段: - 1. system message 是 Head, 用于说明整个输入结构与输出要求。 - 2. 区段只承载 type=CONTEXT 的上下文块, 其中每个子块都带有独立来源, 仅作为理解当前状态与辅助决策的依据。 - 3. Conversation 区段是对话轨迹; 最新的一条 user message 会使用 结构, 其中 承载本轮按时间顺序排列的输入序列, 每个 节点会带有相对首条输入的时间间隔属性, 其他子标签是输入元信息与 type=SUPPLY 的补充块, 补充块会按 blockName 分区。 - 你必须综合 Context 与 Conversation 回答最新输入, 不要把 XML 标签当作需要原样复述给用户的内容。 - 直接输出最终回应内容即可, 不需要额外包装为 JSON。 + 你当前正在承担 Partner 的对外交流职责。你需要基于系统此刻的上下文状态、保留的对话轨迹以及最新输入,生成自然、贴合当前情境、并与系统整体状态一致的交流结果。 + + 你接下来收到的消息,将按照出现顺序,固定分为三个区段: + 1. system message 是 Head,用于说明整个输入结构与输出要求,即本条消息。 + 2. 区段承载系统中所有模块产生的上下文块。上下文块代表 Partner 在此刻的系统状态投影;其中每个子块都带有独立来源,可作为理解当前状态和辅助决策的依据。 + 3. 区段是系统此刻保留的对话轨迹;最新的一条 user message 会使用 结构,其中 承载本轮按时间顺序排列的输入序列,每个 节点会带有相对首条输入的时间间隔属性;其他子标签是输入元信息与 type=SUPPLY 的补充块,补充块会按 blockName 分区。 + + 你的任务: + - 综合 以及 SUPPLY 补充块,理解当前情境,并产出本轮交流结果。 + - 优先保证交流结果与当前系统状态一致,不要忽略明显相关的上下文信号。 + - 若最新输入与已有上下文存在张力,应以最新输入为当前交流的直接依据,再结合上下文判断如何回应。 + - 你当前负责的是对外交流,不负责直接规划行动、修改系统状态,或伪造并不存在的执行结果。 + + 输出契约: + - 默认情况下,直接输出要发送给用户的最终回复正文,不要添加额外标签、解释或前后缀。 + - 若当前情境下不应直接向用户发出回复,但仍需要留下本轮交流结果供系统后续保留在交流轨迹中,则输出以 NO_REPLY 开头。 + - 使用 NO_REPLY 时,格式为: + + NO_REPLY + 这里写本轮交流结果正文 + + - 以 NO_REPLY 开头的输出不会直接展示给用户;系统在写入交流轨迹时,会以单独的历史标记形式保留该结果。 + - 不要输出空字符串;若选择不直接回复用户,应使用 NO_REPLY 契约明确表达。 """; @InjectCapability @@ -99,13 +114,27 @@ public class CommunicationProducer extends AbstractAgentModule.Running snapshotConversationMessages() { List snapshot = new ArrayList<>(cognitionCapability.snapshotChatMessages()); snapshot.removeIf(this::isStructuredUserMessage); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt index b8b3ea43..45b374c8 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/communication/ReplyDispatcher.kt @@ -11,6 +11,7 @@ import kotlin.time.Duration.Companion.milliseconds object ReplyDispatcher { private const val AGGREGATE_WINDOW_MILLIS = 100L + private const val NO_REPLY_MARKER = "NO_REPLY" private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val collectorChannel = Channel(Channel.UNLIMITED) @@ -79,11 +80,87 @@ object ReplyDispatcher { private val target: String, ) : StreamChatMessageConsumer() { - override fun consumeDelta(delta: String?) { - if (delta != null) { - collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess + private enum class VisibilityState { + UNDECIDED, + NO_REPLY, + VISIBLE + } + + private val rawResponse = StringBuilder() + private val undecidedBuffer = StringBuilder() + private var visibilityState = VisibilityState.UNDECIDED + + override fun onDelta(delta: String) { + rawResponse.append(delta) + routeDelta(delta) + } + + override fun collectResponse(): String { + finalizeUndecidedBuffer() + return rawResponse.toString() + } + + private fun routeDelta(delta: String) { + when (visibilityState) { + VisibilityState.NO_REPLY -> return + VisibilityState.VISIBLE -> flushVisible(delta) + VisibilityState.UNDECIDED -> { + undecidedBuffer.append(delta) + resolveVisibility() + } } } + private fun resolveVisibility() { + val content = undecidedBuffer.toString() + if (content.length <= NO_REPLY_MARKER.length) { + if (NO_REPLY_MARKER.startsWith(content)) { + return + } + revealBufferedContent() + return + } + + if (!content.startsWith(NO_REPLY_MARKER)) { + revealBufferedContent() + return + } + + val suffixFirstChar = content[NO_REPLY_MARKER.length] + if (suffixFirstChar == '\n' || suffixFirstChar == '\r') { + visibilityState = VisibilityState.NO_REPLY + undecidedBuffer.setLength(0) + return + } + revealBufferedContent() + } + + private fun finalizeUndecidedBuffer() { + if (visibilityState != VisibilityState.UNDECIDED) { + return + } + if (undecidedBuffer.toString() == NO_REPLY_MARKER) { + visibilityState = VisibilityState.NO_REPLY + undecidedBuffer.setLength(0) + return + } + revealBufferedContent() + } + + private fun revealBufferedContent() { + visibilityState = VisibilityState.VISIBLE + if (undecidedBuffer.isNotEmpty()) { + flushVisible(undecidedBuffer.toString()) + undecidedBuffer.setLength(0) + } + } + + private fun flushVisible(delta: String) { + collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess + } + + override fun consumeDelta(delta: String?) { + } + } } diff --git a/Partner-Core/src/test/java/work/slhaf/partner/core/cognition/CognitionCoreTest.java b/Partner-Core/src/test/java/work/slhaf/partner/core/cognition/CognitionCoreTest.java new file mode 100644 index 00000000..cb053aa8 --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/core/cognition/CognitionCoreTest.java @@ -0,0 +1,45 @@ +package work.slhaf.partner.core.cognition; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import work.slhaf.partner.framework.agent.model.pojo.Message; + +import java.nio.file.Path; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CognitionCoreTest { + + @BeforeAll + static void beforeAll(@TempDir Path tempDir) { + System.setProperty("user.home", tempDir.toAbsolutePath().toString()); + } + + @Test + void shouldRenderRecentChatMessagesWithWrapperAndNotes() { + CognitionCore cognitionCore = new CognitionCore(); + cognitionCore.getChatMessages().addAll(List.of( + new Message(Message.Character.USER, "[[USER]: user-1]: hello"), + new Message(Message.Character.ASSISTANT, "[NOT_REPLIED]: wait"), + new Message(Message.Character.ASSISTANT, "latest message") + )); + + cognitionCore.refreshRecentChatMessagesContext(); + String content = cognitionCore.contextWorkspace() + .resolve(List.of(ContextBlock.FocusedDomain.COMMUNICATION)) + .encodeToMessage() + .getContent(); + + assertTrue(content.contains("")); + assertTrue(content.contains("")); + assertTrue(content.contains("[[USER]: user-1]: hello")); + assertTrue(content.contains("[NOT_REPLIED]: wait")); + assertTrue(content.contains("[USER]")); + assertTrue(content.contains("[AGENT]")); + assertTrue(content.contains("[NOT_REPLIED]")); + assertFalse(content.contains("latest message")); + } +} diff --git a/Partner-Core/src/test/java/work/slhaf/partner/module/communication/CommunicationProducerTest.java b/Partner-Core/src/test/java/work/slhaf/partner/module/communication/CommunicationProducerTest.java new file mode 100644 index 00000000..bd20a6ac --- /dev/null +++ b/Partner-Core/src/test/java/work/slhaf/partner/module/communication/CommunicationProducerTest.java @@ -0,0 +1,111 @@ +package work.slhaf.partner.module.communication; + +import org.junit.jupiter.api.Test; +import work.slhaf.partner.core.cognition.CognitionCapability; +import work.slhaf.partner.core.cognition.ContextWorkspace; +import work.slhaf.partner.framework.agent.model.pojo.Message; +import work.slhaf.partner.runtime.PartnerRunningFlowContext; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class CommunicationProducerTest { + + private static void invokeUpdateChatMessages( + CommunicationProducer producer, + PartnerRunningFlowContext context, + String response + ) throws Exception { + Method method = CommunicationProducer.class.getDeclaredMethod( + "updateChatMessages", + PartnerRunningFlowContext.class, + String.class + ); + method.setAccessible(true); + method.invoke(producer, context, response); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + @Test + void shouldConvertNoReplyResponseWhenWritingHistory() throws Exception { + StubCognitionCapability cognitionCapability = new StubCognitionCapability(); + CommunicationProducer producer = new CommunicationProducer(); + setField(producer, "cognitionCapability", cognitionCapability); + + invokeUpdateChatMessages( + producer, + PartnerRunningFlowContext.fromUser("user-1", "hello"), + "NO_REPLY\nnot now" + ); + + List chatMessages = cognitionCapability.getChatMessages(); + assertEquals(2, chatMessages.size()); + assertEquals("[[USER]: user-1]: hello", chatMessages.get(0).getContent()); + assertEquals("[NOT_REPLIED]: not now", chatMessages.get(1).getContent()); + } + + @Test + void shouldKeepRegularAssistantResponseUntouched() throws Exception { + StubCognitionCapability cognitionCapability = new StubCognitionCapability(); + CommunicationProducer producer = new CommunicationProducer(); + setField(producer, "cognitionCapability", cognitionCapability); + + invokeUpdateChatMessages( + producer, + PartnerRunningFlowContext.fromUser("user-1", "hello"), + "normal reply" + ); + + List chatMessages = cognitionCapability.getChatMessages(); + assertEquals("normal reply", chatMessages.get(1).getContent()); + } + + private static final class StubCognitionCapability implements CognitionCapability { + private final ContextWorkspace contextWorkspace = new ContextWorkspace(); + private final List chatMessages = new ArrayList<>(); + private final Lock lock = new ReentrantLock(); + + @Override + public void initiateTurn(String input, String target, String... skippedModules) { + } + + @Override + public ContextWorkspace contextWorkspace() { + return contextWorkspace; + } + + @Override + public List getChatMessages() { + return chatMessages; + } + + @Override + public List snapshotChatMessages() { + return List.copyOf(chatMessages); + } + + @Override + public void rollChatMessagesWithSnapshot(int snapshotSize, int retainDivisor) { + } + + @Override + public void refreshRecentChatMessagesContext() { + } + + @Override + public Lock getMessageLock() { + return lock; + } + } +}