From 85818556f89ed3483f9e40981bfb4c91fa8fd489 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Thu, 16 Oct 2025 15:22:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E8=AE=B0=E5=BF=86=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E7=9A=84=E7=BC=93=E5=AD=98=E9=80=BB=E8=BE=91=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E8=87=B3=20MemoryCore;=20=E7=A7=BB=E9=99=A4=E4=BA=86=20CacheCo?= =?UTF-8?q?re=EF=BC=8C=E5=B9=B6=E5=B0=86=20CoordinatedManager=20=E4=B8=AD?= =?UTF-8?q?=E5=8E=9F=E8=AE=B0=E5=BF=86=E6=A8=A1=E5=9D=97=E4=B8=8E=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=A8=A1=E5=9D=97=E4=B8=AD=E7=9A=84=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E8=BF=81=E7=A7=BB=E8=87=B3=E7=8E=B0=E8=AE=B0=E5=BF=86=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E4=B8=AD=EF=BC=8C=E7=A1=AE=E4=BF=9D=E8=AF=AD=E4=B9=89?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../partner/core/CoordinatedManager.java | 85 +---- .../partner/core/cache/CacheCapability.java | 25 -- .../slhaf/partner/core/cache/CacheCore.java | 233 ------------- .../partner/core/cognation/CognationCore.java | 4 +- .../partner/core/memory/MemoryCapability.java | 35 +- .../slhaf/partner/core/memory/MemoryCore.java | 318 ++++++++++++++++-- .../memory/selector/MemorySelector.java | 19 +- .../extractor/MemorySelectExtractor.java | 5 +- .../modules/memory/updater/MemoryUpdater.java | 7 +- 9 files changed, 345 insertions(+), 386 deletions(-) delete mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCapability.java delete mode 100644 Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCore.java diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/CoordinatedManager.java b/Partner-Main/src/main/java/work/slhaf/partner/core/CoordinatedManager.java index 27686167..ef34de10 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/CoordinatedManager.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/CoordinatedManager.java @@ -5,22 +5,13 @@ import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.CoordinateManager; import work.slhaf.partner.api.agent.factory.capability.annotation.Coordinated; import work.slhaf.partner.api.chat.constant.ChatConstant; -import work.slhaf.partner.core.cache.CacheCore; import work.slhaf.partner.core.cognation.CognationCore; import work.slhaf.partner.core.memory.MemoryCore; -import work.slhaf.partner.core.memory.pojo.MemoryResult; -import work.slhaf.partner.core.memory.pojo.MemorySlice; -import work.slhaf.partner.core.memory.pojo.MemorySliceResult; -import work.slhaf.partner.core.perceive.PerceiveCore; -import java.io.IOException; import java.io.Serial; import java.io.Serializable; -import java.time.LocalDate; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.HashSet; +import java.util.Set; import static work.slhaf.partner.common.util.ExtractUtil.extractUserId; @@ -32,83 +23,13 @@ public class CoordinatedManager implements Serializable { @Serial private static final long serialVersionUID = 1L; - private final Lock sliceInsertLock = new ReentrantLock(); - //在框架将自动注入core,详见CapabilityRegistryFactory private CognationCore cognationCore; - private CacheCore cacheCore; private MemoryCore memoryCore; - private PerceiveCore perceiveCore; - - @Coordinated(capability = "memory") - public MemoryResult selectMemory(String topicPathStr) { - MemoryResult memoryResult; - List topicPath = List.of(topicPathStr.split("->")); - try { - List path = new ArrayList<>(topicPath); - //每日刷新缓存 - cacheCore.checkCacheDate(); - //检测缓存并更新计数, 查看是否需要放入缓存 - cacheCore.updateCacheCounter(path); - //查看是否存在缓存,如果存在,则直接返回 - if ((memoryResult = cacheCore.selectCache(path)) != null) { - return memoryResult; - } - memoryResult = memoryCore.selectMemory(path); - //尝试更新缓存 - cacheCore.updateCache(topicPath, memoryResult); - } catch (Exception e) { - log.error("[CoordinatedManager] selectMemory error: ", e); - log.error("[CoordinatedManager] 路径: {}", topicPathStr); - log.error("[CoordinatedManager] 主题树: {}", memoryCore.getTopicTree()); - memoryResult = new MemoryResult(); - memoryResult.setRelatedMemorySliceResult(new ArrayList<>()); - memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>()); - } - return cacheFilter(memoryResult); - } - - @Coordinated(capability = "memory") - public MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException { - return cacheFilter(memoryCore.selectMemory(date)); - } - - private MemoryResult cacheFilter(MemoryResult memoryResult) { - //过滤掉与缓存重复的切片 - CopyOnWriteArrayList memorySliceResult = memoryResult.getMemorySliceResult(); - List relatedMemorySliceResult = memoryResult.getRelatedMemorySliceResult(); - cacheCore.getDialogMap().forEach((k, v) -> { - memorySliceResult.removeIf(m -> m.getMemorySlice().getSummary().equals(v)); - relatedMemorySliceResult.removeIf(m -> m.getSummary().equals(v)); - }); - return memoryResult; - } - - - @Coordinated(capability = "memory") - public void insertSlice(MemorySlice memorySlice, String topicPath) { - sliceInsertLock.lock(); - List topicPathList = Arrays.stream(topicPath.split("->")).toList(); - try { - //检查是否存在当天对应的memorySlice并确定是否插入 - //每日刷新缓存 - cacheCore.checkCacheDate(); - //如果topicPath在memorySliceCache中存在对应缓存,由于进行的插入操作,则需要移除该缓存,但不清除相关计数 - cacheCore.clearCacheByTopicPath(topicPathList); - memoryCore.insertMemory(topicPathList, memorySlice); - if (!memorySlice.isPrivate()) { - cacheCore.updateUserDialogMap(memorySlice); - } - } catch (Exception e) { - log.error("[CoordinatedManager] 插入记忆时出错: ", e); - } - log.debug("[CoordinatedManager] 插入切片: {}, 路径: {}", memorySlice, topicPath); - sliceInsertLock.unlock(); - } private boolean isCacheSingleUser() { - return cacheCore.getUserDialogMap().size() <= 1; + return memoryCore.getUserDialogMap().size() <= 1; } @Coordinated(capability = "cognation") diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCapability.java deleted file mode 100644 index 43da5c81..00000000 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCapability.java +++ /dev/null @@ -1,25 +0,0 @@ -package work.slhaf.partner.core.cache; - -import work.slhaf.partner.api.agent.factory.capability.annotation.Capability; -import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -@Capability(value = "cache") -public interface CacheCapability { - HashMap getDialogMap(); - ConcurrentHashMap getUserDialogMap(String userId); - void updateDialogMap(LocalDateTime dateTime, String newDialogCache); - String getDialogMapStr(); - String getUserDialogMapStr(String userId); - void updateActivatedSlices(String userId, List memorySlices); - String getActivatedSlicesStr(String userId); - HashMap> getActivatedSlices(); - void clearActivatedSlices(String userId); - boolean hasActivatedSlices(String userId); - int getActivatedSlicesSize(String userId); - List getActivatedSlices(String userId); -} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCore.java deleted file mode 100644 index 3bb9fd57..00000000 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCore.java +++ /dev/null @@ -1,233 +0,0 @@ -package work.slhaf.partner.core.cache; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore; -import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; -import work.slhaf.partner.core.PartnerCore; -import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; -import work.slhaf.partner.core.memory.pojo.MemoryResult; -import work.slhaf.partner.core.memory.pojo.MemorySlice; - -import java.io.IOException; -import java.io.Serial; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -@EqualsAndHashCode(callSuper = true) -@Slf4j -@CapabilityCore(value = "cache") -@Getter -@Setter -public class CacheCore extends PartnerCore { - - @Serial - private static final long serialVersionUID = 1L; - - /** - * 近两日的对话总结缓存, 用于为大模型提供必要的记忆补充, hashmap以切片的存储时间为键,总结为值 - * 该部分作为'主LLM'system prompt常驻 - * 该部分作为近两日的整体对话缓存, 不区分用户 - */ - private HashMap dialogMap = new HashMap<>(); - - /** - * 近两日的区分用户的对话总结缓存,在prompt结构上比dialogMap层级深一层, dialogMap更具近两日整体对话的摘要性质 - */ - private ConcurrentHashMap> userDialogMap = new ConcurrentHashMap<>(); - - /** - * memorySliceCache计数器,每日清空 - */ - private ConcurrentHashMap /*触发查询的主题列表*/, Integer> memoryNodeCacheCounter = new ConcurrentHashMap<>(); - - /** - * 记忆切片缓存,每日清空 - * 用于记录作为终点节点调用次数最多的记忆节点的切片数据 - */ - private ConcurrentHashMap /*主题路径*/, MemoryResult /*切片列表*/> memorySliceCache = new ConcurrentHashMap<>(); - - /** - * 缓存日期 - */ - private LocalDate cacheDate; - - /** - * 已被选中的切片时间戳集合,需要及时清理 - */ - private Set selectedSlices = new HashSet<>(); - - private HashMap> activatedSlices = new HashMap<>(); - - public CacheCore() throws IOException, ClassNotFoundException { - } - - - @CapabilityMethod - public void updateDialogMap(LocalDateTime dateTime, String newDialogCache) { - List keysToRemove = new ArrayList<>(); - dialogMap.forEach((k, v) -> { - if (dateTime.minusDays(2).isAfter(k)) { - keysToRemove.add(k); - } - }); - for (LocalDateTime temp : keysToRemove) { - dialogMap.remove(temp); - } - keysToRemove.clear(); - //放入新缓存 - dialogMap.put(dateTime, newDialogCache); - } - - @CapabilityMethod - public HashMap getDialogMap() { - return dialogMap; - } - - @CapabilityMethod - public ConcurrentHashMap getUserDialogMap(String userId) { - return this.getUserDialogMap().get(userId); - } - - @CapabilityMethod - public String getDialogMapStr() { - StringBuilder str = new StringBuilder(); - this.getDialogMap().forEach((dateTime, dialog) -> str.append("\n\n").append("[").append(dateTime).append("]\n") - .append(dialog)); - return str.toString(); - } - - @CapabilityMethod - public String getUserDialogMapStr(String userId) { - if (this.getUserDialogMap().containsKey(userId)) { - StringBuilder str = new StringBuilder(); - Collection dialogMapValues = this.getDialogMap().values(); - this.getUserDialogMap().get(userId).forEach((dateTime, dialog) -> { - if (dialogMapValues.contains(dialog)) { - return; - } - str.append("\n\n").append("[").append(dateTime).append("]\n") - .append(dialog); - }); - return str.toString(); - } else { - return null; - } - } - - public void updateCacheCounter(List topicPath) { - if (memoryNodeCacheCounter.containsKey(topicPath)) { - Integer tempCount = memoryNodeCacheCounter.get(topicPath); - memoryNodeCacheCounter.put(topicPath, ++tempCount); - } else { - memoryNodeCacheCounter.put(topicPath, 1); - } - } - - public void checkCacheDate() { - if (cacheDate == null || cacheDate.isBefore(LocalDate.now())) { - memorySliceCache.clear(); - memoryNodeCacheCounter.clear(); - cacheDate = LocalDate.now(); - } - } - - public void updateCache(List topicPath, MemoryResult memoryResult) { - Integer tempCount = memoryNodeCacheCounter.get(topicPath); - if (tempCount == null) { - log.warn("[CacheCore] tempCount为null? memoryNodeCacheCounter: {}; topicPath: {}", memoryNodeCacheCounter, topicPath); - return; - } - if (tempCount >= 5) { - memorySliceCache.put(topicPath, memoryResult); - } - } - - public void updateUserDialogMap(MemorySlice slice) { - String summary = slice.getSummary(); - LocalDateTime now = LocalDateTime.now(); - - //更新userDialogMap - //移除两天前上下文缓存(切片总结) - List keysToRemove = new ArrayList<>(); - userDialogMap.forEach((k, v) -> v.forEach((i, j) -> { - if (now.minusDays(2).isAfter(i)) { - keysToRemove.add(i); - } - })); - for (LocalDateTime dateTime : keysToRemove) { - userDialogMap.forEach((k, v) -> v.remove(dateTime)); - } - //放入新缓存 - userDialogMap - .computeIfAbsent(slice.getStartUserId(), k -> new ConcurrentHashMap<>()) - .merge(now, summary, (oldVal, newVal) -> oldVal + " " + newVal); - - } - - public void clearCacheByTopicPath(List topicPath) { - memorySliceCache.remove(topicPath); - } - - public MemoryResult selectCache(List path) { - if (memorySliceCache.containsKey(path)) { - return memorySliceCache.get(path); - } - return null; - } - - @CapabilityMethod - public void updateActivatedSlices(String userId, List memorySlices) { - activatedSlices.put(userId, memorySlices); - log.debug("[CoordinatedManager] 已更新激活切片, userId: {}", userId); - } - - @CapabilityMethod - public String getActivatedSlicesStr(String userId) { - if (activatedSlices.containsKey(userId)) { - StringBuilder str = new StringBuilder(); - activatedSlices.get(userId).forEach(slice -> str.append("\n\n").append("[").append(slice.getDate()).append("]\n") - .append(slice.getSummary())); - return str.toString(); - } else { - return null; - } - } - - @CapabilityMethod - public HashMap> getActivatedSlices() { - return activatedSlices; - } - - @CapabilityMethod - public void clearActivatedSlices(String userId) { - activatedSlices.remove(userId); - } - - @CapabilityMethod - public boolean hasActivatedSlices(String userId) { - if (!activatedSlices.containsKey(userId)) { - return false; - } - return !activatedSlices.get(userId).isEmpty(); - } - - @CapabilityMethod - public int getActivatedSlicesSize(String userId) { - return activatedSlices.get(userId).size(); - } - - @CapabilityMethod - public List getActivatedSlices(String userId) { - return activatedSlices.get(userId); - } - - @Override - protected String getCoreKey() { - return "cache-core"; - } -} diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java index 2c4587d6..1157c1a8 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/cognation/CognationCore.java @@ -83,14 +83,14 @@ public class CognationCore extends PartnerCore { @CapabilityMethod public void addMetaMessage(String userId, MetaMessage metaMessage) { - log.debug("[{}] 当前会话历史: {}", JSONObject.toJSONString(singleMetaMessageMap)); + log.debug("[{}] 当前会话历史: {}", getCoreKey(), JSONObject.toJSONString(singleMetaMessageMap)); if (singleMetaMessageMap.containsKey(userId)) { singleMetaMessageMap.get(userId).add(metaMessage); } else { singleMetaMessageMap.put(userId, new java.util.ArrayList<>()); singleMetaMessageMap.get(userId).add(metaMessage); } - log.debug("[SessionManager] 会话历史更新: {}", JSONObject.toJSONString(singleMetaMessageMap)); + log.debug("[{}] 会话历史更新: {}", getCoreKey(), JSONObject.toJSONString(singleMetaMessageMap)); } @CapabilityMethod diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCapability.java b/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCapability.java index 57381567..0b62a9e8 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCapability.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCapability.java @@ -1,25 +1,52 @@ package work.slhaf.partner.core.memory; import work.slhaf.partner.api.agent.factory.capability.annotation.Capability; -import work.slhaf.partner.api.agent.factory.capability.annotation.ToCoordinated; +import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; import work.slhaf.partner.core.memory.pojo.MemoryResult; import work.slhaf.partner.core.memory.pojo.MemorySlice; import java.io.IOException; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; @Capability(value = "memory") public interface MemoryCapability { void cleanSelectedSliceFilter(); + String getTopicTree(); - @ToCoordinated + HashMap getDialogMap(); + + ConcurrentHashMap getUserDialogMap(String userId); + + void updateDialogMap(LocalDateTime dateTime, String newDialogCache); + + String getDialogMapStr(); + + String getUserDialogMapStr(String userId); + + void updateActivatedSlices(String userId, List memorySlices); + + String getActivatedSlicesStr(String userId); + + HashMap> getActivatedSlices(); + + void clearActivatedSlices(String userId); + + boolean hasActivatedSlices(String userId); + + int getActivatedSlicesSize(String userId); + + List getActivatedSlices(String userId); + MemoryResult selectMemory(String topicPathStr); - @ToCoordinated MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException; - @ToCoordinated void insertSlice(MemorySlice memorySlice, String topicPath); + } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCore.java b/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCore.java index beb70878..46924c05 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCore.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCore.java @@ -3,11 +3,13 @@ package work.slhaf.partner.core.memory; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; import work.slhaf.partner.core.PartnerCore; import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException; import work.slhaf.partner.core.memory.exception.UnExistedTopicException; +import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; import work.slhaf.partner.core.memory.pojo.MemoryResult; import work.slhaf.partner.core.memory.pojo.MemorySlice; import work.slhaf.partner.core.memory.pojo.MemorySliceResult; @@ -17,14 +19,18 @@ import work.slhaf.partner.core.memory.pojo.node.TopicNode; import java.io.IOException; import java.io.Serial; import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @EqualsAndHashCode(callSuper = true) @CapabilityCore(value = "memory") @Getter @Setter +@Slf4j public class MemoryCore extends PartnerCore { @Serial @@ -56,12 +62,17 @@ public class MemoryCore extends PartnerCore { */ private Set selectedSlices = new HashSet<>(); - private HashMap> userIndex = new HashMap<>(); + private HashMap> userIndex = new HashMap<>(); + + private MemoryCache cache = new MemoryCache(); + + private final Lock sliceInsertLock = new ReentrantLock(); public MemoryCore() throws IOException, ClassNotFoundException { } + @CapabilityMethod public MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException { MemoryResult memoryResult = new MemoryResult(); CopyOnWriteArrayList targetSliceList = new CopyOnWriteArrayList<>(); @@ -79,7 +90,175 @@ public class MemoryCore extends PartnerCore { } } memoryResult.setMemorySliceResult(targetSliceList); - return memoryResult; + return cacheFilter(memoryResult); + } + + @CapabilityMethod + public void insertSlice(MemorySlice memorySlice, String topicPath) { + sliceInsertLock.lock(); + List topicPathList = Arrays.stream(topicPath.split("->")).toList(); + try { + //检查是否存在当天对应的memorySlice并确定是否插入 + //每日刷新缓存 + checkCacheDate(); + //如果topicPath在memorySliceCache中存在对应缓存,由于进行的插入操作,则需要移除该缓存,但不清除相关计数 + clearCacheByTopicPath(topicPathList); + insertMemory(topicPathList, memorySlice); + if (!memorySlice.isPrivate()) { + updateUserDialogMap(memorySlice); + } + } catch (Exception e) { + log.error("[CoordinatedManager] 插入记忆时出错: ", e); + } + log.debug("[CoordinatedManager] 插入切片: {}, 路径: {}", memorySlice, topicPath); + sliceInsertLock.unlock(); + } + + @CapabilityMethod + public String getTopicTree() { + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry entry : topicNodes.entrySet()) { + String rootName = entry.getKey(); + TopicNode rootNode = entry.getValue(); + stringBuilder.append(rootName).append("[root]").append("\r\n"); + printSubTopicsTreeFormat(rootNode, "", stringBuilder); + } + return stringBuilder.toString(); + } + + @CapabilityMethod + public void updateDialogMap(LocalDateTime dateTime, String newDialogCache) { + List keysToRemove = new ArrayList<>(); + HashMap dialogMap = cache.dialogMap; + dialogMap.forEach((k, v) -> { + if (dateTime.minusDays(2).isAfter(k)) { + keysToRemove.add(k); + } + }); + for (LocalDateTime temp : keysToRemove) { + dialogMap.remove(temp); + } + keysToRemove.clear(); + //放入新缓存 + dialogMap.put(dateTime, newDialogCache); + } + + @CapabilityMethod + public HashMap getDialogMap() { + return cache.dialogMap; + } + + @CapabilityMethod + public ConcurrentHashMap getUserDialogMap(String userId) { + return cache.userDialogMap.get(userId); + } + + @CapabilityMethod + public String getDialogMapStr() { + StringBuilder str = new StringBuilder(); + this.getDialogMap().forEach((dateTime, dialog) -> str.append("\n\n").append("[").append(dateTime).append("]\n") + .append(dialog)); + return str.toString(); + } + + @CapabilityMethod + public String getUserDialogMapStr(String userId) { + ConcurrentHashMap> userDialogMap = cache.userDialogMap; + if (userDialogMap.containsKey(userId)) { + StringBuilder str = new StringBuilder(); + Collection dialogMapValues = this.getDialogMap().values(); + userDialogMap.get(userId).forEach((dateTime, dialog) -> { + if (dialogMapValues.contains(dialog)) { + return; + } + str.append("\n\n").append("[").append(dateTime).append("]\n") + .append(dialog); + }); + return str.toString(); + } else { + return null; + } + } + + @CapabilityMethod + public MemoryResult selectMemory(String topicPathStr) { + MemoryResult memoryResult; + List topicPath = List.of(topicPathStr.split("->")); + try { + List path = new ArrayList<>(topicPath); + //每日刷新缓存 + checkCacheDate(); + //检测缓存并更新计数, 查看是否需要放入缓存 + updateCacheCounter(path); + //查看是否存在缓存,如果存在,则直接返回 + if ((memoryResult = selectCache(path)) != null) { + return memoryResult; + } + memoryResult = selectMemory(path); + //尝试更新缓存 + updateCache(topicPath, memoryResult); + } catch (Exception e) { + log.error("[CoordinatedManager] selectMemory error: ", e); + log.error("[CoordinatedManager] 路径: {}", topicPathStr); + log.error("[CoordinatedManager] 主题树: {}", getTopicTree()); + memoryResult = new MemoryResult(); + memoryResult.setRelatedMemorySliceResult(new ArrayList<>()); + memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>()); + } + return cacheFilter(memoryResult); + } + + @CapabilityMethod + public void updateActivatedSlices(String userId, List memorySlices) { + cache.activatedSlices.put(userId, memorySlices); + log.debug("[CoordinatedManager] 已更新激活切片, userId: {}", userId); + } + + @CapabilityMethod + public String getActivatedSlicesStr(String userId) { + HashMap> activatedSlices = cache.activatedSlices; + if (activatedSlices.containsKey(userId)) { + StringBuilder str = new StringBuilder(); + activatedSlices.get(userId).forEach(slice -> str.append("\n\n").append("[").append(slice.getDate()).append("]\n") + .append(slice.getSummary())); + return str.toString(); + } else { + return null; + } + } + + @CapabilityMethod + public HashMap> getActivatedSlices() { + return cache.activatedSlices; + } + + @CapabilityMethod + public void clearActivatedSlices(String userId) { + cache.activatedSlices.remove(userId); + } + + @CapabilityMethod + public boolean hasActivatedSlices(String userId) { + HashMap> activatedSlices = cache.activatedSlices; + if (!activatedSlices.containsKey(userId)) { + return false; + } + return !activatedSlices.get(userId).isEmpty(); + } + + @CapabilityMethod + public int getActivatedSlicesSize(String userId) { + return cache.activatedSlices.get(userId).size(); + } + + @CapabilityMethod + public List getActivatedSlices(String userId) { + return cache.activatedSlices.get(userId); + } + + @CapabilityMethod + public void cleanSelectedSliceFilter() { + this.selectedSlices.clear(); } private List> loadSlicesByDate(LocalDate date) throws IOException, ClassNotFoundException { @@ -95,18 +274,6 @@ public class MemoryCore extends PartnerCore { return list; } - @CapabilityMethod - public String getTopicTree() { - StringBuilder stringBuilder = new StringBuilder(); - for (Map.Entry entry : topicNodes.entrySet()) { - String rootName = entry.getKey(); - TopicNode rootNode = entry.getValue(); - stringBuilder.append(rootName).append("[root]").append("\r\n"); - printSubTopicsTreeFormat(rootNode, "", stringBuilder); - } - return stringBuilder.toString(); - } - private void printSubTopicsTreeFormat(TopicNode node, String prefix, StringBuilder stringBuilder) { if (node.getTopicNodes() == null || node.getTopicNodes().isEmpty()) return; @@ -119,7 +286,7 @@ public class MemoryCore extends PartnerCore { } } - public void insertMemory(List topicPath, MemorySlice slice) throws IOException, ClassNotFoundException { + private void insertMemory(List topicPath, MemorySlice slice) throws IOException, ClassNotFoundException { LocalDate now = LocalDate.now(); boolean hasSlice = false; MemoryNode node = null; @@ -300,8 +467,6 @@ public class MemoryCore extends PartnerCore { } } - - private TopicNode getTargetParentNode(List topicPath, String targetTopic) { String topTopic = topicPath.getFirst(); if (!existedTopics.containsKey(topTopic)) { @@ -323,13 +488,126 @@ public class MemoryCore extends PartnerCore { return targetParentNode; } - @CapabilityMethod - public void cleanSelectedSliceFilter() { - this.selectedSlices.clear(); + public void updateCacheCounter(List topicPath) { + ConcurrentHashMap, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter; + if (memoryNodeCacheCounter.containsKey(topicPath)) { + Integer tempCount = memoryNodeCacheCounter.get(topicPath); + memoryNodeCacheCounter.put(topicPath, ++tempCount); + } else { + memoryNodeCacheCounter.put(topicPath, 1); + } + } + + private void checkCacheDate() { + if (cache.cacheDate == null || cache.cacheDate.isBefore(LocalDate.now())) { + cache.memorySliceCache.clear(); + cache.memoryNodeCacheCounter.clear(); + cache.cacheDate = LocalDate.now(); + } + } + + private void updateCache(List topicPath, MemoryResult memoryResult) { + ConcurrentHashMap, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter; + Integer tempCount = memoryNodeCacheCounter.get(topicPath); + if (tempCount == null) { + log.warn("[CacheCore] tempCount为null? memoryNodeCacheCounter: {}; topicPath: {}", memoryNodeCacheCounter, topicPath); + return; + } + if (tempCount >= 5) { + cache.memorySliceCache.put(topicPath, memoryResult); + } + } + + private void updateUserDialogMap(MemorySlice slice) { + String summary = slice.getSummary(); + LocalDateTime now = LocalDateTime.now(); + ConcurrentHashMap> userDialogMap = cache.userDialogMap; + + //更新userDialogMap + //移除两天前上下文缓存(切片总结) + List keysToRemove = new ArrayList<>(); + userDialogMap.forEach((k, v) -> v.forEach((i, j) -> { + if (now.minusDays(2).isAfter(i)) { + keysToRemove.add(i); + } + })); + for (LocalDateTime dateTime : keysToRemove) { + userDialogMap.forEach((k, v) -> v.remove(dateTime)); + } + //放入新缓存 + userDialogMap + .computeIfAbsent(slice.getStartUserId(), k -> new ConcurrentHashMap<>()) + .merge(now, summary, (oldVal, newVal) -> oldVal + " " + newVal); + + } + + private void clearCacheByTopicPath(List topicPath) { + cache.memorySliceCache.remove(topicPath); + } + + private MemoryResult selectCache(List path) { + ConcurrentHashMap, MemoryResult> memorySliceCache = cache.memorySliceCache; + if (memorySliceCache.containsKey(path)) { + return memorySliceCache.get(path); + } + return null; } @Override protected String getCoreKey() { return "memory-core"; } + + public ConcurrentHashMap> getUserDialogMap() { + return cache.userDialogMap; + } + + + private MemoryResult cacheFilter(MemoryResult memoryResult) { + //过滤掉与缓存重复的切片 + CopyOnWriteArrayList memorySliceResult = memoryResult.getMemorySliceResult(); + List relatedMemorySliceResult = memoryResult.getRelatedMemorySliceResult(); + cache.dialogMap.forEach((k, v) -> { + memorySliceResult.removeIf(m -> m.getMemorySlice().getSummary().equals(v)); + relatedMemorySliceResult.removeIf(m -> m.getSummary().equals(v)); + }); + return memoryResult; + } + + @SuppressWarnings("FieldMayBeFinal") + private static class MemoryCache { + + /** + * 近两日的对话总结缓存, 用于为大模型提供必要的记忆补充, hashmap以切片的存储时间为键,总结为值 + * 该部分作为'主LLM'system prompt常驻 + * 该部分作为近两日的整体对话缓存, 不区分用户 + */ + private HashMap dialogMap = new HashMap<>(); + + /** + * 近两日的区分用户的对话总结缓存,在prompt结构上比dialogMap层级深一层, dialogMap更具近两日整体对话的摘要性质 + */ + private ConcurrentHashMap> userDialogMap = new ConcurrentHashMap<>(); + + /** + * memorySliceCache计数器,每日清空 + */ + private ConcurrentHashMap /*触发查询的主题列表*/, Integer> memoryNodeCacheCounter = new ConcurrentHashMap<>(); + + /** + * 记忆切片缓存,每日清空 + * 用于记录作为终点节点调用次数最多的记忆节点的切片数据 + */ + private ConcurrentHashMap /*主题路径*/, MemoryResult /*切片列表*/> memorySliceCache = new ConcurrentHashMap<>(); + + /** + * 缓存日期 + */ + private LocalDate cacheDate; + + private HashMap> activatedSlices = new HashMap<>(); + + private MemoryCache() { + } + } } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/MemorySelector.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/MemorySelector.java index 3295814a..45394e2e 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/MemorySelector.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/MemorySelector.java @@ -7,7 +7,6 @@ import lombok.extern.slf4j.Slf4j; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule; import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; -import work.slhaf.partner.core.cache.CacheCapability; import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException; @@ -36,8 +35,6 @@ import java.util.List; @AgentModule(name="memory_selector",order=2) public class MemorySelector extends PreRunningModule { - @InjectCapability - private CacheCapability cacheCapability; @InjectCapability private MemoryCapability memoryCapability; @InjectCapability @@ -54,9 +51,9 @@ public class MemorySelector extends PreRunningModule { //获取主题路径 ExtractorResult extractorResult = memorySelectExtractor.execute(runningFlowContext); if (extractorResult.isRecall() || !extractorResult.getMatches().isEmpty()) { - cacheCapability.clearActivatedSlices(userId); + memoryCapability.clearActivatedSlices(userId); List evaluatedSlices = selectAndEvaluateMemory(runningFlowContext, extractorResult); - cacheCapability.updateActivatedSlices(userId, evaluatedSlices); + memoryCapability.updateActivatedSlices(userId, evaluatedSlices); } setModuleContextRecall(runningFlowContext); } @@ -81,10 +78,10 @@ public class MemorySelector extends PreRunningModule { private void setModuleContextRecall(PartnerRunningFlowContext runningFlowContext) { String userId = runningFlowContext.getUserId(); - boolean recall = cacheCapability.hasActivatedSlices(userId); + boolean recall = memoryCapability.hasActivatedSlices(userId); runningFlowContext.getModuleContext().getExtraContext().put("recall", recall); if (recall) { - runningFlowContext.getModuleContext().getExtraContext().put("recall_count", cacheCapability.getActivatedSlicesSize(userId)); + runningFlowContext.getModuleContext().getExtraContext().put("recall_count", memoryCapability.getActivatedSlicesSize(userId)); } } @@ -119,7 +116,7 @@ public class MemorySelector extends PreRunningModule { } private void removeDuplicateSlice(MemoryResult memoryResult) { - Collection values = cacheCapability.getDialogMap().values(); + Collection values = memoryCapability.getDialogMap().values(); memoryResult.getRelatedMemorySliceResult().removeIf(m -> values.contains(m.getSummary())); memoryResult.getMemorySliceResult().removeIf(m -> values.contains(m.getMemorySlice().getSummary())); } @@ -138,17 +135,17 @@ public class MemorySelector extends PreRunningModule { protected HashMap getPromptDataMap(String userId) { HashMap map = new HashMap<>(); - String dialogMapStr = cacheCapability.getDialogMapStr(); + String dialogMapStr = memoryCapability.getDialogMapStr(); if (!dialogMapStr.isEmpty()) { map.put("[记忆缓存] <你最近两日和所有聊天者的对话记忆印象>", dialogMapStr); } - String userDialogMapStr = cacheCapability.getUserDialogMapStr(userId); + String userDialogMapStr = memoryCapability.getUserDialogMapStr(userId); if (userDialogMapStr != null && !userDialogMapStr.isEmpty() && !cognationCapability.isSingleUser()) { map.put("[用户记忆缓存] <与最新一条消息的发送者的近两天对话记忆印象, 可能与[记忆缓存]稍有重复>", userDialogMapStr); } - String sliceStr = cacheCapability.getActivatedSlicesStr(userId); + String sliceStr = memoryCapability.getActivatedSlicesStr(userId); if (sliceStr != null && !sliceStr.isEmpty()) { map.put("[记忆切片] <你与最新一条消息的发送者的相关回忆, 不会与[记忆缓存]重复, 如果有重复你也可以指出来()>", sliceStr); } diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/extractor/MemorySelectExtractor.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/extractor/MemorySelectExtractor.java index 43a33185..23279790 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/extractor/MemorySelectExtractor.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/extractor/MemorySelectExtractor.java @@ -11,7 +11,6 @@ import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateM import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.MetaMessage; -import work.slhaf.partner.core.cache.CacheCapability; import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; @@ -37,8 +36,6 @@ public class MemorySelectExtractor extends AgentRunningSubModule activatedMemorySlices = cacheCapability.getActivatedSlices(context.getUserId()); + List activatedMemorySlices = memoryCapability.getActivatedSlices(context.getUserId()); ExtractorInput extractorInput = ExtractorInput.builder() .text(context.getInput()) .date(context.getDateTime().toLocalDate()) diff --git a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/updater/MemoryUpdater.java b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/updater/MemoryUpdater.java index ad54c9a6..4b5d1eef 100644 --- a/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/updater/MemoryUpdater.java +++ b/Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/updater/MemoryUpdater.java @@ -11,7 +11,6 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.chat.constant.ChatConstant; import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; -import work.slhaf.partner.core.cache.CacheCapability; import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.pojo.MemorySlice; @@ -45,8 +44,6 @@ public class MemoryUpdater extends PostRunningModule { @InjectCapability private MemoryCapability memoryCapability; @InjectCapability - private CacheCapability cacheCapability; - @InjectCapability private PerceiveCapability perceiveCapability; @InjectModule @@ -160,11 +157,11 @@ public class MemoryUpdater extends PostRunningModule { setInvolvedUserId(userId, memorySlice, chatMessages); memoryCapability.insertSlice(memorySlice, summarizeResult.getTopicPath()); - cacheCapability.updateDialogMap(LocalDateTime.now(), summarizeResult.getSummary()); + memoryCapability.updateDialogMap(LocalDateTime.now(), summarizeResult.getSummary()); } else { log.debug("[MemoryUpdater] 不存在多人聊天记录, 将以单聊总结为对话缓存的主要输入: {}", singleMemorySummary); - cacheCapability.updateDialogMap(LocalDateTime.now(), totalSummarizer.execute(singleMemorySummary)); + memoryCapability.updateDialogMap(LocalDateTime.now(), totalSummarizer.execute(singleMemorySummary)); } log.debug("[MemoryUpdater] 对话缓存更新完毕"); log.debug("[MemoryUpdater] 多人聊天记忆更新流程结束...");