推进 ActionExecutor 下的‘行动生成与执行’部分

- 新增 RunnerClient 抽象类,并划分 SandboxRunnerClient、LocalRunnerClient两个子类(内容待完善)。前者负责对接 SandboxRunner 模块,后者直接使用本地作为执行环境(但不推荐)。
- 将 ActionWatchService 划为 LocalRunnerClient 的内部类,负责采用本地执行环境时,监听行动程序变化
- 完善 ActionRepairer 处的修复逻辑
- 调整 MetaAction 中路径获取逻辑

这提交方式真该调整一下了,这阶段推进容易攒太多,但又不好停手。或许阶段目标可以保留,但推进点应该可以细化🤔
This commit is contained in:
2025-12-15 21:54:24 +08:00
parent 6e3deced77
commit 4b852e0049
18 changed files with 652 additions and 296 deletions

View File

@@ -6,6 +6,7 @@ public final class Constant {
public static final String DATA = "./data";
public static final String MEMORY_DATA = DATA + "/memory";
public static final String ACTION_PROGRAM = DATA + "/action";
public static final String TMP_ACTION_DIR_LOCAL = DATA + "/tmp";
}
}

View File

@@ -7,6 +7,7 @@ import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -46,5 +47,5 @@ public interface ActionCapability {
boolean checkExists(String... actionKeys);
void execute(MetaAction metaAction);
SandboxRunnerClient runnerClient();
}

View File

