将记忆模块的缓存逻辑迁移至 MemoryCore; 移除了 CacheCore,并将 CoordinatedManager 中原记忆模块与缓存模块中的逻辑迁移至现记忆模块中,确保语义正确

This commit is contained in:
2025-10-16 15:22:19 +08:00
parent cb1a25e9d5
commit 85818556f8
9 changed files with 345 additions and 386 deletions

View File

@@ -5,22 +5,13 @@ import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.CoordinateManager; import work.slhaf.partner.api.agent.factory.capability.annotation.CoordinateManager;
import work.slhaf.partner.api.agent.factory.capability.annotation.Coordinated; import work.slhaf.partner.api.agent.factory.capability.annotation.Coordinated;
import work.slhaf.partner.api.chat.constant.ChatConstant; import work.slhaf.partner.api.chat.constant.ChatConstant;
import work.slhaf.partner.core.cache.CacheCore;
import work.slhaf.partner.core.cognation.CognationCore; import work.slhaf.partner.core.cognation.CognationCore;
import work.slhaf.partner.core.memory.MemoryCore; import work.slhaf.partner.core.memory.MemoryCore;
import work.slhaf.partner.core.memory.pojo.MemoryResult;
import work.slhaf.partner.core.memory.pojo.MemorySlice;
import work.slhaf.partner.core.memory.pojo.MemorySliceResult;
import work.slhaf.partner.core.perceive.PerceiveCore;
import java.io.IOException;
import java.io.Serial; import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDate; import java.util.HashSet;
import java.util.*; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static work.slhaf.partner.common.util.ExtractUtil.extractUserId; import static work.slhaf.partner.common.util.ExtractUtil.extractUserId;
@@ -32,83 +23,13 @@ public class CoordinatedManager implements Serializable {
@Serial @Serial
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final Lock sliceInsertLock = new ReentrantLock();
//在框架将自动注入core,详见CapabilityRegistryFactory //在框架将自动注入core,详见CapabilityRegistryFactory
private CognationCore cognationCore; private CognationCore cognationCore;
private CacheCore cacheCore;
private MemoryCore memoryCore; private MemoryCore memoryCore;
private PerceiveCore perceiveCore;
@Coordinated(capability = "memory")
public MemoryResult selectMemory(String topicPathStr) {
MemoryResult memoryResult;
List<String> topicPath = List.of(topicPathStr.split("->"));
try {
List<String> path = new ArrayList<>(topicPath);
//每日刷新缓存
cacheCore.checkCacheDate();
//检测缓存并更新计数, 查看是否需要放入缓存
cacheCore.updateCacheCounter(path);
//查看是否存在缓存,如果存在,则直接返回
if ((memoryResult = cacheCore.selectCache(path)) != null) {
return memoryResult;
}
memoryResult = memoryCore.selectMemory(path);
//尝试更新缓存
cacheCore.updateCache(topicPath, memoryResult);
} catch (Exception e) {
log.error("[CoordinatedManager] selectMemory error: ", e);
log.error("[CoordinatedManager] 路径: {}", topicPathStr);
log.error("[CoordinatedManager] 主题树: {}", memoryCore.getTopicTree());
memoryResult = new MemoryResult();
memoryResult.setRelatedMemorySliceResult(new ArrayList<>());
memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>());
}
return cacheFilter(memoryResult);
}
@Coordinated(capability = "memory")
public MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException {
return cacheFilter(memoryCore.selectMemory(date));
}
private MemoryResult cacheFilter(MemoryResult memoryResult) {
//过滤掉与缓存重复的切片
CopyOnWriteArrayList<MemorySliceResult> memorySliceResult = memoryResult.getMemorySliceResult();
List<MemorySlice> relatedMemorySliceResult = memoryResult.getRelatedMemorySliceResult();
cacheCore.getDialogMap().forEach((k, v) -> {
memorySliceResult.removeIf(m -> m.getMemorySlice().getSummary().equals(v));
relatedMemorySliceResult.removeIf(m -> m.getSummary().equals(v));
});
return memoryResult;
}
@Coordinated(capability = "memory")
public void insertSlice(MemorySlice memorySlice, String topicPath) {
sliceInsertLock.lock();
List<String> topicPathList = Arrays.stream(topicPath.split("->")).toList();
try {
//检查是否存在当天对应的memorySlice并确定是否插入
//每日刷新缓存
cacheCore.checkCacheDate();
//如果topicPath在memorySliceCache中存在对应缓存由于进行的插入操作则需要移除该缓存但不清除相关计数
cacheCore.clearCacheByTopicPath(topicPathList);
memoryCore.insertMemory(topicPathList, memorySlice);
if (!memorySlice.isPrivate()) {
cacheCore.updateUserDialogMap(memorySlice);
}
} catch (Exception e) {
log.error("[CoordinatedManager] 插入记忆时出错: ", e);
}
log.debug("[CoordinatedManager] 插入切片: {}, 路径: {}", memorySlice, topicPath);
sliceInsertLock.unlock();
}
private boolean isCacheSingleUser() { private boolean isCacheSingleUser() {
return cacheCore.getUserDialogMap().size() <= 1; return memoryCore.getUserDialogMap().size() <= 1;
} }
@Coordinated(capability = "cognation") @Coordinated(capability = "cognation")

