diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemoryInputEntry.java b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemoryInputEntry.java new file mode 100644 index 00000000..31a182aa --- /dev/null +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemoryInputEntry.java @@ -0,0 +1,15 @@ +package work.slhaf.partner.module.memory.selector; + +import lombok.AllArgsConstructor; +import lombok.Data; +import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext; + +import java.time.LocalDateTime; +import java.util.List; + +@Data +@AllArgsConstructor +public class MemoryInputEntry { + private LocalDateTime receivedDateTime; + private List inputs; +} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemorySelector.java b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemorySelector.java index b89b4ef1..c6e38b24 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemorySelector.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/MemorySelector.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.w3c.dom.Document; import org.w3c.dom.Element; +import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.cognition.BlockContent; import work.slhaf.partner.core.cognition.CognitionCapability; import work.slhaf.partner.core.cognition.ContextBlock; @@ -23,7 +25,11 @@ import work.slhaf.partner.module.memory.selector.extractor.entity.ExtractorResul import work.slhaf.partner.runtime.PartnerRunningFlowContext; import java.time.LocalDate; +import java.time.ZoneId; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j public class MemorySelector extends AbstractAgentModule.Running { @@ -33,6 +39,7 @@ public class MemorySelector extends AbstractAgentModule.Running collectedInputs = new ArrayList<>(); + @InjectCapability + private ActionCapability actionCapability; @Override protected void doExecute(@NotNull PartnerRunningFlowContext runningFlowContext) { - List snapshotInputs = List.copyOf(runningFlowContext.getInputs()); + collectInputs(runningFlowContext); + tryStartMemoryRecallWorker(); + } + + private void collectInputs(PartnerRunningFlowContext runningFlowContext) { + inputsLock.lock(); + try { + collectedInputs.add(new MemoryInputEntry( + runningFlowContext.getFirstInputDateTime(), + List.copyOf(runningFlowContext.getInputs()) + )); + } finally { + inputsLock.unlock(); + } + } + + private void tryStartMemoryRecallWorker() { + if (!memoryCalling.compareAndSet(false, true)) { + return; + } + + actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL).execute(() -> { + try { + drainMemoryRecall(); + } finally { + memoryCalling.set(false); + if (hasCollectedInputs()) { + tryStartMemoryRecallWorker(); + } + } + }); + } + + private void drainMemoryRecall() { + while (true) { + List snapshotInputs = drainCollectedInputs(); + if (snapshotInputs.isEmpty()) { + return; + } + try { + recallMemory(snapshotInputs); + } catch (Exception e) { + log.error("[MemorySelector] 记忆召回任务执行失败", e); + } + } + } + + private void recallMemory(List memoryInputEntries) { ExtractorInput input = new ExtractorInput( - snapshotInputs, - memoryRuntime.getTopicTree(), - runningFlowContext.getFirstInputDateTime().toLocalDate() + memoryInputEntries, + memoryRuntime.getTopicTree() ); ExtractorResult extractorResult = memoryRecallCueExtractor.execute(input); if (extractorResult.getMatches().isEmpty()) { return; } - List activatedSlices = selectAndEvaluateMemory(snapshotInputs, extractorResult); + List activatedSlices = selectAndEvaluateMemory(flattenInputs(memoryInputEntries), extractorResult); updateMemoryContext(activatedSlices); } + private List drainCollectedInputs() { + inputsLock.lock(); + try { + if (collectedInputs.isEmpty()) { + return List.of(); + } + List snapshot = new ArrayList<>(collectedInputs); + collectedInputs.clear(); + snapshot.sort(Comparator.comparing(MemoryInputEntry::getReceivedDateTime)); + return snapshot; + } finally { + inputsLock.unlock(); + } + } + + private boolean hasCollectedInputs() { + inputsLock.lock(); + try { + return !collectedInputs.isEmpty(); + } finally { + inputsLock.unlock(); + } + } + + private List flattenInputs(List memoryInputEntries) { + if (memoryInputEntries.isEmpty()) { + return List.of(); + } + long firstEpochMillis = memoryInputEntries.stream() + .map(MemoryInputEntry::getReceivedDateTime) + .mapToLong(this::toEpochMillis) + .min() + .orElseThrow(); + + return memoryInputEntries.stream() + .flatMap(entry -> { + long entryEpochMillis = toEpochMillis(entry.getReceivedDateTime()); + return entry.getInputs().stream() + .map(input -> new RunningFlowContext.InputEntry( + entryEpochMillis + input.getOffsetMillis() - firstEpochMillis, + input.getContent() + )); + }) + .sorted(Comparator.comparingLong(RunningFlowContext.InputEntry::getOffsetMillis)) + .toList(); + } + + private long toEpochMillis(java.time.LocalDateTime dateTime) { + return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + private void updateMemoryContext(List activatedSlices) { cognitionCapability.contextWorkspace().register(new ContextBlock( buildMemoryFullBlock(activatedSlices), diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/MemoryRecallCueExtractor.java b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/MemoryRecallCueExtractor.java index d0748bdb..09386377 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/MemoryRecallCueExtractor.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/MemoryRecallCueExtractor.java @@ -19,6 +19,7 @@ import work.slhaf.partner.module.memory.selector.extractor.entity.ExtractorResul import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; public class MemoryRecallCueExtractor extends AbstractAgentModule.Sub implements ActivateModel { @@ -29,12 +30,11 @@ public class MemoryRecallCueExtractor extends AbstractAgentModule.Sub { - appendListElement(document, inputsElement, "inputs", "input", input.getInputs(), (inputElement, entry) -> { - inputElement.setAttribute("interval-to-first", String.valueOf(entry.getOffsetMillis())); - inputElement.setTextContent(entry.getContent()); + appendChildElement(document, root, "memory_input_entries", (entriesElement) -> { + appendRepeatedElements(document, entriesElement, "memory_input_entry", input.getMemoryInputEntries().stream() + .sorted(Comparator.comparing(work.slhaf.partner.module.memory.selector.MemoryInputEntry::getReceivedDateTime)) + .toList(), (entryElement, memoryInputEntry) -> { + appendTextElement(document, entryElement, "received_date_time", memoryInputEntry.getReceivedDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + appendListElement(document, entryElement, "inputs", "input", memoryInputEntry.getInputs(), (inputElement, entry) -> { + inputElement.setAttribute("interval-to-first", String.valueOf(entry.getOffsetMillis())); + inputElement.setTextContent(entry.getContent()); + return Unit.INSTANCE; + }); return Unit.INSTANCE; }); return Unit.INSTANCE; }); - appendTextElement(document, root, "current_date", input.getDate().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); appendTextElement(document, root, "memory_topic_tree", input.getTopic_tree()); } }.encodeToMessage(); diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/entity/ExtractorInput.java b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/entity/ExtractorInput.java index ba5031ee..be2f9320 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/entity/ExtractorInput.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/selector/extractor/entity/ExtractorInput.java @@ -2,15 +2,13 @@ package work.slhaf.partner.module.memory.selector.extractor.entity; import lombok.AllArgsConstructor; import lombok.Data; -import work.slhaf.partner.framework.agent.interaction.flow.RunningFlowContext; +import work.slhaf.partner.module.memory.selector.MemoryInputEntry; -import java.time.LocalDate; import java.util.List; @Data @AllArgsConstructor public class ExtractorInput { - private List inputs; + private List memoryInputEntries; private String topic_tree; - private LocalDate date; }