@@ -16,6 +16,8 @@ import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.action.exception.ActionDataNotFoundException;
import work.slhaf.partner.core.action.exception.MetaActionNotFoundException;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import java.io.IOException;
import java.util.*;
@@ -57,10 +59,11 @@ public class ActionCore extends PartnerCore<ActionCore> {
*/
private final Map<String, MetaActionInfo> existedMetaActions = new HashMap<>();
private final List<PhaserRecord> phaserRecords = new ArrayList<>();
private final SandboxRunnerClient sandboxRunnerClient = new SandboxRunnerClient();
private RunnerClient runnerClient;
public ActionCore() throws IOException, ClassNotFoundException {
new ActionWatchService(existedMetaActions, virtualExecutor).launch();
// TODO 通过 AgentConfigManager指定
runnerClient = new SandboxRunnerClient(existedMetaActions, virtualExecutor);
setupShutdownHook();
}
@@ -248,8 +251,8 @@ public class ActionCore extends PartnerCore<ActionCore> {
}
@CapabilityMethod
public void execute(MetaAction metaAction) {
sandboxRunnerClient.run(metaAction);
public RunnerClient runnerClient() {
return runnerClient;
}
/**

View File

@@ -1,192 +0,0 @@
package work.slhaf.partner.core.action;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
import work.slhaf.partner.core.action.exception.ActionLoadFailedException;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
// TODO 后续需迁移至 SandboxRunner作为容器内的监听逻辑
@SuppressWarnings("unchecked")
@Slf4j
class ActionWatchService {
private final HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
private final Map<String, MetaActionInfo> existedMetaActions;
private final ExecutorService virtualExecutor;
public ActionWatchService(Map<String, MetaActionInfo> existedMetaActions, ExecutorService virtualExecutor) {
this.existedMetaActions = existedMetaActions;
this.virtualExecutor = virtualExecutor;
}
public void launch() {
Path path = Path.of(ACTION_PROGRAM);
scanActions(path.toFile());
launchActionDirectoryWatcher(path);
}
private void launchActionDirectoryWatcher(Path path) {
WatchService watchService;
try {
watchService = FileSystems.getDefault().newWatchService();
setupShutdownHook(watchService);
registerParentToWatch(path, watchService);
registerSubToWatch(path, watchService);
virtualExecutor.execute(registerWatchTask(path, watchService));
} catch (IOException e) {
throw new ActionInitFailedException("行动程序目录监听器启动失败", e);
}
}
private void setupShutdownHook(WatchService watchService) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
watchService.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}
private Runnable registerWatchTask(Path path, WatchService watchService) {
return () -> {
log.info("行动程序目录监听器已启动");
while (true) {
WatchKey key;
try {
key = watchService.take();
List<WatchEvent<?>> events = key.pollEvents();
for (WatchEvent<?> e : events) {
WatchEvent<Path> event = (WatchEvent<Path>) e;
WatchEvent.Kind<Path> kind = event.kind();
Path context = event.context();
log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString());
Path thisDir = (Path) key.watchable();
//根据事件发生的目录进行分流,分为父目录事件和子程序事件
if (thisDir.equals(path)) {
handleParentDirEvent(kind, thisDir, context, watchService);
} else {
handleSubDirEvent(kind, thisDir);
}
}
} catch (InterruptedException e) {
log.info("监听线程被中断,准备退出...");
Thread.currentThread().interrupt(); // 恢复中断标志
break;
} catch (ClosedWatchServiceException e) {
log.info("WatchService 已关闭,监听线程退出。");
break;
}
}
};
}
private void handleSubDirEvent(WatchEvent.Kind<Path> kind, Path thisDir) {
// path为触发本次行动的文件的路径(当前位于某个action目录下)
// 先判定发生的目录前缀是否匹配(action、desc),否则忽略
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
// CREATE、MODIFY 事件将触发一次检测看当前thisDir中action和desc是否都具备如果通过则尝试加载(put)。
boolean complete = checkComplete(thisDir);
if (!complete) return;
try {
MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile());
existedMetaActions.put(thisDir.toString(), newActionInfo);
} catch (ActionLoadFailedException e) {
log.warn("行动信息重新加载失败,触发行为: {}", kind.name());
}
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
// DELETE 事件将会把该 MetaActionInfo 从记录中移除
existedMetaActions.remove(thisDir.toString());
}
}
private boolean checkComplete(Path thisDir) {
File[] files = thisDir.toFile().listFiles();
if (files == null) {
log.error("当前目录无法访问: [{}]", thisDir);
return false;
}
boolean existedAction = false;
boolean existedDesc = false;
for (File file : files) {
String fileName = file.getName();
String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.'));
if (nameWithoutExt.equals("action")) existedAction = true;
else if (nameWithoutExt.equals("desc")) existedDesc = true;
}
return existedAction && existedDesc;
}
private void handleParentDirEvent(WatchEvent.Kind<Path> kind, Path thisDir, Path context, WatchService watchService) {
Path path = Path.of(thisDir.toString(), context.toString());
// MODIFY 事件不进行处理
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
try {
path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
} catch (IOException e) {
log.error("新增行动程序目录监听失败: {}", path, e);
}
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
WatchKey remove = registeredPaths.remove(path);
remove.cancel();
}
}
private void registerSubToWatch(Path path, WatchService watchService) throws IOException {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs) throws IOException {
if (dir.getFileName().startsWith(".")) return FileVisitResult.CONTINUE;
WatchKey key = dir.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
registeredPaths.put(dir, key);
return FileVisitResult.CONTINUE;
}
});
}
private void registerParentToWatch(Path path, WatchService watchService) throws IOException {
WatchKey key = path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
registeredPaths.put(path, key);
}
private void scanActions(File file) {
if (!file.exists() || file.isFile()) {
throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath());
}
File[] files = file.listFiles();
if (files == null) {
throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath());
}
for (File f : files) {
try {
MetaActionInfo actionInfo = new MetaActionInfo(f);
existedMetaActions.put(f.getName(), actionInfo);
log.info("行动程序[{}]已加载", actionInfo.getKey());
} catch (ActionLoadFailedException e) {
log.warn("行动程序未加载: {}", e.getMessage());
}
}
}
}

View File

@@ -1,36 +0,0 @@
package work.slhaf.partner.core.action;
import work.slhaf.partner.core.action.entity.MetaAction;
import java.nio.file.Path;
/**
* 基于 Http 与 WebSocket 的沙盒执行器客户端,负责:
* <ul>
* <li>
* 发送行动单元数据
* </li>
* <li>
* 实时更新获取已存在行动列表
* </li>
* <li>
* 向传入的 MetaAction 回写执行结果
* </li>
* </ul>
*/
class SandboxRunnerClient {
public SandboxRunnerClient() {
// 连接沙盒执行器(websocket)
}
public void run(MetaAction metaAction) {
// 获取已存在行动列表
Path path = metaAction.checkAndGetPath();
if (!metaAction.getResult().isSuccess()) {
return;
}
// 调用沙盒执行器
}
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.partner.core.action.entity;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import java.util.List;
@Data
public class GeneratedData {
private List<String> dependencies;
private String code;
private String codeType;
private boolean serialize;
private JSONObject responseSchema;
}

View File

@@ -1,14 +1,12 @@
package work.slhaf.partner.core.action.entity;
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
import lombok.Data;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.nio.file.Path;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import lombok.Data;
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
/**
* 行动链中的单一元素,封装了调用外部行动程序的必要信息与结果容器,可被{@link work.slhaf.partner.core.action.ActionCapability}执行
@@ -27,7 +25,7 @@ public class MetaAction implements Comparable<MetaAction> {
/**
* 行动结果,包括执行状态和相应内容(执行结果或者错误信息)
*/
private final Result result = new Result();
private Result result = new Result();
/**
* 执行顺序,升序排列
*/
@@ -44,18 +42,12 @@ public class MetaAction implements Comparable<MetaAction> {
private Path path;
public Path checkAndGetPath() {
public void resetPath() {
path = switch (type) {
case PLUGIN -> Path.of(ACTION_PROGRAM, key, "action.jar");
case SCRIPT -> Path.of(ACTION_PROGRAM, key, "action.py");
case MCP -> Path.of(ACTION_PROGRAM, key, "action.json");
};
File action = path.toFile();
if (!action.exists()) {
result.setStatus(ResultStatus.FAILED);
result.setData("Action file not found: " + action.getAbsolutePath());
}
return path;
}
@Override

View File

@@ -0,0 +1,320 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.MetaActionType;
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
import work.slhaf.partner.core.action.exception.ActionLoadFailedException;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static work.slhaf.partner.common.Constant.Path.ACTION_PROGRAM;
import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL;
@Slf4j
public class LocalRunnerClient extends RunnerClient {
public LocalRunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor) {
super(existedMetaActions, executor);
ActionWatchService watchService = new ActionWatchService();
watchService.launch();
}
@Override
protected RunnerResponse doRun(MetaAction metaAction) {
RunnerResponse response;
try {
// 由于三种方式返回的内容结构变化太大,所以选择油具体执行逻辑返回真正的 Response 对象
response = switch (metaAction.getType()) {
case MetaActionType.SCRIPT -> doRunWithScript(metaAction);
case MetaActionType.MCP -> doRunWithMcp(metaAction);
case MetaActionType.PLUGIN -> doRunWithPlugin(metaAction);
};
} catch (Exception e) {
response = new RunnerResponse();
response.setOk(false);
response.setData(e.getLocalizedMessage());
}
return response;
}
private RunnerResponse doRunWithMcp(MetaAction metaAction) {
RunnerResponse response = new RunnerResponse();
return response;
}
private RunnerResponse doRunWithPlugin(MetaAction metaAction) {
RunnerResponse response = new RunnerResponse();
return response;
}
private RunnerResponse doRunWithScript(MetaAction metaAction) {
RunnerResponse response = new RunnerResponse();
return response;
}
@Override
protected Path doBuildTempPath(MetaAction tempAction, String codeType) {
return Path.of(TMP_ACTION_DIR_LOCAL, System.currentTimeMillis() + "-" + tempAction.getKey() + codeType);
}
@Override
protected void doSerialize(MetaAction tempAction, String code, String codeType) throws IOException {
Path path = tempAction.getPath();
File file = path.toFile();
file.createNewFile();
Files.writeString(path, code);
}
@Override
public JSONObject listSysDependencies() {
// 先只列出系统/环境的 Python 依赖
// TODO 在 AgentConfigManager 内配置启用的脚本语言及对应的扩展名
// 这里的逻辑后续需要替换为“根据 AgentConfigManager 读取到的脚本语言启用情况,遍历并列出当前系统环境依赖”
// 还需要将返回值调整为相应的数据类
// 后续还需要将不同语言的处理逻辑分散到不同方法内,这里为了验证,先写死在当前方法
JSONObject sysDependencies = new JSONObject();
sysDependencies.put("language", "Python");
JSONArray dependencies = sysDependencies.putArray("dependencies");
SystemExecResult pyResult = exec("pip", "li", "--format=feeze");
System.out.println(pyResult);
if (pyResult.isOk()) {
List<String> resultList = pyResult.getResultList();
for (String result : resultList) {
JSONObject element = dependencies.addObject();
String[] split = result.split("==");
element.put("name", split[0]);
element.put("version", split[1]);
}
} else {
JSONObject element = dependencies.addObject();
element.put("error", pyResult.getTotal());
}
return sysDependencies;
}
private SystemExecResult exec(String... command) {
SystemExecResult result = new SystemExecResult();
List<String> resultList = new ArrayList<>();
result.setResultList(resultList);
StringBuilder s = new StringBuilder();
try {
Process process = Runtime.getRuntime().exec(command);
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
s.append(line);
resultList.add(line);
}
int exitCode = process.waitFor();
result.setOk(exitCode == 0);
result.setTotal(s.toString().isEmpty() ? "响应为空" : s.toString());
} catch (Exception e) {
result.setOk(false);
result.setTotal(e.getLocalizedMessage());
}
if (result.getTotal().isEmpty()) {
result.setOk(false);
}
return result;
}
@Data
private static class SystemExecResult {
private boolean ok;
private String total;
private List<String> resultList;
}
private class ActionWatchService {
private final HashMap<Path, WatchKey> registeredPaths = new HashMap<>();
private void launch() {
Path path = Path.of(ACTION_PROGRAM);
scanActions(path.toFile());
launchActionDirectoryWatcher(path);
}
private void launchActionDirectoryWatcher(Path path) {
WatchService watchService;
try {
watchService = FileSystems.getDefault().newWatchService();
setupShutdownHook(watchService);
registerParentToWatch(path, watchService);
registerSubToWatch(path, watchService);
executor.execute(registerWatchTask(path, watchService));
} catch (IOException e) {
throw new ActionInitFailedException("行动程序目录监听器启动失败", e);
}
}
private void setupShutdownHook(WatchService watchService) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
watchService.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}
private Runnable registerWatchTask(Path path, WatchService watchService) {
return () -> {
log.info("行动程序目录监听器已启动");
while (true) {
WatchKey key;
try {
key = watchService.take();
List<WatchEvent<?>> events = key.pollEvents();
for (WatchEvent<?> e : events) {
@SuppressWarnings("unchecked")
WatchEvent<Path> event = (WatchEvent<Path>) e;
WatchEvent.Kind<Path> kind = event.kind();
Path context = event.context();
log.info("行动程序目录变更事件: {} - {}", kind.name(), context.toString());
Path thisDir = (Path) key.watchable();
// 根据事件发生的目录进行分流,分为父目录事件和子程序事件
if (thisDir.equals(path)) {
handleParentDirEvent(kind, thisDir, context, watchService);
} else {
handleSubDirEvent(kind, thisDir);
}
}
} catch (InterruptedException e) {
log.info("监听线程被中断,准备退出...");
Thread.currentThread().interrupt(); // 恢复中断标志
break;
} catch (ClosedWatchServiceException e) {
log.info("WatchService 已关闭,监听线程退出。");
break;
}
}
};
}
private void handleSubDirEvent(WatchEvent.Kind<Path> kind, Path thisDir) {
// path为触发本次行动的文件的路径(当前位于某个action目录下)
// 先判定发生的目录前缀是否匹配(action、desc),否则忽略
if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
// CREATE、MODIFY 事件将触发一次检测看当前thisDir中action和desc是否都具备如果通过则尝试加载(put)。
boolean complete = checkComplete(thisDir);
if (!complete)
return;
try {
MetaActionInfo newActionInfo = new MetaActionInfo(thisDir.toFile());
existedMetaActions.put(thisDir.toString(), newActionInfo);
} catch (ActionLoadFailedException e) {
log.warn("行动信息重新加载失败,触发行为: {}", kind.name());
}
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
// DELETE 事件将会把该 MetaActionInfo 从记录中移除
existedMetaActions.remove(thisDir.toString());
}
}
private boolean checkComplete(Path thisDir) {
File[] files = thisDir.toFile().listFiles();
if (files == null) {
log.error("当前目录无法访问: [{}]", thisDir);
return false;
}
boolean existedAction = false;
boolean existedDesc = false;
for (File file : files) {
String fileName = file.getName();
String nameWithoutExt = fileName.substring(0, fileName.lastIndexOf('.'));
if (nameWithoutExt.equals("action"))
existedAction = true;
else if (nameWithoutExt.equals("desc"))
existedDesc = true;
}
return existedAction && existedDesc;
}
private void handleParentDirEvent(WatchEvent.Kind<Path> kind, Path thisDir, Path context,
WatchService watchService) {
Path path = Path.of(thisDir.toString(), context.toString());
// MODIFY 事件不进行处理
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
try {
path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
} catch (IOException e) {
log.error("新增行动程序目录监听失败: {}", path, e);
}
} else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
WatchKey remove = registeredPaths.remove(path);
remove.cancel();
}
}
private void registerSubToWatch(Path path, WatchService watchService) throws IOException {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public @NotNull FileVisitResult preVisitDirectory(@NotNull Path dir, @NotNull BasicFileAttributes attrs)
throws IOException {
if (dir.getFileName().startsWith("."))
return FileVisitResult.CONTINUE;
WatchKey key = dir.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
registeredPaths.put(dir, key);
return FileVisitResult.CONTINUE;
}
});
}
private void registerParentToWatch(Path path, WatchService watchService) throws IOException {
WatchKey key = path.register(watchService,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
registeredPaths.put(path, key);
}
private void scanActions(File file) {
if (!file.exists() || file.isFile()) {
throw new ActionInitFailedException("未找到行动程序目录: " + file.getAbsolutePath());
}
File[] files = file.listFiles();
if (files == null) {
throw new ActionInitFailedException("目录无法访问: " + file.getAbsolutePath());
}
for (File f : files) {
try {
MetaActionInfo actionInfo = new MetaActionInfo(f);
existedMetaActions.put(f.getName(), actionInfo);
log.info("行动程序[{}]已加载", actionInfo.getKey());
} catch (ActionLoadFailedException e) {
log.warn("行动程序加载失败", e);
}
}
}
}
}