View File

@@ -1,25 +0,0 @@
package work.slhaf.partner.core.cache;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Capability(value = "cache")
public interface CacheCapability {
HashMap<LocalDateTime, String> getDialogMap();
ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId);
void updateDialogMap(LocalDateTime dateTime, String newDialogCache);
String getDialogMapStr();
String getUserDialogMapStr(String userId);
void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices);
String getActivatedSlicesStr(String userId);
HashMap<String, List<EvaluatedSlice>> getActivatedSlices();
void clearActivatedSlices(String userId);
boolean hasActivatedSlices(String userId);
int getActivatedSlicesSize(String userId);
List<EvaluatedSlice> getActivatedSlices(String userId);
}

View File

@@ -1,233 +0,0 @@
package work.slhaf.partner.core.cache;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.core.memory.pojo.MemoryResult;
import work.slhaf.partner.core.memory.pojo.MemorySlice;
import java.io.IOException;
import java.io.Serial;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@EqualsAndHashCode(callSuper = true)
@Slf4j
@CapabilityCore(value = "cache")
@Getter
@Setter
public class CacheCore extends PartnerCore<CacheCore> {
@Serial
private static final long serialVersionUID = 1L;
/**
* 近两日的对话总结缓存, 用于为大模型提供必要的记忆补充, hashmap以切片的存储时间为键总结为值
* 该部分作为'主LLM'system prompt常驻
* 该部分作为近两日的整体对话缓存, 不区分用户
*/
private HashMap<LocalDateTime, String> dialogMap = new HashMap<>();
/**
* 近两日的区分用户的对话总结缓存在prompt结构上比dialogMap层级深一层, dialogMap更具近两日整体对话的摘要性质
*/
private ConcurrentHashMap<String/*userId*/, ConcurrentHashMap<LocalDateTime, String>> userDialogMap = new ConcurrentHashMap<>();
/**
* memorySliceCache计数器每日清空
*/
private ConcurrentHashMap<List<String> /*触发查询的主题列表*/, Integer> memoryNodeCacheCounter = new ConcurrentHashMap<>();
/**
* 记忆切片缓存,每日清空
* 用于记录作为终点节点调用次数最多的记忆节点的切片数据
*/
private ConcurrentHashMap<List<String> /*主题路径*/, MemoryResult /*切片列表*/> memorySliceCache = new ConcurrentHashMap<>();
/**
* 缓存日期
*/
private LocalDate cacheDate;
/**
* 已被选中的切片时间戳集合,需要及时清理
*/
private Set<Long> selectedSlices = new HashSet<>();
private HashMap<String, List<EvaluatedSlice>> activatedSlices = new HashMap<>();
public CacheCore() throws IOException, ClassNotFoundException {
}
@CapabilityMethod
public void updateDialogMap(LocalDateTime dateTime, String newDialogCache) {
List<LocalDateTime> keysToRemove = new ArrayList<>();
dialogMap.forEach((k, v) -> {
if (dateTime.minusDays(2).isAfter(k)) {
keysToRemove.add(k);
}
});
for (LocalDateTime temp : keysToRemove) {
dialogMap.remove(temp);
}
keysToRemove.clear();
//放入新缓存
dialogMap.put(dateTime, newDialogCache);
}
@CapabilityMethod
public HashMap<LocalDateTime, String> getDialogMap() {
return dialogMap;
}
@CapabilityMethod
public ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId) {
return this.getUserDialogMap().get(userId);
}
@CapabilityMethod
public String getDialogMapStr() {
StringBuilder str = new StringBuilder();
this.getDialogMap().forEach((dateTime, dialog) -> str.append("\n\n").append("[").append(dateTime).append("]\n")
.append(dialog));
return str.toString();
}
@CapabilityMethod
public String getUserDialogMapStr(String userId) {
if (this.getUserDialogMap().containsKey(userId)) {
StringBuilder str = new StringBuilder();
Collection<String> dialogMapValues = this.getDialogMap().values();
this.getUserDialogMap().get(userId).forEach((dateTime, dialog) -> {
if (dialogMapValues.contains(dialog)) {
return;
}
str.append("\n\n").append("[").append(dateTime).append("]\n")
.append(dialog);
});
return str.toString();
} else {
return null;
}
}
public void updateCacheCounter(List<String> topicPath) {
if (memoryNodeCacheCounter.containsKey(topicPath)) {
Integer tempCount = memoryNodeCacheCounter.get(topicPath);
memoryNodeCacheCounter.put(topicPath, ++tempCount);
} else {
memoryNodeCacheCounter.put(topicPath, 1);
}
}
public void checkCacheDate() {
if (cacheDate == null || cacheDate.isBefore(LocalDate.now())) {
memorySliceCache.clear();
memoryNodeCacheCounter.clear();
cacheDate = LocalDate.now();
}
}
public void updateCache(List<String> topicPath, MemoryResult memoryResult) {
Integer tempCount = memoryNodeCacheCounter.get(topicPath);
if (tempCount == null) {
log.warn("[CacheCore] tempCount为null? memoryNodeCacheCounter: {}; topicPath: {}", memoryNodeCacheCounter, topicPath);
return;
}
if (tempCount >= 5) {
memorySliceCache.put(topicPath, memoryResult);
}
}
public void updateUserDialogMap(MemorySlice slice) {
String summary = slice.getSummary();
LocalDateTime now = LocalDateTime.now();
//更新userDialogMap
//移除两天前上下文缓存(切片总结)
List<LocalDateTime> keysToRemove = new ArrayList<>();
userDialogMap.forEach((k, v) -> v.forEach((i, j) -> {
if (now.minusDays(2).isAfter(i)) {
keysToRemove.add(i);
}
}));
for (LocalDateTime dateTime : keysToRemove) {
userDialogMap.forEach((k, v) -> v.remove(dateTime));
}
//放入新缓存
userDialogMap
.computeIfAbsent(slice.getStartUserId(), k -> new ConcurrentHashMap<>())
.merge(now, summary, (oldVal, newVal) -> oldVal + " " + newVal);
}
public void clearCacheByTopicPath(List<String> topicPath) {
memorySliceCache.remove(topicPath);
}
public MemoryResult selectCache(List<String> path) {
if (memorySliceCache.containsKey(path)) {
return memorySliceCache.get(path);
}
return null;
}
@CapabilityMethod
public void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices) {
activatedSlices.put(userId, memorySlices);
log.debug("[CoordinatedManager] 已更新激活切片, userId: {}", userId);
}
@CapabilityMethod
public String getActivatedSlicesStr(String userId) {
if (activatedSlices.containsKey(userId)) {
StringBuilder str = new StringBuilder();
activatedSlices.get(userId).forEach(slice -> str.append("\n\n").append("[").append(slice.getDate()).append("]\n")
.append(slice.getSummary()));
return str.toString();
} else {
return null;
}
}
@CapabilityMethod
public HashMap<String, List<EvaluatedSlice>> getActivatedSlices() {
return activatedSlices;
}
@CapabilityMethod
public void clearActivatedSlices(String userId) {
activatedSlices.remove(userId);
}
@CapabilityMethod
public boolean hasActivatedSlices(String userId) {
if (!activatedSlices.containsKey(userId)) {
return false;
}
return !activatedSlices.get(userId).isEmpty();
}
@CapabilityMethod
public int getActivatedSlicesSize(String userId) {
return activatedSlices.get(userId).size();
}
@CapabilityMethod
public List<EvaluatedSlice> getActivatedSlices(String userId) {
return activatedSlices.get(userId);
}
@Override
protected String getCoreKey() {
return "cache-core";
}
}

