推进核心服务注册机制,并调整了Partner的模块结构

- 为了方便调试,将项目分为两个子模块,demo模块中进行新机制的开发工作,core模块为原来的Partner项目;
- 新增了多个注解,用于适配新的核心服务注册机制;
- 在`CapabilityRegisterFactory`中,将首先启动`statusCheck`,检查各个注解是否正常工作,包括以下内容:
   - `CapabilityCore`核心服务与`Capability`接口是否匹配
   - 核心服务中的`CapabilityMethod`是否与`Capability`接口中的方法匹配
   - 是否存在待协调方法`ToCoordinatedMethod`以及对应的存在于`BaseCognationManager`子类实现中
This commit is contained in:
2025-07-15 16:48:27 +08:00
parent 98d830d08b
commit dd10b00fb6
148 changed files with 1082 additions and 500 deletions

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>work.slhaf</groupId>
<artifactId>Partner</artifactId>
<version>0.5.0</version>
</parent>
<artifactId>Partner-Capability-Demo</artifactId>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo;
import work.slhaf.demo.capability.BaseCognationManager;
import work.slhaf.demo.capability.interfaces.Coordinated;
import java.util.ArrayList;
import java.util.List;
public class MyCognationManager extends BaseCognationManager {
@Coordinated(capability = "memory")
public List<String> selectMemory(String path){
return null;
}
}

View File

@@ -0,0 +1,4 @@
package work.slhaf.demo.capability;
public class BaseCognationManager {
}

View File

