mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 08:43:02 +08:00
推进 ActionExecutor、针对action core做出了一些调整
- 将 ActionWatchService 抽取为独立的类,使用构造参数传递所需内容 - ActionCore 中除了pendingAction外,将只维护一个行动池,通过用户键和STATUS区分类型 - 开始推进 ActionExecutor,但其中的同组并发、动态行动链、行动间参数对齐、参数重构等内容需要仔细考虑
This commit is contained in:
@@ -5,13 +5,13 @@ import work.slhaf.partner.core.action.entity.ActionData;
|
|||||||
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
|
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
@Capability(value = "action")
|
@Capability(value = "action")
|
||||||
public interface ActionCapability {
|
public interface ActionCapability {
|
||||||
void putPreparedAction(String uuid, ActionData actionData);
|
void putPreparedAction(String uuid, ActionData actionData);
|
||||||
|
|
||||||
List<ActionData> popPreparedAction(String userId);
|
|
||||||
|
|
||||||
List<ActionData> popPendingAction(String userId);
|
List<ActionData> popPendingAction(String userId);
|
||||||
|
|
||||||
List<ActionData> listPreparedAction(String userId);
|
List<ActionData> listPreparedAction(String userId);
|
||||||
@@ -23,4 +23,8 @@ public interface ActionCapability {
|
|||||||
List<String> selectTendencyCache(String input);
|
List<String> selectTendencyCache(String input);
|
||||||
|
|
||||||
void updateTendencyCache(CacheAdjustData data);
|
void updateTendencyCache(CacheAdjustData data);
|
||||||
|
|
||||||
|
ExecutorService getExecutor(ActionCore.ExecutorType type);
|
||||||
|
|
||||||
|
Set<String> getExistedMetaActions();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package work.slhaf.partner.core.action;
|
package work.slhaf.partner.core.action;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
|
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
|
||||||
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
|
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
|
||||||
import work.slhaf.partner.common.vector.VectorClient;
|
import work.slhaf.partner.common.vector.VectorClient;
|
||||||
@@ -11,25 +10,15 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
|||||||
import work.slhaf.partner.core.action.entity.cache.ActionCacheData;
|
import work.slhaf.partner.core.action.entity.cache.ActionCacheData;
|
||||||
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
|
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
|
||||||
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
|
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
|
||||||
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
|
|
||||||
import work.slhaf.partner.core.action.exception.ActionLoadFailedException;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.*;
|
import java.util.*;
|
||||||
import java.nio.file.attribute.BasicFileAttributes;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
|
||||||
|
|
||||||
@SuppressWarnings("FieldMayBeFinal")
|
@SuppressWarnings("FieldMayBeFinal")
|
||||||
@CapabilityCore(value = "action")
|
@CapabilityCore(value = "action")
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -38,7 +27,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
|
|||||||
/**
|
/**
|
||||||
* 对应本次交互即将执行或将要放置在行动池的预备任务,因此将以本次交互的uuid为键,其起到的作用相当于暂时的模块上下文
|
* 对应本次交互即将执行或将要放置在行动池的预备任务,因此将以本次交互的uuid为键,其起到的作用相当于暂时的模块上下文
|
||||||
*/
|
*/
|
||||||
private HashMap<String, List<ActionData>> preparedActions = new HashMap<>();
|
private HashMap<String, List<ActionData>> actionPool = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 待确认任务,以userId区分不同用户,因为需要跨请求确认
|
* 待确认任务,以userId区分不同用户,因为需要跨请求确认
|
||||||
@@ -58,7 +47,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
|
|||||||
private final LinkedHashMap<String, MetaActionInfo> existedMetaActions = new LinkedHashMap<>();
|
private final LinkedHashMap<String, MetaActionInfo> existedMetaActions = new LinkedHashMap<>();
|
||||||
|
|
||||||
public ActionCore() throws IOException, ClassNotFoundException {
|
public ActionCore() throws IOException, ClassNotFoundException {
|
||||||
new ActionWatchService().launch();
|
new ActionWatchService(existedMetaActions, virtualExecutor).launch();
|
||||||
}
|
}
|
||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
@@ -79,23 +68,19 @@ public class ActionCore extends PartnerCore<ActionCore> {
|
|||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
public synchronized void putPreparedAction(String uuid, ActionData actionData) {
|
public synchronized void putPreparedAction(String uuid, ActionData actionData) {
|
||||||
preparedActions.computeIfAbsent(uuid, k -> {
|
actionPool.computeIfAbsent(uuid, k -> {
|
||||||
List<ActionData> temp = new ArrayList<>();
|
List<ActionData> temp = new ArrayList<>();
|
||||||
temp.add(actionData);
|
temp.add(actionData);
|
||||||
return temp;
|
return temp;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@CapabilityMethod
|
|
||||||
public synchronized List<ActionData> popPreparedAction(String userId) {
|
|
||||||
List<ActionData> infos = preparedActions.get(userId);
|
|
||||||
preparedActions.remove(userId);
|
|
||||||
return infos;
|
|
||||||
}
|
|
||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
public List<ActionData> listPreparedAction(String userId) {
|
public List<ActionData> listPreparedAction(String userId) {
|
||||||
return preparedActions.get(userId);
|
List<ActionData> actions = actionPool.get(userId);
|
||||||
|
return actions.stream()
|
||||||
|
.filter(actionData -> actionData.getStatus().equals(ActionData.ActionStatus.PREPARE))
|
||||||
|
.toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
@@ -155,6 +140,19 @@ public class ActionCore extends PartnerCore<ActionCore> {
|
|||||||
platformExecutor.execute(() -> adjustNotMatchPassed(notMatchPassed, inputVector, input, vectorClient));
|
platformExecutor.execute(() -> adjustNotMatchPassed(notMatchPassed, inputVector, input, vectorClient));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@CapabilityMethod
|
||||||
|
public ExecutorService getExecutor(ExecutorType type) {
|
||||||
|
return switch (type) {
|
||||||
|
case VIRTUAL -> virtualExecutor;
|
||||||
|
case PLATFORM -> platformExecutor;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@CapabilityMethod
|
||||||
|
public Set<String> getExistedMetaActions() {
|
||||||
|
return existedMetaActions.keySet();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 命中缓存且评估通过时
|
* 命中缓存且评估通过时
|
||||||
*
|
*
|
||||||
@@ -249,168 +247,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
|
|||||||
return "action-core";
|
return "action-core";
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
public enum ExecutorType {
|
||||||
private class ActionWatchService {
|
VIRTUAL, PLATFORM
|
||||||
|
|
||||||
private HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
|
|
||||||
|
|
||||||
public void launch() {
|
|
||||||
Path path = Path.of(ACTION_PROGRAM);
|
|
||||||
scanActions(path.toFile());
|
|
||||||
launchActionDirectoryWatcher(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void launchActionDirectoryWatcher(Path path) {
|
|
||||||
WatchService watchService;
|
|
||||||
try {
|
|
||||||
watchService = FileSystems.getDefault().newWatchService();
|
|
||||||
setupShutdownHook(watchService);
|
|
||||||
registerParentToWatch(path, watchService);
|
|
||||||
registerSubToWatch(path, watchService);
|
|
||||||
virtualExecutor.execute(registerWatchTask(path, watchService));
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ActionInitFailedException("行动程序目录监听器启动失败", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupShutdownHook(WatchService watchService) {
|
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
||||||
try {
|
|
||||||
watchService.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Runnable registerWatchTask(Path path, WatchService watchService) {
|
|
||||||
return () -> {
|
|
||||||
log.info("[{}] 行动程序目录监听器已启动", getCoreKey());
|
|
||||||
while (true) {
|
|
||||||
WatchKey key;
|
|
||||||
try {
|
|
||||||
key = watchService.take();
|
|
||||||
List<WatchEvent<?>> events = key.pollEvents();
|
|
||||||
for (WatchEvent<?> e : events) {
|
|
||||||
WatchEvent<Path> event = (WatchEvent<Path>) e;
|
|
||||||
WatchEvent.Kind<Path> kind = event.kind();
|
|
||||||
Path context = event.context();
|
|
||||||
log.info("[{}] 行动程序目录变更事件: {} - {}", getCoreKey(), kind.name(), context.toString());
|
|
||||||
Path thisDir = (Path) key.watchable();
|
|
||||||
//根据事件发生的目录进行分流,分为父目录事件和子程序事件
|
|
||||||
if (thisDir.equals(path)) {
|
|
||||||
handleParentDirEvent(kind, thisDir, context, watchService);
|
|
||||||
} else {
|
|
||||||
handleSubDirEvent(kind, thisDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.info("监听线程被中断,准备退出...");
|
|
||||||
Thread.currentThread().interrupt(); // 恢复中断标志
|
|
||||||
break;
|
|
||||||
} catch (ClosedWatchServiceException e) {
|
|
||||||
log.info("WatchService 已关闭,监听线程退出。");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleSubDirEvent(WatchEvent.Kind<Path> kind, Path thisDir) {
|
|
||||||
// path为触发本次行动的文件的路径(当前位于某个action目录下)
|
|
||||||
// 先判定发生的目录前缀是否匹配(action、desc),否则忽略
|
|
||||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
|
||||||
// CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。
|
|
||||||
boolean complete = checkComplete(thisDir);
|
|
||||||
if (!complete) return;
|
|
||||||
try {
|
|
||||||
MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile());
|
|
||||||
existedMetaActions.put(thisDir.toString(), newActionInfo);
|
|
||||||
} catch (ActionLoadFailedException e) {
|
|
||||||
log.warn("[{}] 行动信息重新加载失败,触发行为: {}", getCoreKey(), kind.name());
|
|
||||||
}
|
|
||||||
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
|
||||||
// DELETE 事件将会把该 MetaActionInfo 从记录中移除
|
|
||||||
existedMetaActions.remove(thisDir.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean checkComplete(Path thisDir) {
|
|
||||||
File[] files = thisDir.toFile().listFiles();
|
|
||||||
if (files == null) {
|
|
||||||
log.error("[{}]当前目录无法访问: [{}]", getCoreKey(), thisDir);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
boolean existedAction = false;
|
|
||||||
boolean existedDesc = false;
|
|
||||||
for (File file : files) {
|
|
||||||
String fileName = file.getName();
|
|
||||||
String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.'));
|
|
||||||
if (nameWithoutExt.equals("action")) existedAction = true;
|
|
||||||
else if (nameWithoutExt.equals("desc")) existedDesc = true;
|
|
||||||
}
|
|
||||||
return existedAction && existedDesc;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleParentDirEvent(WatchEvent.Kind<Path> kind, Path thisDir, Path context, WatchService watchService) {
|
|
||||||
Path path = Path.of(thisDir.toString(), context.toString());
|
|
||||||
// MODIFY 事件不进行处理
|
|
||||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
|
|
||||||
try {
|
|
||||||
path.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("[{}] 新增行动程序目录监听失败: {}", getCoreKey(), path, e);
|
|
||||||
}
|
|
||||||
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
|
||||||
WatchKey remove = registeredPaths.remove(path);
|
|
||||||
remove.cancel();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerSubToWatch(Path path, WatchService watchService) throws IOException {
|
|
||||||
Files.walkFileTree(path, new SimpleFileVisitor<>() {
|
|
||||||
@Override
|
|
||||||
public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException {
|
|
||||||
if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE;
|
|
||||||
WatchKey key = dir.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
registeredPaths.put(dir, key);
|
|
||||||
return FileVisitResult.CONTINUE;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerParentToWatch(Path path, WatchService watchService) throws IOException {
|
|
||||||
WatchKey key = path.register(watchService,
|
|
||||||
StandardWatchEventKinds.ENTRY_CREATE,
|
|
||||||
StandardWatchEventKinds.ENTRY_DELETE,
|
|
||||||
StandardWatchEventKinds.ENTRY_MODIFY);
|
|
||||||
registeredPaths.put(path, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void scanActions(File file) {
|
|
||||||
if (!file.exists() || file.isFile()) {
|
|
||||||
throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath());
|
|
||||||
}
|
|
||||||
File[] files = file.listFiles();
|
|
||||||
if (files == null) {
|
|
||||||
throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath());
|
|
||||||
}
|
|
||||||
for (File f : files) {
|
|
||||||
try {
|
|
||||||
MetaActionInfo actionInfo = new MetaActionInfo(f);
|
|
||||||
existedMetaActions.put(f.getName(), actionInfo);
|
|
||||||
log.info("[{}] 行动程序[{}]已加载", getCoreKey(), actionInfo.getKey());
|
|
||||||
} catch (ActionLoadFailedException e) {
|
|
||||||
log.warn("[{}] 行动程序未加载: {}", getCoreKey(), e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,191 @@
|
|||||||
|
package work.slhaf.partner.core.action;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaActionInfo;
|
||||||
|
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
|
||||||
|
import work.slhaf.partner.core.action.exception.ActionLoadFailedException;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.*;
|
||||||
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Slf4j
|
||||||
|
class ActionWatchService {
|
||||||
|
|
||||||
|
private HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
|
||||||
|
private LinkedHashMap<String, MetaActionInfo> existedMetaActions;
|
||||||
|
private ExecutorService virtualExecutor;
|
||||||
|
|
||||||
|
public ActionWatchService(LinkedHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService virtualExecutor) {
|
||||||
|
this.existedMetaActions = existedMetaActions;
|
||||||
|
this.virtualExecutor = virtualExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void launch() {
|
||||||
|
Path path = Path.of(ACTION_PROGRAM);
|
||||||
|
scanActions(path.toFile());
|
||||||
|
launchActionDirectoryWatcher(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void launchActionDirectoryWatcher(Path path) {
|
||||||
|
WatchService watchService;
|
||||||
|
try {
|
||||||
|
watchService = FileSystems.getDefault().newWatchService();
|
||||||
|
setupShutdownHook(watchService);
|
||||||
|
registerParentToWatch(path, watchService);
|
||||||
|
registerSubToWatch(path, watchService);
|
||||||
|
virtualExecutor.execute(registerWatchTask(path, watchService));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ActionInitFailedException("行动程序目录监听器启动失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupShutdownHook(WatchService watchService) {
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||||
|
try {
|
||||||
|
watchService.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Runnable registerWatchTask(Path path, WatchService watchService) {
|
||||||
|
return () -> {
|
||||||
|
log.info("行动程序目录监听器已启动");
|
||||||
|
while (true) {
|
||||||
|
WatchKey key;
|
||||||
|
try {
|
||||||
|
key = watchService.take();
|
||||||
|
List<WatchEvent<?>> events = key.pollEvents();
|
||||||
|
for (WatchEvent<?> e : events) {
|
||||||
|
WatchEvent<Path> event = (WatchEvent<Path>) e;
|
||||||
|
WatchEvent.Kind<Path> kind = event.kind();
|
||||||
|
Path context = event.context();
|
||||||
|
log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString());
|
||||||
|
Path thisDir = (Path) key.watchable();
|
||||||
|
//根据事件发生的目录进行分流,分为父目录事件和子程序事件
|
||||||
|
if (thisDir.equals(path)) {
|
||||||
|
handleParentDirEvent(kind, thisDir, context, watchService);
|
||||||
|
} else {
|
||||||
|
handleSubDirEvent(kind, thisDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("监听线程被中断,准备退出...");
|
||||||
|
Thread.currentThread().interrupt(); // 恢复中断标志
|
||||||
|
break;
|
||||||
|
} catch (ClosedWatchServiceException e) {
|
||||||
|
log.info("WatchService 已关闭,监听线程退出。");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleSubDirEvent(WatchEvent.Kind<Path> kind, Path thisDir) {
|
||||||
|
// path为触发本次行动的文件的路径(当前位于某个action目录下)
|
||||||
|
// 先判定发生的目录前缀是否匹配(action、desc),否则忽略
|
||||||
|
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
|
||||||
|
// CREATE、MODIFY 事件将触发一次检测,看当前thisDir中action和desc是否都具备,如果通过则尝试加载(put)。
|
||||||
|
boolean complete = checkComplete(thisDir);
|
||||||
|
if (!complete) return;
|
||||||
|
try {
|
||||||
|
MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile());
|
||||||
|
existedMetaActions.put(thisDir.toString(), newActionInfo);
|
||||||
|
} catch (ActionLoadFailedException e) {
|
||||||
|
log.warn("行动信息重新加载失败,触发行为: {}", kind.name());
|
||||||
|
}
|
||||||
|
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
||||||
|
// DELETE 事件将会把该 MetaActionInfo 从记录中移除
|
||||||
|
existedMetaActions.remove(thisDir.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkComplete(Path thisDir) {
|
||||||
|
File[] files = thisDir.toFile().listFiles();
|
||||||
|
if (files == null) {
|
||||||
|
log.error("[{}]当前目录无法访问: [{}]", thisDir);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
boolean existedAction = false;
|
||||||
|
boolean existedDesc = false;
|
||||||
|
for (File file : files) {
|
||||||
|
String fileName = file.getName();
|
||||||
|
String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.'));
|
||||||
|
if (nameWithoutExt.equals("action")) existedAction = true;
|
||||||
|
else if (nameWithoutExt.equals("desc")) existedDesc = true;
|
||||||
|
}
|
||||||
|
return existedAction && existedDesc;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleParentDirEvent(WatchEvent.Kind<Path> kind, Path thisDir, Path context, WatchService watchService) {
|
||||||
|
Path path = Path.of(thisDir.toString(), context.toString());
|
||||||
|
// MODIFY 事件不进行处理
|
||||||
|
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
|
||||||
|
try {
|
||||||
|
path.register(watchService,
|
||||||
|
StandardWatchEventKinds.ENTRY_CREATE,
|
||||||
|
StandardWatchEventKinds.ENTRY_DELETE,
|
||||||
|
StandardWatchEventKinds.ENTRY_MODIFY);
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("新增行动程序目录监听失败: {}", path, e);
|
||||||
|
}
|
||||||
|
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
|
||||||
|
WatchKey remove = registeredPaths.remove(path);
|
||||||
|
remove.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerSubToWatch(Path path, WatchService watchService) throws IOException {
|
||||||
|
Files.walkFileTree(path, new SimpleFileVisitor<>() {
|
||||||
|
@Override
|
||||||
|
public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException {
|
||||||
|
if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE;
|
||||||
|
WatchKey key = dir.register(watchService,
|
||||||
|
StandardWatchEventKinds.ENTRY_CREATE,
|
||||||
|
StandardWatchEventKinds.ENTRY_DELETE,
|
||||||
|
StandardWatchEventKinds.ENTRY_MODIFY);
|
||||||
|
registeredPaths.put(dir, key);
|
||||||
|
return FileVisitResult.CONTINUE;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void registerParentToWatch(Path path, WatchService watchService) throws IOException {
|
||||||
|
WatchKey key = path.register(watchService,
|
||||||
|
StandardWatchEventKinds.ENTRY_CREATE,
|
||||||
|
StandardWatchEventKinds.ENTRY_DELETE,
|
||||||
|
StandardWatchEventKinds.ENTRY_MODIFY);
|
||||||
|
registeredPaths.put(path, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void scanActions(File file) {
|
||||||
|
if (!file.exists() || file.isFile()) {
|
||||||
|
throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath());
|
||||||
|
}
|
||||||
|
File[] files = file.listFiles();
|
||||||
|
if (files == null) {
|
||||||
|
throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath());
|
||||||
|
}
|
||||||
|
for (File f : files) {
|
||||||
|
try {
|
||||||
|
MetaActionInfo actionInfo = new MetaActionInfo(f);
|
||||||
|
existedMetaActions.put(f.getName(), actionInfo);
|
||||||
|
log.info("行动程序[{}]已加载", actionInfo.getKey());
|
||||||
|
} catch (ActionLoadFailedException e) {
|
||||||
|
log.warn("行动程序未加载: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,7 +13,7 @@ public abstract class ActionData {
|
|||||||
protected String tendency;
|
protected String tendency;
|
||||||
protected ActionStatus status;
|
protected ActionStatus status;
|
||||||
protected List<MetaAction> actionChain;
|
protected List<MetaAction> actionChain;
|
||||||
protected String Result;
|
protected String result;
|
||||||
protected String reason;
|
protected String reason;
|
||||||
protected String description;
|
protected String description;
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import work.slhaf.partner.common.Constant;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
||||||
|
|
||||||
@@ -14,7 +13,7 @@ import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
|
|||||||
* 行动链中的单一元素,实现{@link Runnable}接口,封装了调用外部行动程序的必要信息,可被执行
|
* 行动链中的单一元素,实现{@link Runnable}接口,封装了调用外部行动程序的必要信息,可被执行
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class MetaAction implements Comparable<MetaAction>, Callable<Void> {
|
public class MetaAction implements Comparable<MetaAction>, Runnable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 行动key,用于标识与定位行动程序
|
* 行动key,用于标识与定位行动程序
|
||||||
@@ -47,13 +46,12 @@ public class MetaAction implements Comparable<MetaAction>, Callable<Void> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() {
|
public void run() {
|
||||||
File action = loadFromFile();
|
File action = loadFromFile();
|
||||||
if (!action.exists()) {
|
if (!action.exists()) {
|
||||||
result = new Result();
|
result = new Result();
|
||||||
result.setSuccess(false);
|
result.setSuccess(false);
|
||||||
result.setData("Action file not found: " + action.getAbsolutePath());
|
result.setData("Action file not found: " + action.getAbsolutePath());
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
@@ -66,7 +64,6 @@ public class MetaAction implements Comparable<MetaAction>, Callable<Void> {
|
|||||||
result.setSuccess(false);
|
result.setSuccess(false);
|
||||||
result.setData(e.getMessage());
|
result.setData(e.getMessage());
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private File loadFromFile() {
|
private File loadFromFile() {
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ import java.util.concurrent.locks.Lock;
|
|||||||
public interface CognationCapability {
|
public interface CognationCapability {
|
||||||
|
|
||||||
List<Message> getChatMessages();
|
List<Message> getChatMessages();
|
||||||
void setChatMessages(List<Message> chatMessages);
|
|
||||||
void cleanMessage(List<Message> messages);
|
void cleanMessage(List<Message> messages);
|
||||||
Lock getMessageLock();
|
Lock getMessageLock();
|
||||||
void addMetaMessage(String userId, MetaMessage metaMessage);
|
void addMetaMessage(String userId, MetaMessage metaMessage);
|
||||||
|
|||||||
@@ -63,11 +63,6 @@ public class CognationCore extends PartnerCore<CognationCore> {
|
|||||||
return currentMemoryId;
|
return currentMemoryId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@CapabilityMethod
|
|
||||||
public void setChatMessages(List<Message> chatMessages) {
|
|
||||||
this.chatMessages = chatMessages;
|
|
||||||
}
|
|
||||||
|
|
||||||
@CapabilityMethod
|
@CapabilityMethod
|
||||||
public void cleanMessage(List<Message> messages) {
|
public void cleanMessage(List<Message> messages) {
|
||||||
messageLock.lock();
|
messageLock.lock();
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ public class ActionDispatcher extends PostRunningModule {
|
|||||||
//对于将触发的PLANNING action,理想做法是将执行工具做成执行链的形式,模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力,避免绑定固定流程
|
//对于将触发的PLANNING action,理想做法是将执行工具做成执行链的形式,模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力,避免绑定固定流程
|
||||||
executor.execute(() -> {
|
executor.execute(() -> {
|
||||||
String userId = context.getUserId();
|
String userId = context.getUserId();
|
||||||
List<ActionData> preparedActions = actionCapability.popPreparedAction(userId);
|
List<ActionData> preparedActions = actionCapability.listPreparedAction(userId);
|
||||||
//分类成PLANNING和IMMEDIATE两类
|
//分类成PLANNING和IMMEDIATE两类
|
||||||
List<ScheduledActionData> scheduledActions = new ArrayList<>();
|
List<ScheduledActionData> scheduledActions = new ArrayList<>();
|
||||||
List<ImmediateActionData> immediateActions = new ArrayList<>();
|
List<ImmediateActionData> immediateActions = new ArrayList<>();
|
||||||
|
|||||||
@@ -1,22 +1,120 @@
|
|||||||
package work.slhaf.partner.module.modules.action.dispatcher.executor;
|
package work.slhaf.partner.module.modules.action.dispatcher.executor;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
|
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
|
||||||
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
|
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
|
||||||
|
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
|
||||||
|
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
|
||||||
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
|
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
|
||||||
import work.slhaf.partner.core.action.ActionCapability;
|
import work.slhaf.partner.core.action.ActionCapability;
|
||||||
|
import work.slhaf.partner.core.action.ActionCore;
|
||||||
|
import work.slhaf.partner.core.action.entity.ActionData;
|
||||||
import work.slhaf.partner.core.action.entity.ImmediateActionData;
|
import work.slhaf.partner.core.action.entity.ImmediateActionData;
|
||||||
|
import work.slhaf.partner.core.action.entity.MetaAction;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
@AgentSubModule
|
@AgentSubModule
|
||||||
public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionData>, Void> {
|
public class ActionExecutor extends AgentRunningSubModule<List<ImmediateActionData>, Void> implements ActivateModel {
|
||||||
|
|
||||||
@InjectCapability
|
@InjectCapability
|
||||||
private ActionCapability actionCapability;
|
private ActionCapability actionCapability;
|
||||||
|
|
||||||
|
private ExecutorService virtualExecutor;
|
||||||
|
private ExecutorService platformExecutor;
|
||||||
|
|
||||||
|
@Init
|
||||||
|
public void init() {
|
||||||
|
virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
|
||||||
|
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void execute(List<ImmediateActionData> immediateActions) {
|
public Void execute(List<ImmediateActionData> immediateActions) {
|
||||||
|
for (ImmediateActionData actionData : immediateActions) {
|
||||||
|
handleActionData(actionData);
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleActionData(ImmediateActionData actionData) {
|
||||||
|
virtualExecutor.execute(() -> {
|
||||||
|
actionData.setStatus(ActionData.ActionStatus.EXECUTING);
|
||||||
|
List<MetaAction> actionChain = actionData.getActionChain();
|
||||||
|
actionChain.sort(MetaAction::compareTo);
|
||||||
|
List<MetaAction> virtual = new ArrayList<>();
|
||||||
|
List<MetaAction> platform = new ArrayList<>();
|
||||||
|
int order;
|
||||||
|
for (int index = 0; index < actionChain.size(); index++) {
|
||||||
|
MetaAction metaAction = actionChain.get(index);
|
||||||
|
// 根据io类型放入合适的列表
|
||||||
|
if (metaAction.isIo()) {
|
||||||
|
virtual.add(metaAction);
|
||||||
|
} else {
|
||||||
|
platform.add(metaAction);
|
||||||
|
}
|
||||||
|
// 记录当前order
|
||||||
|
order = metaAction.getOrder();
|
||||||
|
// 如果下一个行动单元的order与当前不同,则执行并清空当前组内容
|
||||||
|
if (actionChain.size() <= (index + 1) || actionChain.get(index + 1).getOrder() != order) {
|
||||||
|
runGroupAction(virtual, platform, actionChain);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO 考虑是否使用phaser来承担同组的动态任务新增
|
||||||
|
private void runGroupAction(List<MetaAction> virtual, List<MetaAction> platform, List<MetaAction> actionChain) {
|
||||||
|
boolean first = true;
|
||||||
|
do {
|
||||||
|
CountDownLatch latch = new CountDownLatch(virtual.size() + platform.size());
|
||||||
|
runGroupAction(virtual, virtualExecutor, actionChain, latch, first);
|
||||||
|
runGroupAction(platform, platformExecutor, actionChain, latch, first);
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("[{}] CountDownLatch被中断", modelKey());
|
||||||
|
}
|
||||||
|
first = false;
|
||||||
|
} while (!virtual.isEmpty() || !platform.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runGroupAction(List<MetaAction> actions, ExecutorService executor, List<MetaAction> actionChain, CountDownLatch latch, boolean first) {
|
||||||
|
if (!first && !new HashSet<>(actionChain).containsAll(actions)) {
|
||||||
|
// 该部分对应LLM新增本组执行单元时,将其添加至actionChain记录。对于后续组级别的新增,将直接在上一级调用处体现,除了注意并发安全外无需额外处理
|
||||||
|
int index = actionChain.indexOf(actions.getLast());
|
||||||
|
actionChain.addAll(index, actions);
|
||||||
|
}
|
||||||
|
for (MetaAction action : actions) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
boolean success = true;
|
||||||
|
do {
|
||||||
|
// 该循环对应LLM的调整参数后重试
|
||||||
|
if (!success) {
|
||||||
|
//TODO LLM决策是重构参数、执行自对话反思、还是选择向用户求助(通过cognationCore暴露方法,可能需要修改其他模块以进行适应)
|
||||||
|
|
||||||
|
}
|
||||||
|
action.run();
|
||||||
|
success = action.getResult().isSuccess();
|
||||||
|
} while (!success);
|
||||||
|
latch.countDown();
|
||||||
|
//TODO 将执行结果写入特定对话角色记忆(cognationCore暴露方法)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String modelKey() {
|
||||||
|
return "action_executor";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean withBasicPrompt() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user