View File

@@ -83,14 +83,14 @@ public class CognationCore extends PartnerCore<CognationCore> {
@CapabilityMethod @CapabilityMethod
public void addMetaMessage(String userId, MetaMessage metaMessage) { public void addMetaMessage(String userId, MetaMessage metaMessage) {
log.debug("[{}] 当前会话历史: {}", JSONObject.toJSONString(singleMetaMessageMap)); log.debug("[{}] 当前会话历史: {}", getCoreKey(), JSONObject.toJSONString(singleMetaMessageMap));
if (singleMetaMessageMap.containsKey(userId)) { if (singleMetaMessageMap.containsKey(userId)) {
singleMetaMessageMap.get(userId).add(metaMessage); singleMetaMessageMap.get(userId).add(metaMessage);
} else { } else {
singleMetaMessageMap.put(userId, new java.util.ArrayList<>()); singleMetaMessageMap.put(userId, new java.util.ArrayList<>());
singleMetaMessageMap.get(userId).add(metaMessage); singleMetaMessageMap.get(userId).add(metaMessage);
} }
log.debug("[SessionManager] 会话历史更新: {}", JSONObject.toJSONString(singleMetaMessageMap)); log.debug("[{}] 会话历史更新: {}", getCoreKey(), JSONObject.toJSONString(singleMetaMessageMap));
} }
@CapabilityMethod @CapabilityMethod

View File

@@ -1,25 +1,52 @@
package work.slhaf.partner.core.memory; package work.slhaf.partner.core.memory;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability; import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.api.agent.factory.capability.annotation.ToCoordinated; import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.core.memory.pojo.MemoryResult; import work.slhaf.partner.core.memory.pojo.MemoryResult;
import work.slhaf.partner.core.memory.pojo.MemorySlice; import work.slhaf.partner.core.memory.pojo.MemorySlice;
import java.io.IOException; import java.io.IOException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Capability(value = "memory") @Capability(value = "memory")
public interface MemoryCapability { public interface MemoryCapability {
void cleanSelectedSliceFilter(); void cleanSelectedSliceFilter();
String getTopicTree(); String getTopicTree();
@ToCoordinated HashMap<LocalDateTime, String> getDialogMap();
ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId);
void updateDialogMap(LocalDateTime dateTime, String newDialogCache);
String getDialogMapStr();
String getUserDialogMapStr(String userId);
void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices);
String getActivatedSlicesStr(String userId);
HashMap<String, List<EvaluatedSlice>> getActivatedSlices();
void clearActivatedSlices(String userId);
boolean hasActivatedSlices(String userId);
int getActivatedSlicesSize(String userId);
List<EvaluatedSlice> getActivatedSlices(String userId);
MemoryResult selectMemory(String topicPathStr); MemoryResult selectMemory(String topicPathStr);
@ToCoordinated
MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException; MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException;
@ToCoordinated
void insertSlice(MemorySlice memorySlice, String topicPath); void insertSlice(MemorySlice memorySlice, String topicPath);
} }