View File

@@ -0,0 +1,108 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.core.action.entity.GeneratedData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaAction.Result;
import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@Slf4j
public abstract class RunnerClient {
protected final Map<String, MetaActionInfo> existedMetaActions;
protected final ExecutorService executor;
/**
* ActionCore 将注入虚拟线程池
*/
public RunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor) {
this.existedMetaActions = existedMetaActions;
this.executor = executor;
}
/**
* 执行行动程序
*/
public void run(MetaAction metaAction) {
// 获取已存在行动列表
Result result = metaAction.getResult();
if (!result.getStatus().equals(ResultStatus.WAITING)) {
return;
}
RunnerResponse response = doRun(metaAction);
result.setData(response.getData());
result.setStatus(response.isOk() ? ResultStatus.SUCCESS : ResultStatus.FAILED);
}
//TODO 将执行划分为 MCP、OriginalScript两种类型SCRIPT、PLUGIN、MCP的分类不再必要
protected abstract RunnerResponse doRun(MetaAction metaAction);
/**
* 将临时行动程序放入等待队列,根据其是否需要持久序列化,监听其执行状态,执行成功则持久序列化
*
* @throws IOException
*/
public Path getPathAndSerialize(MetaAction tempAction, GeneratedData generatedData) throws IOException {
String code = generatedData.getCode();
String codeType = generatedData.getCodeType();
Path path = doBuildTempPath(tempAction, codeType);
tempAction.setPath(path);
doSerialize(tempAction, code, codeType);
if (generatedData.isSerialize()) {
waitingSerialize(tempAction, code, codeType, generatedData.getResponseSchema());
}
return path;
}
private void waitingSerialize(MetaAction tempAction, String code, String codeType, JSONObject jsonObject) {
executor.execute(() -> {
Result result = tempAction.getResult();
while (true) {
switch (result.getStatus()) {
case ResultStatus.WAITING -> {
try {
Thread.sleep(300);
} catch (InterruptedException ignored) {
}
}
case ResultStatus.FAILED -> {
break;
}
case ResultStatus.SUCCESS -> {
tempAction.resetPath();
try {
doSerialize(tempAction, code, codeType);
} catch (IOException e) {
log.error("行动程序序列化出错: {}", tempAction.getKey(), e);
}
}
}
}
});
}
protected abstract Path doBuildTempPath(MetaAction tempAction, String codeType);
protected abstract void doSerialize(MetaAction tempAction, String code, String codeType) throws IOException;
/**
* 列出执行环境下的系统依赖情况
*/
public abstract JSONObject listSysDependencies();
@Data
protected static class RunnerResponse {
private boolean ok;
private String data;
}
}