@@ -0,0 +1,232 @@
package work.slhaf.demo.capability;
import lombok.Setter;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import work.slhaf.demo.capability.exception.*;
import work.slhaf.demo.capability.interfaces.*;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;
public class CapabilityRegisterFactory {
public static volatile CapabilityRegisterFactory capabilityRegisterFactory;
@Setter
private Reflections reflections;
private CapabilityRegisterFactory() {
}
public static CapabilityRegisterFactory getInstance() {
if (capabilityRegisterFactory == null) {
synchronized (CapabilityRegisterFactory.class) {
if (capabilityRegisterFactory == null) {
capabilityRegisterFactory = new CapabilityRegisterFactory();
capabilityRegisterFactory.setReflections(getReflections());
}
}
}
return capabilityRegisterFactory;
}
private static Reflections getReflections() {
//后续可替换为根据传入的启动类获取路径
Collection<URL> urls = ClasspathHelper.forJavaClassPath();
return new Reflections(
new ConfigurationBuilder()
.setUrls(urls)
.setScanners(Scanners.TypesAnnotated, Scanners.MethodsAnnotated, Scanners.SubTypes)
);
}
public void registerCapabilities() {
//检查可注册能力是否正常
statusCheck();
//扫描现有Capability, value为键返回函数路由表, 函数路由表内部通过反射调用对应core的方法
//扫描时也需要排除掉
}
private void statusCheck() {
Set<Class<?>> cores = reflections.getTypesAnnotatedWith(CapabilityCore.class);
Set<Class<?>> capabilities = reflections.getTypesAnnotatedWith(Capability.class);
checkCountAndCapabilities(cores, capabilities);
checkCapabilityMethods(cores, capabilities);
checkCoordinatedMethods(capabilities);
}
private void checkCoordinatedMethods(Set<Class<?>> capabilities) {
//检查各个capability中是否含有ToCoordinated注解
//如果含有则需要查找AbstractCognationManager的子类,看这里是否有对应的Coordinated注解所在方法
Set<String> methodsToCoordinated = capabilities.stream()
.flatMap(capability -> Arrays.stream(capability.getDeclaredMethods()))
.filter(method -> method.isAnnotationPresent(ToCoordinated.class))
.map(method -> {
String capabilityValue = method.getDeclaringClass().getAnnotation(Capability.class).value();
return capabilityValue + "." + methodSignature(method);
})
.collect(Collectors.toSet());
if (!methodsToCoordinated.isEmpty()) {
Set<Class<? extends BaseCognationManager>> subTypesOfAbsCM = reflections.getSubTypesOf(BaseCognationManager.class);
Set<String> methodsCoordinated = getMethodsCoordinated(subTypesOfAbsCM);
if (!methodsCoordinated.equals(methodsToCoordinated)) {
// 找出缺少的协调方法
Set<String> missingMethods = new HashSet<>(methodsToCoordinated);
missingMethods.removeAll(methodsCoordinated);
// 找出多余的协调方法
Set<String> extraMethods = new HashSet<>(methodsCoordinated);
extraMethods.removeAll(methodsToCoordinated);
// 抛出异常或记录错误
if (!missingMethods.isEmpty()) {
throw new UnMatchedCoordinatedMethodException("缺少协调方法: " + String.join(", ", missingMethods));
}
if (!extraMethods.isEmpty()) {
throw new UnMatchedCoordinatedMethodException("发现多余的协调方法: " + String.join(", ", extraMethods));
}
}
}
}
private Set<String> getMethodsCoordinated(Set<Class<? extends BaseCognationManager>> subTypesOfAbsCM) {
Set<String> methodsCoordinated = new HashSet<>();
for (Class<? extends BaseCognationManager> cm : subTypesOfAbsCM) {
Method[] methods = cm.getMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(Coordinated.class)) {
methodsCoordinated.add(method.getAnnotation(Coordinated.class).capability() + "." + methodSignature(method));
}
}
}
return methodsCoordinated;
}
private void checkCapabilityMethods(Set<Class<?>> cores, Set<Class<?>> capabilities) {
HashMap<String, List<Method>> capabilitiesMethods = getCapabilityMethods(capabilities);
StringBuilder sb = new StringBuilder();
for (Class<?> core : cores) {
List<Method> methodsWithAnnotation = Arrays.stream(core.getMethods())
.filter(method -> method.isAnnotationPresent(CapabilityMethod.class))
.toList();
List<Method> capabilityMethods = capabilitiesMethods.get(core.getAnnotation(CapabilityCore.class).value());
LackRecord lackRecord = checkMethodsMatched(methodsWithAnnotation, capabilityMethods);
if (lackRecord.hasNotEmptyRecord()) {
sb.append(lackRecord.toLackErrorMsg(core.getAnnotation(CapabilityCore.class).value()));
}
}
if (!sb.isEmpty()) {
throw new UnMatchedCapabilityMethodException(sb.toString());
}
}
private LackRecord checkMethodsMatched(List<Method> methodsWithAnnotation, List<Method> capabilityMethods) {
Set<String> collectedMethodsWithAnnotation = methodsWithAnnotation.stream()
.filter(method -> !method.isAnnotationPresent(ToCoordinated.class))
.map(this::methodSignature)
.collect(Collectors.toSet());
Set<String> collectedCapabilityMethods = capabilityMethods.stream()
.filter(method -> !method.isAnnotationPresent(ToCoordinated.class))
.map(this::methodSignature)
.collect(Collectors.toSet());
return checkMethodsMatched(collectedMethodsWithAnnotation, collectedCapabilityMethods);
}
private LackRecord checkMethodsMatched(Set<String> collectedMethodsWithAnnotation, Set<String> collectedCapabilityMethods) {
List<String> coreLack = new ArrayList<>();
List<String> capLack = new ArrayList<>();
// 找出 core 中多余的方法
for (String coreSig : collectedMethodsWithAnnotation) {
if (!collectedCapabilityMethods.contains(coreSig)) {
capLack.add(coreSig);
}
}
// 找出 capability 中多余的方法
for (String capSig : collectedCapabilityMethods) {
if (!collectedMethodsWithAnnotation.contains(capSig)) {
coreLack.add(capSig);
}
}
return new LackRecord(coreLack, capLack);
}
private String methodSignature(Method method) {
StringBuilder sb = new StringBuilder();
sb.append("(");
sb.append(method.getReturnType().getName()).append(" ");
sb.append(method.getName()).append("(");
Class<?>[] paramTypes = method.getParameterTypes();
for (int i = 0; i < paramTypes.length; i++) {
sb.append(paramTypes[i].getName());
if (i < paramTypes.length - 1) sb.append(",");
}
sb.append(")").append(")");
return sb.toString();
}
private HashMap<String, List<Method>> getCapabilityMethods(Set<Class<?>> capabilities) {
HashMap<String, List<Method>> capabilityMethods = new HashMap<>();
capabilities.forEach(capability -> {
capabilityMethods.put(capability.getAnnotation(Capability.class).value(), Arrays.stream(capability.getMethods()).toList());
});
return capabilityMethods;
}
private void checkCountAndCapabilities(Set<Class<?>> cores, Set<Class<?>> capabilities) {
if (cores.size() != capabilities.size()) {
throw new UnMatchedCapabilityException("Capability 注册异常: 已存在的CapabilityCore与Capability数量不匹配!");
}
if (!checkValuesMatched(cores, capabilities)) {
throw new UnMatchedCapabilityException("Capability 注册异常: 已存在的CapabilityCore与Capability不匹配!");
}
}
private boolean checkValuesMatched(Set<Class<?>> cores, Set<Class<?>> capabilities) {
Set<String> coresValues = new HashSet<>();
Set<String> capabilitiesValues = new HashSet<>();
for (Class<?> core : cores) {
CapabilityCore annotation = core.getAnnotation(CapabilityCore.class);
if (annotation != null) {
if (coresValues.contains(annotation.value())) {
throw new DuplicateCapabilityException(String.format("Capability 注册异常: 重复的Capability核心: %s", annotation.value()));
}
coresValues.add(annotation.value());
}
}
for (Class<?> capability : capabilities) {
Capability annotation = capability.getAnnotation(Capability.class);
if (annotation != null) {
if (capabilitiesValues.contains(annotation.value())) {
throw new DuplicateCapabilityException(String.format("Capability 注册异常: 重复的Capability接口: %s", annotation.value()));
}
capabilitiesValues.add(annotation.value());
}
}
return coresValues.equals(capabilitiesValues);
}
record LackRecord(List<String> coreLack, List<String> capLack) {
public boolean hasNotEmptyRecord() {
return !coreLack.isEmpty() || !capLack.isEmpty();
}
public String toLackErrorMsg(String capabilityName) {
StringBuilder sb = new StringBuilder("\n").append(capabilityName).append("\n");
if (!coreLack.isEmpty()) {
sb.append("缺少Core方法:").append("\n").append(coreLack).append("\n");
}
if (!capLack.isEmpty()) {
sb.append("缺少Capability方法:").append("\n").append(capLack).append("\n");
}
return sb.toString();
}
}
}