View File

@@ -3,11 +3,13 @@ package work.slhaf.partner.core.memory;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod; import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.core.PartnerCore; import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException; import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException;
import work.slhaf.partner.core.memory.exception.UnExistedTopicException; import work.slhaf.partner.core.memory.exception.UnExistedTopicException;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.core.memory.pojo.MemoryResult; import work.slhaf.partner.core.memory.pojo.MemoryResult;
import work.slhaf.partner.core.memory.pojo.MemorySlice; import work.slhaf.partner.core.memory.pojo.MemorySlice;
import work.slhaf.partner.core.memory.pojo.MemorySliceResult; import work.slhaf.partner.core.memory.pojo.MemorySliceResult;
@@ -17,14 +19,18 @@ import work.slhaf.partner.core.memory.pojo.node.TopicNode;
import java.io.IOException; import java.io.IOException;
import java.io.Serial; import java.io.Serial;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@CapabilityCore(value = "memory") @CapabilityCore(value = "memory")
@Getter @Getter
@Setter @Setter
@Slf4j
public class MemoryCore extends PartnerCore<MemoryCore> { public class MemoryCore extends PartnerCore<MemoryCore> {
@Serial @Serial
@@ -56,12 +62,17 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
*/ */
private Set<Long> selectedSlices = new HashSet<>(); private Set<Long> selectedSlices = new HashSet<>();
private HashMap<String,List<String>> userIndex = new HashMap<>(); private HashMap<String, List<String>> userIndex = new HashMap<>();
private MemoryCache cache = new MemoryCache();
private final Lock sliceInsertLock = new ReentrantLock();
public MemoryCore() throws IOException, ClassNotFoundException { public MemoryCore() throws IOException, ClassNotFoundException {
} }
@CapabilityMethod
public MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException { public MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException {
MemoryResult memoryResult = new MemoryResult(); MemoryResult memoryResult = new MemoryResult();
CopyOnWriteArrayList<MemorySliceResult> targetSliceList = new CopyOnWriteArrayList<>(); CopyOnWriteArrayList<MemorySliceResult> targetSliceList = new CopyOnWriteArrayList<>();
@@ -79,7 +90,175 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
} }
} }
memoryResult.setMemorySliceResult(targetSliceList); memoryResult.setMemorySliceResult(targetSliceList);
return memoryResult; return cacheFilter(memoryResult);
}
@CapabilityMethod
public void insertSlice(MemorySlice memorySlice, String topicPath) {
sliceInsertLock.lock();
List<String> topicPathList = Arrays.stream(topicPath.split("->")).toList();
try {
//检查是否存在当天对应的memorySlice并确定是否插入
//每日刷新缓存
checkCacheDate();
//如果topicPath在memorySliceCache中存在对应缓存由于进行的插入操作则需要移除该缓存但不清除相关计数
clearCacheByTopicPath(topicPathList);
insertMemory(topicPathList, memorySlice);
if (!memorySlice.isPrivate()) {
updateUserDialogMap(memorySlice);
}
} catch (Exception e) {
log.error("[CoordinatedManager] 插入记忆时出错: ", e);
}
log.debug("[CoordinatedManager] 插入切片: {}, 路径: {}", memorySlice, topicPath);
sliceInsertLock.unlock();
}
@CapabilityMethod
public String getTopicTree() {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, TopicNode> entry : topicNodes.entrySet()) {
String rootName = entry.getKey();
TopicNode rootNode = entry.getValue();
stringBuilder.append(rootName).append("[root]").append("\r\n");
printSubTopicsTreeFormat(rootNode, "", stringBuilder);
}
return stringBuilder.toString();
}
@CapabilityMethod
public void updateDialogMap(LocalDateTime dateTime, String newDialogCache) {
List<LocalDateTime> keysToRemove = new ArrayList<>();
HashMap<LocalDateTime, String> dialogMap = cache.dialogMap;
dialogMap.forEach((k, v) -> {
if (dateTime.minusDays(2).isAfter(k)) {
keysToRemove.add(k);
}
});
for (LocalDateTime temp : keysToRemove) {
dialogMap.remove(temp);
}
keysToRemove.clear();
//放入新缓存
dialogMap.put(dateTime, newDialogCache);
}
@CapabilityMethod
public HashMap<LocalDateTime, String> getDialogMap() {
return cache.dialogMap;
}
@CapabilityMethod
public ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId) {
return cache.userDialogMap.get(userId);
}
@CapabilityMethod
public String getDialogMapStr() {
StringBuilder str = new StringBuilder();
this.getDialogMap().forEach((dateTime, dialog) -> str.append("\n\n").append("[").append(dateTime).append("]\n")
.append(dialog));
return str.toString();
}
@CapabilityMethod
public String getUserDialogMapStr(String userId) {
ConcurrentHashMap<String, ConcurrentHashMap<LocalDateTime, String>> userDialogMap = cache.userDialogMap;
if (userDialogMap.containsKey(userId)) {
StringBuilder str = new StringBuilder();
Collection<String> dialogMapValues = this.getDialogMap().values();
userDialogMap.get(userId).forEach((dateTime, dialog) -> {
if (dialogMapValues.contains(dialog)) {
return;
}
str.append("\n\n").append("[").append(dateTime).append("]\n")
.append(dialog);
});
return str.toString();
} else {
return null;
}
}
@CapabilityMethod
public MemoryResult selectMemory(String topicPathStr) {
MemoryResult memoryResult;
List<String> topicPath = List.of(topicPathStr.split("->"));
try {
List<String> path = new ArrayList<>(topicPath);
//每日刷新缓存
checkCacheDate();
//检测缓存并更新计数, 查看是否需要放入缓存
updateCacheCounter(path);
//查看是否存在缓存,如果存在,则直接返回
if ((memoryResult = selectCache(path)) != null) {
return memoryResult;
}
memoryResult = selectMemory(path);
//尝试更新缓存
updateCache(topicPath, memoryResult);
} catch (Exception e) {
log.error("[CoordinatedManager] selectMemory error: ", e);
log.error("[CoordinatedManager] 路径: {}", topicPathStr);
log.error("[CoordinatedManager] 主题树: {}", getTopicTree());
memoryResult = new MemoryResult();
memoryResult.setRelatedMemorySliceResult(new ArrayList<>());
memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>());
}
return cacheFilter(memoryResult);
}
@CapabilityMethod
public void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices) {
cache.activatedSlices.put(userId, memorySlices);
log.debug("[CoordinatedManager] 已更新激活切片, userId: {}", userId);
}
@CapabilityMethod
public String getActivatedSlicesStr(String userId) {
HashMap<String, List<EvaluatedSlice>> activatedSlices = cache.activatedSlices;
if (activatedSlices.containsKey(userId)) {
StringBuilder str = new StringBuilder();
activatedSlices.get(userId).forEach(slice -> str.append("\n\n").append("[").append(slice.getDate()).append("]\n")
.append(slice.getSummary()));
return str.toString();
} else {
return null;
}
}
@CapabilityMethod
public HashMap<String, List<EvaluatedSlice>> getActivatedSlices() {
return cache.activatedSlices;
}
@CapabilityMethod
public void clearActivatedSlices(String userId) {
cache.activatedSlices.remove(userId);
}
@CapabilityMethod
public boolean hasActivatedSlices(String userId) {
HashMap<String, List<EvaluatedSlice>> activatedSlices = cache.activatedSlices;
if (!activatedSlices.containsKey(userId)) {
return false;
}
return !activatedSlices.get(userId).isEmpty();
}
@CapabilityMethod
public int getActivatedSlicesSize(String userId) {
return cache.activatedSlices.get(userId).size();
}
@CapabilityMethod
public List<EvaluatedSlice> getActivatedSlices(String userId) {
return cache.activatedSlices.get(userId);
}
@CapabilityMethod
public void cleanSelectedSliceFilter() {
this.selectedSlices.clear();
} }
private List<List<MemorySlice>> loadSlicesByDate(LocalDate date) throws IOException, ClassNotFoundException { private List<List<MemorySlice>> loadSlicesByDate(LocalDate date) throws IOException, ClassNotFoundException {
@@ -95,18 +274,6 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
return list; return list;
} }
@CapabilityMethod
public String getTopicTree() {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, TopicNode> entry : topicNodes.entrySet()) {
String rootName = entry.getKey();
TopicNode rootNode = entry.getValue();
stringBuilder.append(rootName).append("[root]").append("\r\n");
printSubTopicsTreeFormat(rootNode, "", stringBuilder);
}
return stringBuilder.toString();
}
private void printSubTopicsTreeFormat(TopicNode node, String prefix, StringBuilder stringBuilder) { private void printSubTopicsTreeFormat(TopicNode node, String prefix, StringBuilder stringBuilder) {
if (node.getTopicNodes() == null || node.getTopicNodes().isEmpty()) return; if (node.getTopicNodes() == null || node.getTopicNodes().isEmpty()) return;
@@ -119,7 +286,7 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
} }
} }
public void insertMemory(List<String> topicPath, MemorySlice slice) throws IOException, ClassNotFoundException { private void insertMemory(List<String> topicPath, MemorySlice slice) throws IOException, ClassNotFoundException {
LocalDate now = LocalDate.now(); LocalDate now = LocalDate.now();
boolean hasSlice = false; boolean hasSlice = false;
MemoryNode node = null; MemoryNode node = null;
@@ -300,8 +467,6 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
} }
} }
private TopicNode getTargetParentNode(List<String> topicPath, String targetTopic) { private TopicNode getTargetParentNode(List<String> topicPath, String targetTopic) {
String topTopic = topicPath.getFirst(); String topTopic = topicPath.getFirst();
if (!existedTopics.containsKey(topTopic)) { if (!existedTopics.containsKey(topTopic)) {
@@ -323,13 +488,126 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
return targetParentNode; return targetParentNode;
} }
@CapabilityMethod public void updateCacheCounter(List<String> topicPath) {
public void cleanSelectedSliceFilter() { ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter;
this.selectedSlices.clear(); if (memoryNodeCacheCounter.containsKey(topicPath)) {
Integer tempCount = memoryNodeCacheCounter.get(topicPath);
memoryNodeCacheCounter.put(topicPath, ++tempCount);
} else {
memoryNodeCacheCounter.put(topicPath, 1);
}
}
private void checkCacheDate() {
if (cache.cacheDate == null || cache.cacheDate.isBefore(LocalDate.now())) {
cache.memorySliceCache.clear();
cache.memoryNodeCacheCounter.clear();
cache.cacheDate = LocalDate.now();
}
}
private void updateCache(List<String> topicPath, MemoryResult memoryResult) {
ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter;
Integer tempCount = memoryNodeCacheCounter.get(topicPath);
if (tempCount == null) {
log.warn("[CacheCore] tempCount为null? memoryNodeCacheCounter: {}; topicPath: {}", memoryNodeCacheCounter, topicPath);
return;
}
if (tempCount >= 5) {
cache.memorySliceCache.put(topicPath, memoryResult);
}
}
private void updateUserDialogMap(MemorySlice slice) {
String summary = slice.getSummary();
LocalDateTime now = LocalDateTime.now();
ConcurrentHashMap<String, ConcurrentHashMap<LocalDateTime, String>> userDialogMap = cache.userDialogMap;
//更新userDialogMap
//移除两天前上下文缓存(切片总结)
List<LocalDateTime> keysToRemove = new ArrayList<>();
userDialogMap.forEach((k, v) -> v.forEach((i, j) -> {
if (now.minusDays(2).isAfter(i)) {
keysToRemove.add(i);
}
}));
for (LocalDateTime dateTime : keysToRemove) {
userDialogMap.forEach((k, v) -> v.remove(dateTime));
}
//放入新缓存
userDialogMap
.computeIfAbsent(slice.getStartUserId(), k -> new ConcurrentHashMap<>())
.merge(now, summary, (oldVal, newVal) -> oldVal + " " + newVal);
}
private void clearCacheByTopicPath(List<String> topicPath) {
cache.memorySliceCache.remove(topicPath);
}
private MemoryResult selectCache(List<String> path) {
ConcurrentHashMap<List<String>, MemoryResult> memorySliceCache = cache.memorySliceCache;
if (memorySliceCache.containsKey(path)) {
return memorySliceCache.get(path);
}
return null;
} }
@Override @Override
protected String getCoreKey() { protected String getCoreKey() {
return "memory-core"; return "memory-core";
} }
public ConcurrentHashMap<String, ConcurrentHashMap<LocalDateTime, String>> getUserDialogMap() {
return cache.userDialogMap;
}
private MemoryResult cacheFilter(MemoryResult memoryResult) {
//过滤掉与缓存重复的切片
CopyOnWriteArrayList<MemorySliceResult> memorySliceResult = memoryResult.getMemorySliceResult();
List<MemorySlice> relatedMemorySliceResult = memoryResult.getRelatedMemorySliceResult();
cache.dialogMap.forEach((k, v) -> {
memorySliceResult.removeIf(m -> m.getMemorySlice().getSummary().equals(v));
relatedMemorySliceResult.removeIf(m -> m.getSummary().equals(v));
});
return memoryResult;
}
@SuppressWarnings("FieldMayBeFinal")
private static class MemoryCache {
/**
* 近两日的对话总结缓存, 用于为大模型提供必要的记忆补充, hashmap以切片的存储时间为键总结为值
* 该部分作为'主LLM'system prompt常驻
* 该部分作为近两日的整体对话缓存, 不区分用户
*/
private HashMap<LocalDateTime, String> dialogMap = new HashMap<>();
/**
* 近两日的区分用户的对话总结缓存在prompt结构上比dialogMap层级深一层, dialogMap更具近两日整体对话的摘要性质
*/
private ConcurrentHashMap<String/*userId*/, ConcurrentHashMap<LocalDateTime, String>> userDialogMap = new ConcurrentHashMap<>();
/**
* memorySliceCache计数器每日清空
*/
private ConcurrentHashMap<List<String> /*触发查询的主题列表*/, Integer> memoryNodeCacheCounter = new ConcurrentHashMap<>();
/**
* 记忆切片缓存,每日清空
* 用于记录作为终点节点调用次数最多的记忆节点的切片数据
*/
private ConcurrentHashMap<List<String> /*主题路径*/, MemoryResult /*切片列表*/> memorySliceCache = new ConcurrentHashMap<>();
/**
* 缓存日期
*/
private LocalDate cacheDate;
private HashMap<String, List<EvaluatedSlice>> activatedSlices = new HashMap<>();
private MemoryCache() {
}
}
} }