View File

@@ -0,0 +1,54 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONObject;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* 基于 Http 与 WebSocket 的沙盒执行器客户端,负责:
* <ul>
* <li>
* 发送行动单元数据
* </li>
* <li>
* 实时更新获取已存在行动列表
* </li>
* <li>
* 向传入的 MetaAction 回写执行结果
* </li>
* </ul>
*/
public class SandboxRunnerClient extends RunnerClient {
@Override
protected Path doBuildTempPath(MetaAction tempAction, String codeType) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void doSerialize(MetaAction tempAction, String code, String codeType) {
// TODO Auto-generated method stub
}
public SandboxRunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor) { // 连接沙盒执行器(websocket)
super(existedMetaActions, executor);
}
public RunnerResponse doRun(MetaAction metaAction) {
// 调用沙盒执行器
return null;
}
@Override
public JSONObject listSysDependencies() {
// TODO Auto-generated method stub
return null;
}
}

View File

@@ -11,6 +11,7 @@ import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*;
@@ -43,6 +44,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
private ExecutorService virtualExecutor;
private ExecutorService platformExecutor;
private RunnerClient runnerClient;
private final AssemblyHelper assemblyHelper = new AssemblyHelper();
@@ -50,6 +52,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
public void init() {
virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
runnerClient = actionCapability.runnerClient();
}
@Override
@@ -128,7 +131,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
// 这个功能应该交给 PhaserRecord 实现,尽量确保功能一致性
setActionParams(action, phaserRecord, userId);
do {
actionCapability.execute(action);
runnerClient.run(action);
MetaAction.Result result = action.getResult();
// 该循环对应LLM的调整参数后重试
if (!result.getStatus().equals(ResultStatus.SUCCESS)) {
@@ -140,7 +143,7 @@ public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, V
} else {
break;
}
actionCapability.execute(action);
runnerClient.run(action);
} while (true);
// TODO 执行结果不再需要写入特定位置,当前的 ActionCapability
// 内部的行动池已经足以承担这个功能,但这也就意味着行动池或许需要考虑特殊的序列化形式避免内存占用过高,

