mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
feat(communication): enrich and correct system-prompt in CommunicationProducer, and support NO_REPLY answering
This commit is contained in:
@@ -2,6 +2,7 @@ package work.slhaf.partner.core.cognition;
|
||||
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import kotlin.Unit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.w3c.dom.Document;
|
||||
@@ -27,6 +28,17 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
@CapabilityCore(value = "cognition")
|
||||
public class CognitionCore implements StateSerializable {
|
||||
|
||||
private static final String RECENT_CHAT_MESSAGE_NOTES = """
|
||||
消息格式:
|
||||
- user 消息当前写入格式: [[USER]: <userId>]: <正文> 或 [[AGENT]: self]: <正文>
|
||||
- assistant 消息直接记录回复正文;若以 [NOT_REPLIED]: <正文> 形式出现,表示该结果未直接发送给用户
|
||||
|
||||
标记含义:
|
||||
- [USER]: 外部用户来源
|
||||
- [AGENT]: 系统内部来源
|
||||
- [NOT_REPLIED]: 仅保留在历史中的未直接回复结果
|
||||
""";
|
||||
|
||||
private final ReentrantLock messageLock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
@@ -105,7 +117,14 @@ public class CognitionCore implements StateSerializable {
|
||||
new BlockContent("recent_chat_messages", "communication_producer") {
|
||||
@Override
|
||||
protected void fillXml(@NotNull Document document, @NotNull Element root) {
|
||||
appendRepeatedElements(document, root, "chat_message", resolveRecentChatMessages());
|
||||
appendTextElement(document, root, "message_tag_notes", RECENT_CHAT_MESSAGE_NOTES);
|
||||
Element chatMessagesElement = document.createElement("chat_messages");
|
||||
root.appendChild(chatMessagesElement);
|
||||
appendRepeatedElements(document, chatMessagesElement, "chat_message", resolveRecentChatMessages(), (messageElement, message) -> {
|
||||
messageElement.setAttribute("role", message.roleValue());
|
||||
messageElement.setTextContent(message.getContent());
|
||||
return Unit.INSTANCE;
|
||||
});
|
||||
}
|
||||
},
|
||||
Set.of(ContextBlock.FocusedDomain.COMMUNICATION),
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package work.slhaf.partner.module.communication;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
@@ -27,20 +25,37 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRunningFlowContext> implements ActivateModel {
|
||||
|
||||
private static final String INTERRUPTED_MARKER = " [response interrupted due to internal exception]";
|
||||
private static final String NO_REPLY_MARKER = "NO_REPLY";
|
||||
private static final String NOT_REPLIED_MARKER = "NOT_REPLIED";
|
||||
private static final String NOT_REPLIED_PREFIX = "[" + NOT_REPLIED_MARKER + "]: ";
|
||||
|
||||
private static final String MODULE_PROMPT = """
|
||||
你是 Partner 的表达模块。
|
||||
你接下来收到的消息固定分为三个区段:
|
||||
1. system message 是 Head, 用于说明整个输入结构与输出要求。
|
||||
2. <context> 区段只承载 type=CONTEXT 的上下文块, 其中每个子块都带有独立来源, 仅作为理解当前状态与辅助决策的依据。
|
||||
3. Conversation 区段是对话轨迹; 最新的一条 user message 会使用 <input> 结构, 其中 <inputs> 承载本轮按时间顺序排列的输入序列, 每个 <input> 节点会带有相对首条输入的时间间隔属性, 其他子标签是输入元信息与 type=SUPPLY 的补充块, 补充块会按 blockName 分区。
|
||||
你必须综合 Context 与 Conversation 回答最新输入, 不要把 XML 标签当作需要原样复述给用户的内容。
|
||||
直接输出最终回应内容即可, 不需要额外包装为 JSON。
|
||||
你当前正在承担 Partner 的对外交流职责。你需要基于系统此刻的上下文状态、保留的对话轨迹以及最新输入,生成自然、贴合当前情境、并与系统整体状态一致的交流结果。
|
||||
|
||||
你接下来收到的消息,将按照出现顺序,固定分为三个区段:
|
||||
1. system message 是 Head,用于说明整个输入结构与输出要求,即本条消息。
|
||||
2. <context> 区段承载系统中所有模块产生的上下文块。上下文块代表 Partner 在此刻的系统状态投影;其中每个子块都带有独立来源,可作为理解当前状态和辅助决策的依据。
|
||||
3. <conversation> 区段是系统此刻保留的对话轨迹;最新的一条 user message 会使用 <input> 结构,其中 <inputs> 承载本轮按时间顺序排列的输入序列,每个 <input> 节点会带有相对首条输入的时间间隔属性;其他子标签是输入元信息与 type=SUPPLY 的补充块,补充块会按 blockName 分区。
|
||||
|
||||
你的任务:
|
||||
- 综合 <context>、<conversation> 以及 SUPPLY 补充块,理解当前情境,并产出本轮交流结果。
|
||||
- 优先保证交流结果与当前系统状态一致,不要忽略明显相关的上下文信号。
|
||||
- 若最新输入与已有上下文存在张力,应以最新输入为当前交流的直接依据,再结合上下文判断如何回应。
|
||||
- 你当前负责的是对外交流,不负责直接规划行动、修改系统状态,或伪造并不存在的执行结果。
|
||||
|
||||
输出契约:
|
||||
- 默认情况下,直接输出要发送给用户的最终回复正文,不要添加额外标签、解释或前后缀。
|
||||
- 若当前情境下不应直接向用户发出回复,但仍需要留下本轮交流结果供系统后续保留在交流轨迹中,则输出以 NO_REPLY 开头。
|
||||
- 使用 NO_REPLY 时,格式为:
|
||||
|
||||
NO_REPLY
|
||||
这里写本轮交流结果正文
|
||||
|
||||
- 以 NO_REPLY 开头的输出不会直接展示给用户;系统在写入交流轨迹时,会以单独的历史标记形式保留该结果。
|
||||
- 不要输出空字符串;若选择不直接回复用户,应使用 NO_REPLY 契约明确表达。
|
||||
""";
|
||||
|
||||
@InjectCapability
|
||||
@@ -99,13 +114,27 @@ public class CommunicationProducer extends AbstractAgentModule.Running<PartnerRu
|
||||
formatConversationUserMessage(runningFlowContext)
|
||||
);
|
||||
chatMessages.add(primaryUserMessage);
|
||||
Message assistantMessage = new Message(Message.Character.ASSISTANT, response);
|
||||
Message assistantMessage = new Message(Message.Character.ASSISTANT, normalizeAssistantHistoryMessage(response));
|
||||
chatMessages.add(assistantMessage);
|
||||
} finally {
|
||||
cognitionCapability.getMessageLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private String normalizeAssistantHistoryMessage(String response) {
|
||||
String trimmed = response == null ? "" : response.trim();
|
||||
if (trimmed.equals(NO_REPLY_MARKER)) {
|
||||
return NOT_REPLIED_PREFIX.trim();
|
||||
}
|
||||
if (trimmed.startsWith(NO_REPLY_MARKER + "\n")) {
|
||||
return NOT_REPLIED_PREFIX + trimmed.substring((NO_REPLY_MARKER + "\n").length()).trim();
|
||||
}
|
||||
if (trimmed.startsWith(NO_REPLY_MARKER + "\r\n")) {
|
||||
return NOT_REPLIED_PREFIX + trimmed.substring((NO_REPLY_MARKER + "\r\n").length()).trim();
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private List<Message> snapshotConversationMessages() {
|
||||
List<Message> snapshot = new ArrayList<>(cognitionCapability.snapshotChatMessages());
|
||||
snapshot.removeIf(this::isStructuredUserMessage);
|
||||
|
||||
@@ -11,6 +11,7 @@ import kotlin.time.Duration.Companion.milliseconds
|
||||
object ReplyDispatcher {
|
||||
|
||||
private const val AGGREGATE_WINDOW_MILLIS = 100L
|
||||
private const val NO_REPLY_MARKER = "NO_REPLY"
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||
private val collectorChannel = Channel<ReplyChunk>(Channel.UNLIMITED)
|
||||
@@ -79,11 +80,87 @@ object ReplyDispatcher {
|
||||
private val target: String,
|
||||
) : StreamChatMessageConsumer() {
|
||||
|
||||
override fun consumeDelta(delta: String?) {
|
||||
if (delta != null) {
|
||||
collectorChannel.trySend(ReplyChunk(delta, target)).isSuccess
|
||||
/**
 * Tracks whether the streamed response is forwarded to the reply channel.
 */
private enum class VisibilityState {
    /** Not enough text has arrived yet to tell whether the response starts with the NO_REPLY marker. */
    UNDECIDED,

    /** The response started with the NO_REPLY marker; further deltas are not forwarded. */
    NO_REPLY,

    /** The response is a regular reply; deltas are forwarded as they arrive. */
    VISIBLE,
}
|
||||
|
||||
private val rawResponse = StringBuilder()
|
||||
private val undecidedBuffer = StringBuilder()
|
||||
private var visibilityState = VisibilityState.UNDECIDED
|
||||
|
||||
/**
 * Receives one streamed fragment of the response.
 *
 * The fragment is always recorded in [rawResponse]; whether it is also forwarded
 * to the reply channel is decided by [routeDelta].
 */
override fun onDelta(delta: String) {
    // Record first so the full raw text is kept regardless of visibility.
    rawResponse.append(delta).also { routeDelta(delta) }
}
|
||||
|
||||
/**
 * Settles any still-undecided buffered text and returns everything received so far.
 *
 * @return the accumulated raw response, exactly as appended in [onDelta]
 */
override fun collectResponse(): String {
    // Settle the visibility decision before handing the text back.
    finalizeUndecidedBuffer()
    val fullText = rawResponse.toString()
    return fullText
}
|
||||
|
||||
/**
 * Dispatches a fragment according to the current [visibilityState]:
 * dropped once NO_REPLY is decided, forwarded once VISIBLE is decided,
 * otherwise buffered and the decision re-evaluated.
 */
private fun routeDelta(delta: String) {
    if (visibilityState == VisibilityState.NO_REPLY) {
        return
    }
    if (visibilityState == VisibilityState.VISIBLE) {
        flushVisible(delta)
        return
    }
    // Still UNDECIDED: hold the text back until the prefix can be classified.
    undecidedBuffer.append(delta)
    resolveVisibility()
}
|
||||
|
||||
/**
 * Tries to classify the buffered prefix of the response.
 *
 * - While the buffer is no longer than the marker and is still a prefix of it, nothing is decided.
 * - If the buffer starts with the marker followed by '\n' or '\r', the state becomes NO_REPLY
 *   and the buffer is discarded.
 * - In every other case the buffered text is revealed and the state becomes VISIBLE.
 */
private fun resolveVisibility() {
    val buffered = undecidedBuffer.toString()
    val markerLength = NO_REPLY_MARKER.length

    if (buffered.length <= markerLength) {
        // A prefix of the marker may still grow into it; anything else is a regular reply.
        if (!NO_REPLY_MARKER.startsWith(buffered)) {
            revealBufferedContent()
        }
        return
    }

    // buffered.length > markerLength here, so the index below is in range.
    val separator = buffered[markerLength]
    val followsNoReplyContract =
        buffered.startsWith(NO_REPLY_MARKER) && (separator == '\n' || separator == '\r')

    if (!followsNoReplyContract) {
        revealBufferedContent()
        return
    }

    visibilityState = VisibilityState.NO_REPLY
    undecidedBuffer.setLength(0)
}
|
||||
|
||||
/**
 * Forces a decision if the state is still UNDECIDED.
 *
 * A buffer holding exactly the marker is treated as NO_REPLY and discarded;
 * any other buffered text is revealed.
 */
private fun finalizeUndecidedBuffer() {
    if (visibilityState != VisibilityState.UNDECIDED) return

    val endedAsBareMarker = undecidedBuffer.toString() == NO_REPLY_MARKER
    if (endedAsBareMarker) {
        visibilityState = VisibilityState.NO_REPLY
        undecidedBuffer.setLength(0)
    } else {
        revealBufferedContent()
    }
}
|
||||
|
||||
/**
 * Switches to VISIBLE and forwards whatever text was held back while undecided,
 * then clears the buffer.
 */
private fun revealBufferedContent() {
    visibilityState = VisibilityState.VISIBLE
    if (undecidedBuffer.isEmpty()) return

    val pending = undecidedBuffer.toString()
    flushVisible(pending)
    undecidedBuffer.setLength(0)
}
|
||||
|
||||
/**
 * Offers the given text to [collectorChannel] as a [ReplyChunk] addressed to [target].
 *
 * The outcome of `trySend` is read but not acted upon.
 */
private fun flushVisible(delta: String) {
    val chunk = ReplyChunk(delta, target)
    collectorChannel.trySend(chunk).isSuccess
}
|
||||
|
||||
/**
 * Intentionally does nothing.
 *
 * NOTE(review): fragment handling appears to live in [onDelta] instead; confirm against
 * the base class contract that this hook is expected to stay empty.
 */
override fun consumeDelta(delta: String?) = Unit
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package work.slhaf.partner.core.cognition;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import work.slhaf.partner.framework.agent.model.pojo.Message;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class CognitionCoreTest {
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll(@TempDir Path tempDir) {
|
||||
System.setProperty("user.home", tempDir.toAbsolutePath().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldRenderRecentChatMessagesWithWrapperAndNotes() {
|
||||
CognitionCore cognitionCore = new CognitionCore();
|
||||
cognitionCore.getChatMessages().addAll(List.of(
|
||||
new Message(Message.Character.USER, "[[USER]: user-1]: hello"),
|
||||
new Message(Message.Character.ASSISTANT, "[NOT_REPLIED]: wait"),
|
||||
new Message(Message.Character.ASSISTANT, "latest message")
|
||||
));
|
||||
|
||||
cognitionCore.refreshRecentChatMessagesContext();
|
||||
String content = cognitionCore.contextWorkspace()
|
||||
.resolve(List.of(ContextBlock.FocusedDomain.COMMUNICATION))
|
||||
.encodeToMessage()
|
||||
.getContent();
|
||||
|
||||
assertTrue(content.contains("<message_tag_notes>"));
|
||||
assertTrue(content.contains("<chat_messages>"));
|
||||
assertTrue(content.contains("<chat_message role=\"user\">[[USER]: user-1]: hello</chat_message>"));
|
||||
assertTrue(content.contains("<chat_message role=\"assistant\">[NOT_REPLIED]: wait</chat_message>"));
|
||||
assertTrue(content.contains("[USER]"));
|
||||
assertTrue(content.contains("[AGENT]"));
|
||||
assertTrue(content.contains("[NOT_REPLIED]"));
|
||||
assertFalse(content.contains("latest message"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
package work.slhaf.partner.module.communication;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import work.slhaf.partner.core.cognition.CognitionCapability;
|
||||
import work.slhaf.partner.core.cognition.ContextWorkspace;
|
||||
import work.slhaf.partner.framework.agent.model.pojo.Message;
|
||||
import work.slhaf.partner.runtime.PartnerRunningFlowContext;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class CommunicationProducerTest {
|
||||
|
||||
private static void invokeUpdateChatMessages(
|
||||
CommunicationProducer producer,
|
||||
PartnerRunningFlowContext context,
|
||||
String response
|
||||
) throws Exception {
|
||||
Method method = CommunicationProducer.class.getDeclaredMethod(
|
||||
"updateChatMessages",
|
||||
PartnerRunningFlowContext.class,
|
||||
String.class
|
||||
);
|
||||
method.setAccessible(true);
|
||||
method.invoke(producer, context, response);
|
||||
}
|
||||
|
||||
private static void setField(Object target, String fieldName, Object value) throws Exception {
|
||||
Field field = target.getClass().getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
field.set(target, value);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldConvertNoReplyResponseWhenWritingHistory() throws Exception {
|
||||
StubCognitionCapability cognitionCapability = new StubCognitionCapability();
|
||||
CommunicationProducer producer = new CommunicationProducer();
|
||||
setField(producer, "cognitionCapability", cognitionCapability);
|
||||
|
||||
invokeUpdateChatMessages(
|
||||
producer,
|
||||
PartnerRunningFlowContext.fromUser("user-1", "hello"),
|
||||
"NO_REPLY\nnot now"
|
||||
);
|
||||
|
||||
List<Message> chatMessages = cognitionCapability.getChatMessages();
|
||||
assertEquals(2, chatMessages.size());
|
||||
assertEquals("[[USER]: user-1]: hello", chatMessages.get(0).getContent());
|
||||
assertEquals("[NOT_REPLIED]: not now", chatMessages.get(1).getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldKeepRegularAssistantResponseUntouched() throws Exception {
|
||||
StubCognitionCapability cognitionCapability = new StubCognitionCapability();
|
||||
CommunicationProducer producer = new CommunicationProducer();
|
||||
setField(producer, "cognitionCapability", cognitionCapability);
|
||||
|
||||
invokeUpdateChatMessages(
|
||||
producer,
|
||||
PartnerRunningFlowContext.fromUser("user-1", "hello"),
|
||||
"normal reply"
|
||||
);
|
||||
|
||||
List<Message> chatMessages = cognitionCapability.getChatMessages();
|
||||
assertEquals("normal reply", chatMessages.get(1).getContent());
|
||||
}
|
||||
|
||||
private static final class StubCognitionCapability implements CognitionCapability {
|
||||
private final ContextWorkspace contextWorkspace = new ContextWorkspace();
|
||||
private final List<Message> chatMessages = new ArrayList<>();
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
||||
@Override
|
||||
public void initiateTurn(String input, String target, String... skippedModules) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContextWorkspace contextWorkspace() {
|
||||
return contextWorkspace;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Message> getChatMessages() {
|
||||
return chatMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Message> snapshotChatMessages() {
|
||||
return List.copyOf(chatMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollChatMessagesWithSnapshot(int snapshotSize, int retainDivisor) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshRecentChatMessagesContext() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Lock getMessageLock() {
|
||||
return lock;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user