View File

@@ -7,7 +7,6 @@ import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability; import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.core.cache.CacheCapability;
import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException; import work.slhaf.partner.core.memory.exception.UnExistedDateIndexException;
@@ -36,8 +35,6 @@ import java.util.List;
@AgentModule(name="memory_selector",order=2) @AgentModule(name="memory_selector",order=2)
public class MemorySelector extends PreRunningModule { public class MemorySelector extends PreRunningModule {
@InjectCapability
private CacheCapability cacheCapability;
@InjectCapability @InjectCapability
private MemoryCapability memoryCapability; private MemoryCapability memoryCapability;
@InjectCapability @InjectCapability
@@ -54,9 +51,9 @@ public class MemorySelector extends PreRunningModule {
//获取主题路径 //获取主题路径
ExtractorResult extractorResult = memorySelectExtractor.execute(runningFlowContext); ExtractorResult extractorResult = memorySelectExtractor.execute(runningFlowContext);
if (extractorResult.isRecall() || !extractorResult.getMatches().isEmpty()) { if (extractorResult.isRecall() || !extractorResult.getMatches().isEmpty()) {
cacheCapability.clearActivatedSlices(userId); memoryCapability.clearActivatedSlices(userId);
List<EvaluatedSlice> evaluatedSlices = selectAndEvaluateMemory(runningFlowContext, extractorResult); List<EvaluatedSlice> evaluatedSlices = selectAndEvaluateMemory(runningFlowContext, extractorResult);
cacheCapability.updateActivatedSlices(userId, evaluatedSlices); memoryCapability.updateActivatedSlices(userId, evaluatedSlices);
} }
setModuleContextRecall(runningFlowContext); setModuleContextRecall(runningFlowContext);
} }
@@ -81,10 +78,10 @@ public class MemorySelector extends PreRunningModule {
private void setModuleContextRecall(PartnerRunningFlowContext runningFlowContext) { private void setModuleContextRecall(PartnerRunningFlowContext runningFlowContext) {
String userId = runningFlowContext.getUserId(); String userId = runningFlowContext.getUserId();
boolean recall = cacheCapability.hasActivatedSlices(userId); boolean recall = memoryCapability.hasActivatedSlices(userId);
runningFlowContext.getModuleContext().getExtraContext().put("recall", recall); runningFlowContext.getModuleContext().getExtraContext().put("recall", recall);
if (recall) { if (recall) {
runningFlowContext.getModuleContext().getExtraContext().put("recall_count", cacheCapability.getActivatedSlicesSize(userId)); runningFlowContext.getModuleContext().getExtraContext().put("recall_count", memoryCapability.getActivatedSlicesSize(userId));
} }
} }
@@ -119,7 +116,7 @@ public class MemorySelector extends PreRunningModule {
} }
private void removeDuplicateSlice(MemoryResult memoryResult) { private void removeDuplicateSlice(MemoryResult memoryResult) {
Collection<String> values = cacheCapability.getDialogMap().values(); Collection<String> values = memoryCapability.getDialogMap().values();
memoryResult.getRelatedMemorySliceResult().removeIf(m -> values.contains(m.getSummary())); memoryResult.getRelatedMemorySliceResult().removeIf(m -> values.contains(m.getSummary()));
memoryResult.getMemorySliceResult().removeIf(m -> values.contains(m.getMemorySlice().getSummary())); memoryResult.getMemorySliceResult().removeIf(m -> values.contains(m.getMemorySlice().getSummary()));
} }
@@ -138,17 +135,17 @@ public class MemorySelector extends PreRunningModule {
protected HashMap<String, String> getPromptDataMap(String userId) { protected HashMap<String, String> getPromptDataMap(String userId) {
HashMap<String, String> map = new HashMap<>(); HashMap<String, String> map = new HashMap<>();
String dialogMapStr = cacheCapability.getDialogMapStr(); String dialogMapStr = memoryCapability.getDialogMapStr();
if (!dialogMapStr.isEmpty()) { if (!dialogMapStr.isEmpty()) {
map.put("[记忆缓存] <你最近两日和所有聊天者的对话记忆印象>", dialogMapStr); map.put("[记忆缓存] <你最近两日和所有聊天者的对话记忆印象>", dialogMapStr);
} }
String userDialogMapStr = cacheCapability.getUserDialogMapStr(userId); String userDialogMapStr = memoryCapability.getUserDialogMapStr(userId);
if (userDialogMapStr != null && !userDialogMapStr.isEmpty() && !cognationCapability.isSingleUser()) { if (userDialogMapStr != null && !userDialogMapStr.isEmpty() && !cognationCapability.isSingleUser()) {
map.put("[用户记忆缓存] <与最新一条消息的发送者的近两天对话记忆印象, 可能与[记忆缓存]稍有重复>", userDialogMapStr); map.put("[用户记忆缓存] <与最新一条消息的发送者的近两天对话记忆印象, 可能与[记忆缓存]稍有重复>", userDialogMapStr);
} }
String sliceStr = cacheCapability.getActivatedSlicesStr(userId); String sliceStr = memoryCapability.getActivatedSlicesStr(userId);
if (sliceStr != null && !sliceStr.isEmpty()) { if (sliceStr != null && !sliceStr.isEmpty()) {
map.put("[记忆切片] <你与最新一条消息的发送者的相关回忆, 不会与[记忆缓存]重复, 如果有重复你也可以指出来()>", sliceStr); map.put("[记忆切片] <你与最新一条消息的发送者的相关回忆, 不会与[记忆缓存]重复, 如果有重复你也可以指出来()>", sliceStr);
} }

View File

@@ -11,7 +11,6 @@ import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateM
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.api.chat.pojo.MetaMessage; import work.slhaf.partner.api.chat.pojo.MetaMessage;
import work.slhaf.partner.core.cache.CacheCapability;
import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice; import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
@@ -37,8 +36,6 @@ public class MemorySelectExtractor extends AgentRunningSubModule<PartnerRunningF
private MemoryCapability memoryCapability; private MemoryCapability memoryCapability;
@InjectCapability @InjectCapability
private CognationCapability cognationCapability; private CognationCapability cognationCapability;
@InjectCapability
private CacheCapability cacheCapability;
@Override @Override
@@ -58,7 +55,7 @@ public class MemorySelectExtractor extends AgentRunningSubModule<PartnerRunningF
ExtractorResult extractorResult; ExtractorResult extractorResult;
try { try {
List<EvaluatedSlice> activatedMemorySlices = cacheCapability.getActivatedSlices(context.getUserId()); List<EvaluatedSlice> activatedMemorySlices = memoryCapability.getActivatedSlices(context.getUserId());
ExtractorInput extractorInput = ExtractorInput.builder() ExtractorInput extractorInput = ExtractorInput.builder()
.text(context.getInput()) .text(context.getInput())
.date(context.getDateTime().toLocalDate()) .date(context.getDateTime().toLocalDate())

View File

@@ -11,7 +11,6 @@ import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.chat.constant.ChatConstant; import work.slhaf.partner.api.chat.constant.ChatConstant;
import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.core.cache.CacheCapability;
import work.slhaf.partner.core.cognation.CognationCapability; import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability; import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.memory.pojo.MemorySlice; import work.slhaf.partner.core.memory.pojo.MemorySlice;
@@ -45,8 +44,6 @@ public class MemoryUpdater extends PostRunningModule {
@InjectCapability @InjectCapability
private MemoryCapability memoryCapability; private MemoryCapability memoryCapability;
@InjectCapability @InjectCapability
private CacheCapability cacheCapability;
@InjectCapability
private PerceiveCapability perceiveCapability; private PerceiveCapability perceiveCapability;
@InjectModule @InjectModule
@@ -160,11 +157,11 @@ public class MemoryUpdater extends PostRunningModule {
setInvolvedUserId(userId, memorySlice, chatMessages); setInvolvedUserId(userId, memorySlice, chatMessages);
memoryCapability.insertSlice(memorySlice, summarizeResult.getTopicPath()); memoryCapability.insertSlice(memorySlice, summarizeResult.getTopicPath());
cacheCapability.updateDialogMap(LocalDateTime.now(), summarizeResult.getSummary()); memoryCapability.updateDialogMap(LocalDateTime.now(), summarizeResult.getSummary());
} else { } else {
log.debug("[MemoryUpdater] 不存在多人聊天记录, 将以单聊总结为对话缓存的主要输入: {}", singleMemorySummary); log.debug("[MemoryUpdater] 不存在多人聊天记录, 将以单聊总结为对话缓存的主要输入: {}", singleMemorySummary);
cacheCapability.updateDialogMap(LocalDateTime.now(), totalSummarizer.execute(singleMemorySummary)); memoryCapability.updateDialogMap(LocalDateTime.now(), totalSummarizer.execute(singleMemorySummary));
} }
log.debug("[MemoryUpdater] 对话缓存更新完毕"); log.debug("[MemoryUpdater] 对话缓存更新完毕");
log.debug("[MemoryUpdater] 多人聊天记忆更新流程结束..."); log.debug("[MemoryUpdater] 多人聊天记忆更新流程结束...");