View File

@@ -0,0 +1,17 @@
package work.slhaf.demo.capability.ability;
import work.slhaf.demo.capability.interfaces.Capability;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@Capability(value = "cache")
public interface CacheCapability {
HashMap<LocalDateTime, String> getDialogMap();
ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId);
void updateDialogMap(LocalDateTime dateTime, String newDialogCache);
String getDialogMapStr();
String getUserDialogMapStr(String userId);
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo.capability.ability;
import work.slhaf.demo.capability.interfaces.Capability;
import work.slhaf.demo.capability.interfaces.ToCoordinated;
import java.util.List;
@Capability(value = "memory")
public interface MemoryCapability {
void cleanSelectedSliceFilter();
String getTopicTree();
List<String> listMemory(String userId);
@ToCoordinated
List<String> selectMemory(String path);
}

View File

@@ -0,0 +1,10 @@
package work.slhaf.demo.capability.ability;
import work.slhaf.demo.capability.interfaces.Capability;
@Capability(value = "perceive")
public interface PerceiveCapability {
String getUser(String id);
String addUser(String userInfo, String platform, String userNickName);
void updateUser(String user);
}

View File

@@ -0,0 +1,7 @@
package work.slhaf.demo.capability.exception;
public class CapabilityCheckFailedException extends RuntimeException {
public CapabilityCheckFailedException(String message) {
super("Capability注册失败: "+message);
}
}

View File