View File

@@ -7,6 +7,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
@@ -14,6 +15,9 @@ import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaAction.Result;
import work.slhaf.partner.core.action.entity.MetaAction.ResultStatus;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
@@ -53,6 +57,12 @@ public class ActionRepairer extends AgentRunningSubModule<RepairerInput, Repaire
private DynamicActionGenerator dynamicActionGenerator;
private AssembleHelper assembleHelper = new AssembleHelper();
private SandboxRunnerClient runnerClient;
@Init
void init() {
runnerClient = actionCapability.runnerClient();
}
@Override
public RepairerResult execute(RepairerInput data) {
@@ -94,8 +104,21 @@ public class ActionRepairer extends AgentRunningSubModule<RepairerInput, Repaire
RepairerResult result = new RepairerResult();
GeneratorResult generatorResult = dynamicActionGenerator.execute(generatorInput);
MetaAction tempAction = generatorResult.getTempAction();
actionCapability.execute(tempAction);
result.getFixedData().add(tempAction.getResult().getData());
if (tempAction == null) {
result.setStatus(RepairerStatus.FAILED);
return result;
}
runnerClient.run(tempAction);
// 根据 tempAction 的执行状态设置修复结果
Result actionResult = tempAction.getResult();
if (actionResult.getStatus() != ResultStatus.SUCCESS) {
result.setStatus(RepairerStatus.FAILED);
return result;
}
result.setStatus(RepairerStatus.OK);
result.getFixedData().add(actionResult.getData());
return result;
}
@@ -117,7 +140,7 @@ public class ActionRepairer extends AgentRunningSubModule<RepairerInput, Repaire
executor = action.isIo() ? virtual : platform;
executor.execute(() -> {
try {
actionCapability.execute(action);
runnerClient.run(action);
result.getFixedData().add(action.getResult().getData());
} catch (Exception e) {
log.error("行动单元执行失败: {}", key, e);

View File

@@ -1,19 +1,22 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import java.util.List;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
import work.slhaf.partner.common.util.ExtractUtil;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.entity.GeneratedData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionType;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
import java.nio.file.Path;
/**
* 负责依据输入内容生成可执行的动态行动单元,并选择是否持久化至 SandboxRunner 容器内
@@ -22,27 +25,38 @@ import work.slhaf.partner.core.action.entity.MetaActionType;
public class DynamicActionGenerator extends AgentRunningSubModule<GeneratorInput, GeneratorResult>
implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
private RunnerClient runnerClient;
@Init
void init() {
runnerClient = actionCapability.runnerClient();
}
@Override
public GeneratorResult execute(GeneratorInput input) {
GeneratorResult result = new GeneratorResult();
try {
// 由于 SCRIPT 类型程序都是在 SandboxRunner 内部的磁盘上加载然后执行的,
// 所以此处的输入内容也只需要指定输入参数、临时key、是否持久化即可路径将按照指定规则统一构建不可交给LLM生成
String prompt = buildPrompt(input);
// 响应结果需要包含几个特殊数据: 依赖项、代码内容、是否序列化、响应数据释义
ChatResponse response = this.singleChat(prompt);
GeneratorResponseData generatorData = JSONObject
.parseObject(ExtractUtil.extractJson(response.getMessage()), GeneratorResponseData.class);
GeneratedData generatorData = JSONObject
.parseObject(ExtractUtil.extractJson(response.getMessage()), GeneratedData.class);
MetaAction tempAction = buildAction(input);
waitingSerialize(tempAction, generatorData);
// 将临时行动单元序列化至临时文件夹,并设置程序路径、放置在队列中,等待执行状态变化,并根据序列化选项选择是否补充 MetaActionInfo 并持久序列化
// 通过 ActionCapability 暴露的接口序列化至临时文件夹同时返回Path对象并设置。队列建议交给 SandboxRunner
// 持有,包括监听与序列化线程
Path path = runnerClient.getPathAndSerialize(tempAction, generatorData);
tempAction.setPath(path);
result.setTempAction(tempAction);
return null;
} catch (Exception e) {
result.setTempAction(null);
}
/**
* 将临时行动单元序列化至临时文件夹,并设置程序路径、放置在队列中,等待执行状态变化,并根据序列化选项选择是否补充 MetaActionInfo 并持久序列化
*/
private void waitingSerialize(MetaAction tempAction, GeneratorResponseData generatorData) {
return result;
}
private MetaAction buildAction(GeneratorInput input) {
@@ -72,12 +86,4 @@ public class DynamicActionGenerator extends AgentRunningSubModule<GeneratorInput
public boolean withBasicPrompt() {
return false;
}
@Data
private class GeneratorResponseData {
private List<String> dependencies;
private String code;
private boolean serialize;
private JSONObject responseSchema;
}
}

View File

@@ -1,11 +1,7 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import java.util.HashMap;
import java.util.List;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
@@ -16,6 +12,9 @@ import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.Extra
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction;
import java.util.HashMap;
import java.util.List;
/**
* 负责依据输入内容进行行动单元的参数信息提取
*/

View File

@@ -1,9 +1,9 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import java.util.Map;
import lombok.Data;
import java.util.Map;
@Data
public class ExtractorResult {
private boolean ok;

View File

@@ -1,9 +1,9 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import java.util.Map;
import lombok.Data;
import java.util.Map;
@Data
public class GeneratorInput {
private String key;

View File

@@ -3,6 +3,7 @@ package work.slhaf.partner.module.modules.action.interventor.handler;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
@@ -10,6 +11,7 @@ import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import work.slhaf.partner.module.modules.action.interventor.handler.entity.HandlerInput;
@@ -30,6 +32,13 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
@InjectCapability
private ActionCapability actionCapability;
private SandboxRunnerClient runnerClient;
@Init
void init() {
runnerClient = actionCapability.runnerClient();
}
/**
* 针对‘行动干预’做出处理
*
@@ -96,7 +105,7 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
.map(actionKey -> actionCapability.loadMetaAction(actionKey))
.toList();
//TODO 需要将干预逻辑下放至 ActionCapability ,因为 ActionExecutor 中也存在干预操作
// TODO 需要将干预逻辑下放至 ActionCapability ,因为 ActionExecutor 中也存在干预操作
switch (intervention.getType()) {
case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions);
case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions, phaser);
@@ -118,7 +127,8 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
* 在未进入执行阶段的行动单元组新增新的行动
*/
private void handleAppend(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage()) return;
if (order <= actionData.getExecutingStage())
return;
actionData.getActionChain().put(order, actions);
}
@@ -127,7 +137,8 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
* 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动, 如果插入位置正处于执行阶段, 则启动执行线程, 通过 Phaser 确保同步
*/
private void handleInsert(ActionData actionData, int order, List<MetaAction> actions, Phaser phaser) {
if (order < actionData.getExecutingStage()) return;
if (order < actionData.getExecutingStage())
return;
phaser.register();
try {
@@ -144,7 +155,7 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
executor = action.isIo() ? virtualExecutor : platformExecutor;
executor.execute(() -> {
try {
actionCapability.execute(action);
runnerClient.run(action);
} finally {
phaser.arriveAndDeregister();
}
@@ -158,7 +169,8 @@ public class InterventionHandler extends AgentRunningSubModule<HandlerInput, Voi
}
private void handleDelete(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage()) return;
if (order <= actionData.getExecutingStage())
return;
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
if (actionChain.containsKey(order)) {

View File

@@ -0,0 +1,47 @@
import com.alibaba.fastjson2.JSONObject;
import org.junit.jupiter.api.Test;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.runner.LocalRunnerClient;
import work.slhaf.partner.core.action.runner.RunnerClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SystemTest {
@Test
void execTest() {
// exec("pwd");
// exec("ls", "-la");
String r = exec("pip", "st", "--format=freeze");
System.out.println(r);
}
private String exec(String... command) {
StringBuilder s = new StringBuilder();
ProcessBuilder processBuilder = new ProcessBuilder(command);
try {
Process process = processBuilder.start();
java.io.InputStream inputStream = process.getInputStream();
java.util.Scanner scanner = new java.util.Scanner(inputStream).useDelimiter("\\A");
if (scanner.hasNext()) {
s.append(scanner.next());
}
} catch (IOException e) {
e.printStackTrace();
}
return s.toString();
}
@Test
void localRunnerClientTest() {
Map<String, MetaActionInfo> existedMetaActions = new HashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
RunnerClient client = new LocalRunnerClient(existedMetaActions, executor);
JSONObject res = client.listSysDependencies();
System.out.println(res.toString());
}
}