@@ -0,0 +1,7 @@
package work.slhaf.demo.capability.exception;
public class DuplicateCapabilityException extends CapabilityCheckFailedException{
public DuplicateCapabilityException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,7 @@
package work.slhaf.demo.capability.exception;
public class UnMatchedCapabilityException extends CapabilityCheckFailedException{
public UnMatchedCapabilityException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,7 @@
package work.slhaf.demo.capability.exception;
public class UnMatchedCapabilityMethodException extends CapabilityCheckFailedException{
public UnMatchedCapabilityMethodException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,7 @@
package work.slhaf.demo.capability.exception;
public class UnMatchedCoordinatedMethodException extends CapabilityCheckFailedException{
public UnMatchedCoordinatedMethodException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 用于注解能力接口,需要与`@CapabilityCore`对应的`value`一致
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Capability {
String value();
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 用于注解Core服务需标识一个value致用于核心服务发现
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CapabilityCore {
String value();
}

View File

@@ -0,0 +1,11 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CapabilityMethod {
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 用于标注协调方法,`value`值需与对应的`@ToCoordinated`保持一致
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Coordinated {
String capability();
}

View File

@@ -0,0 +1,14 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 用于注入`Capability`
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface InjectCapability {
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.demo.capability.interfaces;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 当`@Capability`所注接口中如果存在方法需要协调多个Core服务的调用可以通过该注解进行排除
* value值为方法对应标识需与协调实现处的方法标识保持一致
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ToCoordinated {
}

View File

@@ -0,0 +1,57 @@
package work.slhaf.demo.core;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.demo.capability.interfaces.CapabilityCore;
import work.slhaf.demo.capability.interfaces.CapabilityMethod;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
@CapabilityCore(value = "cache")
@Slf4j
public class CacheCore {
public static volatile CacheCore cacheCore;
public static CacheCore getInstance() {
if (cacheCore == null) {
synchronized (CacheCore.class) {
if (cacheCore == null) {
cacheCore = new CacheCore();
}
}
}
return cacheCore;
}
@CapabilityMethod
public HashMap<LocalDateTime, String> getDialogMap() {
log.info("cache: getDialogMap");
return new HashMap<>();
}
@CapabilityMethod
public ConcurrentHashMap<LocalDateTime, String> getUserDialogMap(String userId) {
log.info("cache: getUserDialogMap");
return new ConcurrentHashMap<>();
}
@CapabilityMethod
public void updateDialogMap(LocalDateTime dateTime, String newDialogCache) {
log.info("cache: updateDialogMap");
}
@CapabilityMethod
public String getDialogMapStr() {
log.info("cache: getDialogMapStr");
return "";
}
@CapabilityMethod
public String getUserDialogMapStr(String userId) {
log.info("cache: getUserDialogMapStr");
return "";
}
}

View File

@@ -0,0 +1,43 @@
package work.slhaf.demo.core;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.demo.capability.interfaces.CapabilityCore;
import work.slhaf.demo.capability.interfaces.CapabilityMethod;
import java.util.ArrayList;
import java.util.List;
@CapabilityCore(value = "memory")
@Slf4j
public class MemoryCore {
public static volatile MemoryCore memoryCore;
public static MemoryCore getInstance() {
if (memoryCore == null){
synchronized (MemoryCore.class){
if (memoryCore == null){
memoryCore = new MemoryCore();
}
}
}
return memoryCore;
}
@CapabilityMethod
public void cleanSelectedSliceFilter(){
log.info("memory: cleanSelectedSliceFilter");
}
@CapabilityMethod
public String getTopicTree(){
log.info("memory: getTopicTree");
return "";
}
@CapabilityMethod
public List<String> listMemory(String userId){
log.info("memory: listMemory");
return new ArrayList<>();
}
}

View File

@@ -0,0 +1,41 @@
package work.slhaf.demo.core;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.demo.capability.interfaces.CapabilityCore;
import work.slhaf.demo.capability.interfaces.CapabilityMethod;
@CapabilityCore(value = "perceive")
@Slf4j
public class PerceiveCore {
public static volatile PerceiveCore perceiveCore;
public static PerceiveCore getInstance() {
if (perceiveCore == null){
synchronized (PerceiveCore.class){
if (perceiveCore == null){
perceiveCore = new PerceiveCore();
}
}
}
return perceiveCore;
}
@CapabilityMethod
public String getUser(String id){
log.info("perceive: getUser");
return "";
}
@CapabilityMethod
public String addUser(String userInfo, String platform, String userNickName){
log.info("perceive: addUser");
return "";
}
@CapabilityMethod
public void updateUser(String user){
log.info("perceive: updateUser");
}
}