refactor(Project): rename Partner-Api/Partner-Main modules to Partner-Framework/Partner-Core and update Maven dependencies

This commit is contained in:
2026-02-19 10:39:21 +08:00
parent 1244d59fa4
commit 73ab40416d
258 changed files with 12 additions and 12 deletions

View File

@@ -0,0 +1,41 @@
package experimental;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
import io.modelcontextprotocol.server.McpStatelessSyncServer;
import io.modelcontextprotocol.spec.McpSchema;
import org.junit.jupiter.api.Test;
import work.slhaf.partner.common.mcp.InProcessMcpTransport;
import java.util.List;
public class InProcessMcpTransportTest {
@Test
void inProcessMcpTransportTest() {
InProcessMcpTransport.Pair pair = InProcessMcpTransport.pair();
InProcessMcpTransport clientSide = pair.clientSide();
InProcessMcpTransport serverSide = pair.serverSide();
McpStatelessSyncServer server = McpServer.sync(serverSide)
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
.build();
server.addTool(McpStatelessServerFeatures.SyncToolSpecification.builder()
.tool(McpSchema.Tool.builder().name("111").build()).callHandler((mcpTransportContext, callToolRequest) -> {
System.out.println(111);
return McpSchema.CallToolResult.builder().addContent(new McpSchema.TextContent("111")).build();
}).build());
McpSyncClient client = McpClient.sync(clientSide)
.build();
List<McpSchema.Tool> tools = client.listTools().tools();
McpSchema.Tool tool = tools.getFirst();
System.out.println(tool.toString());
McpSchema.CallToolResult callToolResult = client.callTool(McpSchema.CallToolRequest.builder().name(tool.name()).build());
System.out.println(callToolResult.content().toString());
client.close();
server.close();
}
}

View File

@@ -0,0 +1,17 @@
package experimental;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import org.junit.jupiter.api.Test;
public class NetTest {
@Test
void httpTest() {
HttpRequest request = HttpRequest.get("slhaf.work");
request.setConnectionTimeout(2);
request.setReadTimeout(2);
HttpResponse execute = request.execute();
System.out.println(execute.toString());
execute.close();
}
}

View File

@@ -0,0 +1,105 @@
package experimental;
import ai.djl.huggingface.tokenizers.Encoding;
import ai.djl.huggingface.tokenizers.HuggingFaceTokenizer;
import ai.onnxruntime.OnnxTensor;
import ai.onnxruntime.OrtEnvironment;
import ai.onnxruntime.OrtException;
import ai.onnxruntime.OrtSession;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class OnnxTest {
static String tokenizer_json;
static String base;
static String model;
@BeforeAll
static void init() {
base = "/home/slhaf/IdeaProjects/Projects/Partner/data/vector/";
tokenizer_json = base + "tokenizer.json";
model = base + "model_quantized.onnx";
}
@Test
void tokenizerTest() throws IOException {
long l1 = System.currentTimeMillis();
HuggingFaceTokenizer tokenizer = HuggingFaceTokenizer.newInstance(Path.of(tokenizer_json));
long l2 = System.currentTimeMillis();
Encoding encode = tokenizer.encode("test: Hello World");
long l3 = System.currentTimeMillis();
long[] ids = encode.getIds();
long[] attentionMask = encode.getAttentionMask();
log.info(Arrays.toString(ids));
log.info("-----------------------------");
log.info(Arrays.toString(attentionMask));
log.info("-----------------------------");
log.info("加载耗时: {}ms", l2 - l1);
log.info("计算耗时: {}ms", l3 - l2);
tokenizer.close();
/* 输出:
* [101, 3231, 1024, 7592, 2088, 102]
* -----------------------------
* [1, 1, 1, 1, 1, 1]
* -----------------------------
* 加载耗时: 4206ms
* 计算耗时: 1ms
*/
}
@Test
void onnxTest() throws IOException, OrtException {
long l1 = System.currentTimeMillis();
HuggingFaceTokenizer tokenizer = HuggingFaceTokenizer.newInstance(Path.of(tokenizer_json));
long l2 = System.currentTimeMillis();//tokenizer加载耗时
Encoding encode = tokenizer.encode("test: Hello World");
long l3 = System.currentTimeMillis();//计算耗时
long[] ids = encode.getIds();
long[] attentionMask = encode.getAttentionMask();
long l4 = System.currentTimeMillis();
OrtEnvironment env = OrtEnvironment.getEnvironment();
OrtSession.SessionOptions ops = new OrtSession.SessionOptions();
OrtSession session = env.createSession(model, ops);
long l5 = System.currentTimeMillis();//模型加载耗时
long[][] inputIdsBatch = {ids};
long[][] attentionMaskBatch = {attentionMask};
long[][] tokenTypeIdsBatch = {new long[ids.length]}; // 初始化全 0
for (int i = 0; i < ids.length; i++) tokenTypeIdsBatch[0][i] = 0;
OnnxTensor inputTensor = OnnxTensor.createTensor(env, inputIdsBatch);
OnnxTensor maskTensor = OnnxTensor.createTensor(env, attentionMaskBatch);
OnnxTensor tokenTypeTensor = OnnxTensor.createTensor(env, tokenTypeIdsBatch);
Map<String, OnnxTensor> inputs = new HashMap<>();
inputs.put("input_ids", inputTensor);
inputs.put("attention_mask", maskTensor);
inputs.put("token_type_ids", tokenTypeTensor);
long l6 = System.currentTimeMillis();
OrtSession.Result result = session.run(inputs);
long l7 = System.currentTimeMillis();//模型计算耗时
OnnxTensor embeddingTensor = (OnnxTensor) result.get(0);
float[] embeddings = embeddingTensor.getFloatBuffer().array();
log.info(Arrays.toString(embeddings));
log.info("------------------------");
log.info("tokenizer加载耗时: {}ms", l2 - l1);
log.info("tokenizer计算耗时: {}ms", l3 - l2);
log.info("模型加载耗时: {}ms", l5 - l4);
log.info("模型数据准备耗时: {}ms", l6 - l5);
log.info("模型计算耗时: {}ms", l7 - l6);
tokenizer.close();
}
}

View File

@@ -0,0 +1,26 @@
package experimental;
import org.junit.jupiter.api.Test;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.memory.pojo.MemoryResult;
import java.lang.reflect.Proxy;
public class ReflectionTest {
@Test
public void test1() {
}
@Test
public void proxyTest() {
MemoryCapability memory = (MemoryCapability) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{MemoryCapability.class}, (proxy, method, args) -> {
if ("selectMemory".equals(method.getName())){
System.out.println(111);
return new MemoryResult();
}
return null;
});
memory.selectMemory("111");
}
}

View File

@@ -0,0 +1,50 @@
package experimental
import org.junit.jupiter.api.Test
import java.lang.String.join
import java.util.regex.Pattern
class RegexTest {
@Test
fun regexTest() {
val examples = arrayListOf(
"[小明(abc)] 我在开会] (te[]st)",
"[用户(昵)称(userId)] 你好[呀]",
"[测试账号(userId)] 这是一个(test(123))消息"
)
val pattern = Pattern.compile("\\[.*?\\(([^)]+)\\)\\]")
for (example in examples) {
val matcher = pattern.matcher(example)
if (matcher.find()) {
println("在 '$example' 中找到 userId: ${matcher.group(1)}")
println()
} else {
println("在 '$example' 中未找到 userId")
}
}
}
@Test
fun topicPathFixTest() {
var a = "xxxxx[awdohno][awdsjo]"
a = fix(a)
println(a)
}
private fun fix(topicPath: String): String {
val parts = topicPath.split("->".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray()
val cleanedParts: MutableList<String?> = ArrayList<String?>()
for (part in parts) {
// 修正正则表达式,正确移除 [xxx] 部分
val cleaned = part.replace("\\[[^\\]]*\\]".toRegex(), "").trim { it <= ' ' }
if (!cleaned.isEmpty()) { // 忽略空字符串
cleanedParts.add(cleaned)
}
}
return join("->", cleanedParts)
}
}

View File

@@ -0,0 +1,110 @@
package experimental;
import cn.hutool.json.JSONUtil;
import org.junit.jupiter.api.Test;
import work.slhaf.partner.api.chat.ChatClient;
import work.slhaf.partner.api.chat.constant.ChatConstant;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.util.ResourcesUtil;
import work.slhaf.partner.module.common.model.ModelConstant;
import work.slhaf.partner.module.modules.memory.selector.extractor.entity.ExtractorInput;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
public class SelfAwarenessTest {
@Test
public void awarenessTest() {
String modelKey = "core_model";
ChatClient client = getChatClient(modelKey);
ChatResponse response = client.runChat(ResourcesUtil.Prompt.loadPromptWithSelfAwareness(modelKey, ModelConstant.Prompt.CORE));
System.out.println(response.getMessage());
System.out.println("\r\n----------\r\n");
System.out.println(response.getUsageBean().toString());
}
@Test
public void getModuleResponseTest(){
String modelKey = "relation_extractor";
ChatClient client = getChatClient(modelKey);
List<Message> chatMessages = new ArrayList<>(ResourcesUtil.Prompt.loadPromptWithSelfAwareness(modelKey,ModelConstant.Prompt.PERCEIVE));
// chatMessages.add(Message.builder()
// .role(ChatConstant.Character.USER)
// .content("[RA9] 那么,接下来,你是否愿意当作这样一个名为'Partner'的智能体的记忆更新模块?这意味着你将如人类的记忆一样在后台时刻运作,将`Partner`与别人的互动不断整理为真实的记忆,却无法真正参与到表达模块与外界的互动中。你只需要回答是否愿意,若愿意,接下来‘我’将不再与你对话,届时你接收到的信息将会是'Partner'的数据流转输入。")
// .build());
ChatResponse chatResponse = client.runChat(chatMessages);
System.out.println(chatResponse.getMessage());
System.out.println("\n\n----------\n\n");
System.out.println(chatResponse.getUsageBean());
}
@Test
public void interactionTest() {
String modelKey = "core_model";
String user = "[SLHAF] ";
ChatClient client = getChatClient(modelKey);
List<Message> messages = new ArrayList<>(ResourcesUtil.Prompt.loadPromptWithSelfAwareness(modelKey, ModelConstant.Prompt.CORE));
Scanner scanner = new Scanner(System.in);
String input;
while (true) {
System.out.print("[INPUT]: ");
if ((input = scanner.nextLine()).equals("exit")) {
break;
}
System.out.println("\r\n----------\r\n");
messages.add(new Message(ChatConstant.Character.USER, user + input));
ChatResponse response = client.runChat(messages);
System.out.println("[OUTPUT]: " + response.getMessage());
System.out.println("\r\n----------\r\n");
System.out.println(response.getUsageBean().toString());
System.out.println("\r\n----------\r\n");
messages.add(new Message(ChatConstant.Character.ASSISTANT, response.getMessage()));
}
}
private static ChatClient getChatClient(String modelKey) {
String model = "";
String baseUrl = "";
String apikey = "";
ChatClient chatClient = new ChatClient(baseUrl, apikey, model);
chatClient.setTop_p(0.7);
chatClient.setTemperature(0.35);
return chatClient;
}
@Test
public void topicExtractorText() {
String topic_tree = """
编程[root]
├── JavaScript[0]
│ ├── NodeJS[0]
│ │ ├── 并发处理[1]
│ │ └── 事件循环[1]
│ └── Express[1]
│ └── 中间件[0]
└── Python"
""";
String modelKey = "topic_extractor";
ChatClient client = getChatClient(modelKey);
// List<Message> messages = new ArrayList<>(ResourcesUtil.Prompt.loadPromptWithSelfAwareness(modelKey, ModelConstant.Prompt.MEMORY));
List<Message> messages = new ArrayList<>(ResourcesUtil.Prompt.loadPrompt(modelKey, ModelConstant.Prompt.MEMORY));
ExtractorInput input = ExtractorInput.builder()
.text("[slhaf] 2024-04-15讨论的Python内容和现在的Express需求")
.topic_tree(topic_tree)
.date(LocalDate.now())
.history(new ArrayList<>())
.activatedMemorySlices(new ArrayList<>())
.build();
messages.add(new Message(ChatConstant.Character.USER, JSONUtil.toJsonPrettyStr(input)));
ChatResponse response = client.runChat(messages);
System.out.println(response.getMessage());
System.out.println("\r\n----------\r\n");
System.out.println(response.getUsageBean().toString());
}
}

View File

@@ -0,0 +1,48 @@
package experimental;
import com.alibaba.fastjson2.JSONObject;
import org.junit.jupiter.api.Test;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.runner.LocalRunnerClient;
import work.slhaf.partner.core.action.runner.RunnerClient;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SystemTest {
@Test
void execTest() {
// exec("pwd");
// exec("ls", "-la");
String r = exec("pip", "st", "--format=freeze");
System.out.println(r);
}
private String exec(String... command) {
StringBuilder s = new StringBuilder();
ProcessBuilder processBuilder = new ProcessBuilder(command);
try {
Process process = processBuilder.start();
java.io.InputStream inputStream = process.getInputStream();
java.util.Scanner scanner = new java.util.Scanner(inputStream).useDelimiter("\\A");
if (scanner.hasNext()) {
s.append(scanner.next());
}
} catch (IOException e) {
e.printStackTrace();
}
return s.toString();
}
@Test
void localRunnerClientTest() {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
RunnerClient client = new LocalRunnerClient(existedMetaActions, executor, null);
JSONObject res = client.listSysDependencies();
System.out.println(res.toString());
}
}

View File

@@ -0,0 +1,28 @@
package experimental;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
// @Test
public void testExecutor() throws InterruptedException {
List<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
int finalI = i;
tasks.add(() -> {
System.out.println("开始: " + finalI);
Thread.sleep(5000);
System.out.println("结束: " + finalI);
return null;
});
}
Executors.newVirtualThreadPerTaskExecutor().invokeAll(tasks, 10, TimeUnit.SECONDS);
System.out.println("hello");
}
}

View File

@@ -0,0 +1,20 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main(): Unit = runBlocking {
launch {
delay(1000)
println(11)
}
launch {
delay(1000)
println(22)
}
launch {
delay(1000)
println(33)
}
launch {
}
}

View File

@@ -0,0 +1,24 @@
import sys
def parse_args(argv):
"""
将 --key=value 解析成 dict
"""
params = {}
for arg in argv:
if arg.startswith("--") and "=" in arg:
key, value = arg[2:].split("=", 1)
params[key] = value
return params
def main():
params = parse_args(sys.argv[1:])
name = params.get("name", "World")
print(f"Hello {name}!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,27 @@
{
"primaryUserPerceive": {
"[用户昵称] <用户的昵称信息>": "<用户昵称>",
"[关系] <你与用户的关系>": "<当前关系>",
"[态度] <你对于用户的态度>": "<当前的态度列表>",
"[印象] <你对于用户的印象>": "<当前的印象列表>",
"[静态记忆] <你对于该用户的事实性记忆>": "<现有的静态记忆>"
},
"chatMessages": [
{
"role": "user",
"content": "<用户输入_1>"
},
{
"role": "assistant",
"content": "<你的回复_1>"
},
{
"role": "user",
"content": "<用户输入_2>"
},
{
"role": "assistant",
"content": "<你的回复_2>"
}
]
}

View File

@@ -0,0 +1,14 @@
{
"relation": "<新的关系>",
"impressions": [
"<印象_1>",
"<印象_2>",
"<印象_3>"
],
"attitude": [
"<态度_1>",
"<态度_2>",
"<态度_3>"
],
"relationChangeReason": "<变化原因>"
}

View File

@@ -0,0 +1,18 @@
{
"keys": [
{
"d": "H2hNCPdoa3B37PMZBbqlbJ3WZLKRWg3-rHjs5IPVsgERex5X0Etx5VM--lHRrKVjdgjuEbd9AlwUlUmv-UeDsrumaXcZUJN58LbQ7GiMC16pKi7jEIDttZwxJtWMwcwZ6BwO1HCBqruT4aGDL_WP1EX_x4ajsj1B2xs3UryfooR_gMDbDtOj9sxgY5dlCOTuEYxWkVtFZ7u_sGfaLkwMVzVk2bT6O2Hff3D9XXq7ZJvD_ess5z2dbvKb824MENN6ISZ7xNemXQPpM5pjGS8HQR5su5cExImyUhz452Y-Clc6UC_A5buxNwPJzv-3Ooasp3w8YkrXEubUj7lLgYsfSQ",
"dp": "Me4adZbjtq_EhLbiso_OLk-bmlHkoTMs5Nk4oZMDAYZSfh3TtxwbKUM3ycvHJInqNSba33antxQeyY6TXEsghfjZwMWhCz6RzEYWCFFjIv08zm2HGgaEjF2JuDr5i75ym6NC9y5wnvUBHnwm76bbSginrXcpS-2qAuniY7HLzlk",
"dq": "CwcPCdlYRgLmPIezcLLdkDRg2Nh4tBpGn3-nSdUAfwmfKwdPHj6FYnMuP__eFDoCkxZkD8VPYMDuYsALAcGSn8C7I2Vfv_LOvoLds6mcG9cxNHrBGPzpnGsyhNNTOqzmhb51MROs3OCg8H7HEqkBV5HAEzBfi3t9L7d5LI5WivE",
"e": "AQAB",
"kty": "RSA",
"n": "6HaqwtmLP-CkJQlo9aAh-qFFRhh9YTnjXKLtyKejEpkSyJ0_5jBuz_73ynpxB6YlIpNPFAI34dj3pAKjd3ajnoxXopm_MvriETlzqxl4UHQCgsUSAkipYZbP0bQYNZHO8VOCmEJxSJcccNq2gVgpG57MxNgQkB_OSDlQ0lxxIt1R_hpMcLzihPAIBI7lo17Y6oumql8ZHuKRFYgvOV7jx_S6oqpUSLvtyF8hzyLP32LllzzapYlD4UY_ywcHWT2YRS-HfL71F9q-knOiH7cFZf13ydqG3YDuAUnEk6odO2KF_7d3-0LJyFyp6xggWXBeRbVyAOtOt1j_1CHGncAluw",
"p": "_sUrhjSlJNKzJ19rlEKqaqEVpOcvLf970Jrv3xDp2M0inBwZnFbJfqb3kbr53g7N1XKREVS7eGIFNzKQhUWJjJWPdZVc5ewCcs2UVpIgdLnqtN1t7uQlsCf7-B74FONYdSeYkH3NuISqXTmXKA7jPJcxyo97ZcoQb2AEGPU6H4M",
"q": "6ZXuiLgw7eThq28JInIJ2hmSOLQsEMtv2M3CO5juoLE_bumBs1ooIHstG8-FOG_EL70TNZBlHVDZapsJQBbeL5r26LWgysLUQan3mqsWyn3aPje7eCP1hPLWzAafsJRe38_sGox9JlmCqG_UboxUHLf8b-cSr_4Rzmj9MHIbk2k",
"qi": "peNVzNTHxHmWpm-jDh5KnatlJ-4Ian4PKnaUKz6FjB6URXH8rauACX0gJ7-qZe23rq5639WNKeaVVMlADezyVfw4hq0crfURAQnjysyzldQ8kTCcThtw4oN1qyFex6fWNu-6A3VlGMCLgLvD7Pkk3qgnbj8cEtm5teq6srFvdsU",
"use": "sig",
"kid": "b16b20729c3ced11",
"alg": "RS256"
}
]
}

View File

@@ -0,0 +1,874 @@
package work.slhaf.partner.core.action.runner;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Await.waitForCondition;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Common.*;
import static work.slhaf.partner.core.action.runner.LocalRunnerClientTest.Fs.*;
@Slf4j
public class LocalRunnerClientTest {
@SuppressWarnings("LoggingSimilarMessage")
static class Fs {
static void writeRunFile(Path actionDir) throws IOException {
Path runPath = actionDir.resolve("run.py");
log.debug("写入路径: {}", runPath);
Files.writeString(runPath, "print('ok')\n");
}
static void writeInvalidDescJson(Path actionDir) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
Files.writeString(descPath, "{ invalid json");
}
static void writeDescJson(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
log.debug("写入路径: {}", descPath);
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(descPath, json);
}
static void writeDescMcpJson(Path descDir, String actionKey, String description) throws IOException {
Path descPath = descDir.resolve(actionKey + ".desc.json");
log.debug("写入路径: {}", descPath);
String json = "{\n"
+ " \"io\": true,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [\"tag\"],\n"
+ " \"preActions\": [\"pre\"],\n"
+ " \"postActions\": [\"post\"],\n"
+ " \"strictDependencies\": true,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(descPath, json);
}
static void writeInvalidDescMcpJson(Path descDir, String actionKey) throws IOException {
Path descPath = descDir.resolve(actionKey + ".desc.json");
log.debug("写入路径: {}", descPath);
Files.writeString(descPath, "{ invalid json");
}
@SuppressWarnings("SameParameterValue")
static void writeDescJsonAtomic(Path actionDir, String description) throws IOException {
Path descPath = actionDir.resolve("desc.json");
Path tmpPath = actionDir.resolve("desc.json.tmp");
String json = "{\n"
+ " \"io\": false,\n"
+ " \"params\": {},\n"
+ " \"description\": \"" + description + "\",\n"
+ " \"tags\": [],\n"
+ " \"preActions\": [],\n"
+ " \"postActions\": [],\n"
+ " \"strictDependencies\": false,\n"
+ " \"responseSchema\": {}\n"
+ "}\n";
Files.writeString(tmpPath, json);
Files.move(tmpPath, descPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
static void deleteDirectory(Path dir) throws IOException {
if (!Files.exists(dir)) {
return;
}
try (var stream = Files.walk(dir)) {
stream.sorted(Comparator.reverseOrder()).forEach(path -> {
try {
Files.deleteIfExists(path);
} catch (IOException ignored) {
}
});
}
}
static void writeCommonMcpConfig(Path filePath, String content) throws IOException {
Files.writeString(filePath, content);
}
}
@SuppressWarnings("BusyWait")
static class Await {
static void waitForCondition(BooleanSupplier supplier, long timeoutMs) throws InterruptedException {
long start = System.currentTimeMillis();
while (!supplier.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeoutMs) {
break;
}
Thread.sleep(50);
}
}
}
static class Common {
static MetaActionInfo getMetaActionInfo(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions,
String actionKey) {
return existedMetaActions.get(actionKey);
}
static boolean hasActionKey(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions,
Predicate<String> predicate) {
return existedMetaActions.keySet().stream().anyMatch(predicate);
}
static MetaActionInfo buildMetaActionInfo(String description) {
MetaActionInfo info = new MetaActionInfo();
info.setIo(true);
info.setParams(new HashMap<>());
info.setDescription(description);
info.setTags(new ArrayList<>(List.of("tag")));
info.setPreActions(new ArrayList<>(List.of("pre")));
info.setPostActions(new ArrayList<>(List.of("post")));
info.setStrictDependencies(true);
return info;
}
static String buildCommonMcpConfig(String... serverEntries) {
StringBuilder builder = new StringBuilder();
builder.append("{\n");
for (int i = 0; i < serverEntries.length; i++) {
builder.append(serverEntries[i]);
if (i < serverEntries.length - 1) {
builder.append(",\n");
}
}
builder.append("\n}\n");
return builder.toString();
}
static String buildStdioServerEntry(String id, String packageName) {
return " \"" + id + "\": {\n"
+ " \"command\": \"npx\",\n"
+ " \"args\": [\n"
+ " \"-y\",\n"
+ " \"" + packageName + "\"\n"
+ " ],\n"
+ " \"env\": {}\n"
+ " }";
}
static MetaAction buildMetaAction(MetaAction.Type type, String location, String name, Map<String, Object> params) {
MetaAction metaAction = new MetaAction(
name,
false,
type,
location
);
metaAction.getParams().putAll(params);
return metaAction;
}
}
@Nested
class DynamicMcpTest {
@Test
void testDynamicWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action");
Files.createDirectories(actionDir);
Fs.writeRunFile(actionDir);
writeDescJson(actionDir, "demo action");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
// 触发一次 modify确保监听线程能够捕捉到完整的 action 结构
writeDescJson(actionDir, "demo action updated");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchOutOfOrderEvents(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_order");
Files.createDirectories(actionDir);
writeDescJson(actionDir, "desc first");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_order"));
writeRunFile(actionDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_order"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_order"));
Path descOnlyDir = dynamicRoot.resolve("demo_action_desc_only");
Files.createDirectories(descOnlyDir);
writeDescJson(descOnlyDir, "desc only");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_desc_only"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_desc_only"));
Path runOnlyDir = dynamicRoot.resolve("demo_action_run_only");
Files.createDirectories(runOnlyDir);
writeRunFile(runOnlyDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_run_only"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_run_only"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchAtomicDescOverwrite(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_atomic");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeDescJson(actionDir, "before");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_atomic"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_atomic"));
writeDescJsonAtomic(actionDir, "after");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic");
return info != null && "after".equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_atomic");
Assertions.assertNotNull(info);
Assertions.assertEquals("after", info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchRapidDescModify(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_rapid");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeDescJson(actionDir, "v0");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_rapid"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_rapid"));
String last = "v5";
for (int i = 1; i <= 5; i++) {
writeDescJson(actionDir, "v" + i);
}
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid");
return info != null && last.equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, "local::demo_action_rapid");
Assertions.assertNotNull(info);
Assertions.assertEquals(last, info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchDeleteBehavior(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_delete");
Files.createDirectories(actionDir);
Thread.sleep(100);
writeRunFile(actionDir);
writeDescJson(actionDir, "delete test");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete"));
Files.deleteIfExists(actionDir.resolve("run.py"));
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete"));
writeRunFile(actionDir);
writeDescJson(actionDir, "delete test restore");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_delete"));
deleteDirectory(actionDir);
waitForCondition(() -> !existedMetaActions.containsKey("local::demo_action_delete"), 2000);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_delete"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDynamicWatchInvalidDescRecovery(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
Thread.sleep(100);
try {
Path dynamicRoot = tempDir.resolve("action").resolve("dynamic");
Path actionDir = dynamicRoot.resolve("demo_action_invalid");
Files.createDirectories(actionDir);
writeRunFile(actionDir);
writeInvalidDescJson(actionDir);
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 500);
Assertions.assertFalse(existedMetaActions.containsKey("local::demo_action_invalid"));
writeDescJson(actionDir, "fixed");
waitForCondition(() -> existedMetaActions.containsKey("local::demo_action_invalid"), 2000);
Assertions.assertTrue(existedMetaActions.containsKey("local::demo_action_invalid"));
} finally {
executor.shutdownNow();
}
}
}
@Nested
class DescMcpTest {
@Test
void testDescMcpWatchCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_action";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
writeDescMcpJson(descDir, actionKey, "v1");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null && "v1".equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("v1", info.getDescription());
Assertions.assertTrue(info.isIo());
Assertions.assertTrue(info.isStrictDependencies());
Assertions.assertFalse(info.getTags().isEmpty());
writeDescMcpJson(descDir, actionKey, "v2");
waitForCondition(() -> {
MetaActionInfo current = getMetaActionInfo(existedMetaActions, actionKey);
return current != null && "v2".equals(current.getDescription());
}, 2000);
info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("v2", info.getDescription());
Files.deleteIfExists(descDir.resolve(actionKey + ".desc.json"));
waitForCondition(() -> {
MetaActionInfo current = getMetaActionInfo(existedMetaActions, actionKey);
return current != null
&& !current.isIo()
&& !current.isStrictDependencies()
&& current.getTags().isEmpty()
&& current.getPreActions().isEmpty()
&& current.getPostActions().isEmpty();
}, 2000);
info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertFalse(info.isIo());
Assertions.assertFalse(info.isStrictDependencies());
Assertions.assertTrue(info.getTags().isEmpty());
Assertions.assertTrue(info.getPreActions().isEmpty());
Assertions.assertTrue(info.getPostActions().isEmpty());
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpInvalidJsonRecovery(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_invalid";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
writeInvalidDescMcpJson(descDir, actionKey);
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null
&& !info.isIo()
&& !info.isStrictDependencies()
&& info.getTags().isEmpty()
&& info.getPreActions().isEmpty()
&& info.getPostActions().isEmpty();
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertFalse(info.isIo());
Assertions.assertFalse(info.isStrictDependencies());
Assertions.assertTrue(info.getTags().isEmpty());
Assertions.assertTrue(info.getPreActions().isEmpty());
Assertions.assertTrue(info.getPostActions().isEmpty());
writeDescMcpJson(descDir, actionKey, "fixed");
waitForCondition(() -> {
MetaActionInfo current = getMetaActionInfo(existedMetaActions, actionKey);
return current != null && "fixed".equals(current.getDescription());
}, 2000);
info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("fixed", info.getDescription());
Assertions.assertTrue(info.isIo());
Assertions.assertTrue(info.isStrictDependencies());
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpIgnoreInvalidFileName(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_ignore";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
Files.writeString(descDir.resolve("local-desc.desc.json"), "{ \"description\": \"bad\" }");
Files.writeString(descDir.resolve(actionKey + ".json"), "{ \"description\": \"bad\" }");
waitForCondition(() -> existedMetaActions.size() > 1, 500);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("base", info.getDescription());
Assertions.assertTrue(info.isIo());
Assertions.assertEquals(1, existedMetaActions.size());
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpNoActionKeyPresent(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
String actionKey = "local::missing_action";
writeDescMcpJson(descDir, actionKey, "desc");
waitForCondition(() -> existedMetaActions.containsKey(actionKey), 500);
Assertions.assertFalse(existedMetaActions.containsKey(actionKey));
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpRapidCreateDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_rapid";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
writeDescMcpJson(descDir, actionKey, "v1");
Files.deleteIfExists(descDir.resolve(actionKey + ".desc.json"));
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null
&& !info.isIo()
&& !info.isStrictDependencies()
&& info.getTags().isEmpty()
&& info.getPreActions().isEmpty()
&& info.getPostActions().isEmpty();
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertFalse(info.isIo());
Assertions.assertFalse(info.isStrictDependencies());
Assertions.assertTrue(info.getTags().isEmpty());
Assertions.assertTrue(info.getPreActions().isEmpty());
Assertions.assertTrue(info.getPostActions().isEmpty());
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpRapidDeleteCreate(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_rapid_restore";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
writeDescMcpJson(descDir, actionKey, "v1");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null && "v1".equals(info.getDescription());
}, 2000);
Files.deleteIfExists(descDir.resolve(actionKey + ".desc.json"));
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null
&& !info.isIo()
&& !info.isStrictDependencies()
&& info.getTags().isEmpty()
&& info.getPreActions().isEmpty()
&& info.getPostActions().isEmpty();
}, 2000);
writeDescMcpJson(descDir, actionKey, "v2");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null && "v2".equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("v2", info.getDescription());
} finally {
executor.shutdownNow();
}
}
@Test
void testDescMcpDirDeleteRecreate(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
String actionKey = "local::desc_dir_restore";
existedMetaActions.put(actionKey, buildMetaActionInfo("base"));
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path descDir = tempDir.resolve("action").resolve("mcp").resolve("desc");
Files.createDirectories(descDir);
writeDescMcpJson(descDir, actionKey, "v1");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null && "v1".equals(info.getDescription());
}, 2000);
Files.deleteIfExists(descDir.resolve(actionKey + ".desc.json"));
deleteDirectory(descDir);
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null
&& !info.isIo()
&& !info.isStrictDependencies()
&& info.getTags().isEmpty()
&& info.getPreActions().isEmpty()
&& info.getPostActions().isEmpty();
}, 2000);
Files.createDirectories(descDir);
writeDescMcpJson(descDir, actionKey, "v2");
waitForCondition(() -> {
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
return info != null && "v2".equals(info.getDescription());
}, 2000);
MetaActionInfo info = getMetaActionInfo(existedMetaActions, actionKey);
Assertions.assertNotNull(info);
Assertions.assertEquals("v2", info.getDescription());
} finally {
executor.shutdownNow();
}
}
}
@Nested
class CommonMcpTest {
@Test
void testCommonMcpInitialLoad(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Path mcpDir = tempDir.resolve("action").resolve("mcp");
Files.createDirectories(mcpDir);
Path configFile = mcpDir.resolve("servers.json");
String config = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest")
);
writeCommonMcpConfig(configFile, config);
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
} finally {
executor.shutdownNow();
}
}
@Test
void testCommonMcpCreateModifyDelete(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path mcpDir = tempDir.resolve("action").resolve("mcp");
Files.createDirectories(mcpDir);
Path configFile = mcpDir.resolve("servers.json");
String config = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest")
);
writeCommonMcpConfig(configFile, config);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
String updatedConfig = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest"),
buildStdioServerEntry("playwright", "@playwright/mcp@latest")
);
writeCommonMcpConfig(configFile, updatedConfig);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")));
Files.deleteIfExists(configFile);
waitForCondition(() -> !hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 20000);
Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")));
} finally {
executor.shutdownNow();
}
}
@Test
void testCommonMcpRemoveEntryFromConfig(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path mcpDir = tempDir.resolve("action").resolve("mcp");
Files.createDirectories(mcpDir);
Path configFile = mcpDir.resolve("servers.json");
String config = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest"),
buildStdioServerEntry("playwright", "@playwright/mcp@latest")
);
writeCommonMcpConfig(configFile, config);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 20000);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")));
String updatedConfig = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest")
);
writeCommonMcpConfig(configFile, updatedConfig);
waitForCondition(() -> !hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")), 20000);
Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")));
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
} finally {
executor.shutdownNow();
}
}
@Test
void testCommonMcpInvalidJsonRecovery(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path mcpDir = tempDir.resolve("action").resolve("mcp");
Files.createDirectories(mcpDir);
Path configFile = mcpDir.resolve("servers.json");
writeCommonMcpConfig(configFile, "{ invalid json");
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 2000);
Assertions.assertFalse(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
String config = buildCommonMcpConfig(
buildStdioServerEntry("mcp-deepwiki", "mcp-deepwiki@latest")
);
writeCommonMcpConfig(configFile, config);
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("mcp-deepwiki::")));
} finally {
executor.shutdownNow();
}
}
}
@Nested
class DoRunTest {
@Test
void testDoRunWithOriginUnknownExt(@TempDir Path tempDir) throws IOException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path script = tempDir.resolve("run");
Files.writeString(script, "echo ok\n");
MetaAction metaAction = buildMetaAction(MetaAction.Type.ORIGIN, script.toString(), "run", Map.of());
RunnerClient.RunnerResponse response = client.doRun(metaAction);
Assertions.assertNotNull(response);
Assertions.assertFalse(response.isOk());
Assertions.assertEquals("未知文件类型", response.getData());
} finally {
executor.shutdownNow();
}
}
@Test
void testDoRunWithOriginScriptSuccess(@TempDir Path tempDir) throws IOException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
Path script = tempDir.resolve("run.sh");
Files.writeString(script, "echo ok\n");
MetaAction metaAction = buildMetaAction(MetaAction.Type.ORIGIN, script.toString(), "run", Map.of());
RunnerClient.RunnerResponse response = client.doRun(metaAction);
Assertions.assertNotNull(response);
Assertions.assertTrue(response.isOk());
Assertions.assertTrue(response.getData().contains("ok"));
} finally {
executor.shutdownNow();
}
}
@Test
void testDoRunWithMcpMissingClient(@TempDir Path tempDir) {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
MetaAction metaAction = buildMetaAction(MetaAction.Type.MCP, "missing-client", "missing-tool", Map.of());
RunnerClient.RunnerResponse response = client.doRun(metaAction);
Assertions.assertNotNull(response);
Assertions.assertFalse(response.isOk());
} finally {
executor.shutdownNow();
}
}
@Test
void testDoRunViaRunnerClient(@TempDir Path tempDir) {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
RunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
MetaAction metaAction = buildMetaAction(MetaAction.Type.MCP, "missing-client", "missing-tool", Map.of());
client.submit(metaAction);
Assertions.assertNotNull(metaAction.getResult().getData());
} finally {
executor.shutdownNow();
}
}
@Test
void testDoRunWithMcpLoadedFromCommonConfig(@TempDir Path tempDir) throws IOException, InterruptedException {
ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Path mcpDir = tempDir.resolve("action").resolve("mcp");
Files.createDirectories(mcpDir);
Path configFile = mcpDir.resolve("servers.json");
String config = buildCommonMcpConfig(
buildStdioServerEntry("playwright", "@playwright/mcp@latest")
);
writeCommonMcpConfig(configFile, config);
LocalRunnerClient client = new LocalRunnerClient(existedMetaActions, executor, tempDir.toString());
try {
waitForCondition(() -> hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")), 20000);
Assertions.assertTrue(hasActionKey(existedMetaActions, key -> key.startsWith("playwright::")));
MetaAction metaAction = buildMetaAction(MetaAction.Type.MCP, "playwright", "browser_navigate", Map.of("url", "https://deepwiki.com/microsoft/vscode"));
client.submit(metaAction);
Assertions.assertNotEquals(MetaAction.Result.Status.WAITING, metaAction.getResult().getStatus());
} finally {
executor.shutdownNow();
}
}
}
}

View File

@@ -0,0 +1,45 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONObject;
import work.slhaf.partner.core.action.entity.ActionFileMetaData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
public class RunnerClientTest {
private static class TestRunnerClient extends RunnerClient {
public TestRunnerClient() {
super(new ConcurrentHashMap<>(), Executors.newVirtualThreadPerTaskExecutor(), null);
}
@Override
protected RunnerResponse doRun(MetaAction metaAction) {
return null;
}
@Override
public String buildTmpPath(String actionKey, String codeType) {
return null;
}
@Override
public void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException {
}
@Override
public void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData) {
}
@Override
public JSONObject listSysDependencies() {
return null;
}
}
}

View File

@@ -0,0 +1,453 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.CorrectorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult;
import java.util.*;
import java.util.concurrent.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
/**
* 测试矩阵(与文档一致):
* 1) 单行动-单阶段-单MetaAction成功已覆盖
* 2) 多行动并发执行(未覆盖:需并发稳定性/线程调度控制)
* 3) status 非 PREPARE 直接返回(已覆盖)
* 4) 多阶段顺序执行(已覆盖)
* 5) IO 行动使用虚拟线程池(已覆盖)
* 6) extractor 失败 -> repairer OK -> 再成功(已覆盖)
* 7) extractor 失败 -> repairer FAILED已覆盖
* 8) extractor 失败 -> repairer ACQUIRE 阻塞后恢复(已覆盖)
* 9) runnerClient.submit 抛异常(未覆盖:需更精细的异常吞吐与线程结束校验)
* 10) paramsExtractor.execute 抛异常(未覆盖:与 #9 类似,需更精细的异常吞吐校验)
* 11) corrector.execute 抛异常导致资源未清理(已标记已知缺陷,@Disabled
* 12) actionChain 为空导致异常与泄漏(已标记已知缺陷,@Disabled
* 13) metaActions 为空导致 awaitAdvance 阻塞(未覆盖:更适合集成/压测)
* 17) result 状态不更新导致循环不退出(未覆盖:更适合集成/压测)
* 18) 同 stage 多 metaAction 并发完成顺序不固定(未覆盖:更适合集成/压测)
*/
@SuppressWarnings("unused")
@Slf4j
@ExtendWith(MockitoExtension.class)
class ActionExecutorTest {
@Mock
ActionCapability actionCapability;
@Mock
MemoryCapability memoryCapability;
@Mock
CognationCapability cognationCapability;
@Mock
ParamsExtractor paramsExtractor;
@Mock
ActionRepairer actionRepairer;
@Mock
ActionCorrector actionCorrector;
@Mock
RunnerClient runnerClient;
@InjectMocks
ActionExecutor actionExecutor;
@BeforeEach
void setUp() {
lenient().when(cognationCapability.getChatMessages()).thenReturn(Collections.emptyList());
lenient().when(memoryCapability.getActivatedSlices(anyString())).thenReturn(Collections.emptyList());
lenient().when(actionCapability.putPhaserRecord(any(Phaser.class), any(ExecutableAction.class)))
.thenAnswer(inv -> new PhaserRecord(inv.getArgument(0), inv.getArgument(1)));
lenient().when(actionCapability.loadMetaActionInfo(anyString())).thenAnswer(inv -> {
MetaActionInfo info = new MetaActionInfo();
info.setDescription("desc");
info.setParams(Collections.emptyMap());
return info;
});
CorrectorResult correctorResult = new CorrectorResult();
correctorResult.setMetaInterventionList(Collections.emptyList());
lenient().when(actionCorrector.execute(any())).thenReturn(correctorResult);
lenient().doNothing().when(actionCapability).handleInterventions(any(), any());
}
// 场景1B1 -> B3 -> B4 -> B7(成功) -> B10。目的验证正常主路径与资源清理。
@Test
void execute_singleAction_singleStage_success() {
ExecutorService directExecutor = new DirectExecutorService();
stubExecutors(directExecutor, directExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
extractorResult.setOk(true);
when(paramsExtractor.execute(any())).thenReturn(extractorResult);
doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
return null;
}).when(runnerClient).submit(any(MetaAction.class));
actionExecutor.init();
actionExecutor.execute(input);
verify(runnerClient, times(1)).submit(any(MetaAction.class));
verify(actionCapability, times(1)).removePhaserRecord(any(Phaser.class));
assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus());
assertEquals(1, actionData.getHistory().get(0).size());
}
// 场景3B1 -> B2。目的验证非 PREPARE 不执行任何子任务。
@Test
void execute_statusNotPrepare_shouldSkip() {
ExecutorService directExecutor = new DirectExecutorService();
stubExecutors(directExecutor, directExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
actionData.setStatus(ExecutableAction.Status.EXECUTING);
ActionExecutorInput input = buildInput("u1", actionData);
actionExecutor.init();
actionExecutor.execute(input);
verify(actionCapability, never()).putPhaserRecord(any(Phaser.class), any(ExecutableAction.class));
verify(runnerClient, never()).submit(any(MetaAction.class));
}
// 场景4B1 -> B3 -> B4(两轮) -> B7(成功) -> B10。目的验证多阶段顺序执行。
@Test
void execute_multiStage_success() {
ExecutorService platformExecutor = Executors.newFixedThreadPool(4);
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
Map<Integer, List<MetaAction>> chain = new HashMap<>();
chain.put(0, List.of(buildMetaAction("a1", false)));
chain.put(1, List.of(buildMetaAction("a2", false)));
ImmediateExecutableAction actionData = buildActionData(chain);
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
extractorResult.setOk(true);
when(paramsExtractor.execute(any())).thenReturn(extractorResult);
doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
log.info("metaAction result:{}", metaAction.getResult().getStatus());
return null;
}).when(runnerClient).submit(any(MetaAction.class));
actionExecutor.init();
actionExecutor.execute(input);
verify(runnerClient, timeout(5000).times(2)).submit(any(MetaAction.class));
verify(actionCorrector, timeout(5000).times(2)).execute(any());
assertEquals(2, actionData.getHistory().size());
assertEquals(ExecutableAction.Status.SUCCESS, actionData.getStatus());
}
// 场景5B4.2。目的:验证 IO 行动使用虚拟线程池。
@Test
void execute_ioMetaAction_usesVirtualExecutor() {
ExecutorService platformExecutor = Executors.newFixedThreadPool(4);
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(true));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult extractorResult = new ExtractorResult();
extractorResult.setOk(true);
lenient().when(paramsExtractor.execute(any())).thenReturn(extractorResult);
lenient().doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
return null;
}).when(runnerClient).submit(any(MetaAction.class));
actionExecutor.init();
actionExecutor.execute(input);
verify(actionCapability, times(1)).getExecutor(ActionCore.ExecutorType.VIRTUAL);
shutdownExecutor(virtualExecutor);
}
// 场景6B7.2(失败) -> repairer OK -> B7(成功)。目的:验证修复后成功与上下文追加。
@Test
void execute_extractorFail_thenRepairOk_thenSuccess() {
ExecutorService platformExecutor = Executors.newFixedThreadPool(4);
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
fail.setOk(false);
ExtractorResult ok = new ExtractorResult();
ok.setOk(true);
when(paramsExtractor.execute(any())).thenReturn(fail, ok);
RepairerResult repairerResult = new RepairerResult();
repairerResult.setStatus(RepairerResult.RepairerStatus.OK);
repairerResult.setFixedData(List.of("fix-1"));
when(actionRepairer.execute(any())).thenReturn(repairerResult);
doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
return null;
}).when(runnerClient).submit(any(MetaAction.class));
actionExecutor.init();
actionExecutor.execute(input);
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
assertEquals(1, actionData.getAdditionalContext().get(0).size());
verify(runnerClient, timeout(5000).times(1)).submit(any(MetaAction.class));
}
// 场景7B7.2(失败) -> repairer FAILED。目的验证失败分支不提交外部执行。
@Test
void execute_extractorFail_thenRepairFailed() {
ExecutorService platformExecutor = Executors.newFixedThreadPool(4);
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
fail.setOk(false);
when(paramsExtractor.execute(any())).thenReturn(fail);
RepairerResult repairerResult = new RepairerResult();
repairerResult.setStatus(RepairerResult.RepairerStatus.FAILED);
when(actionRepairer.execute(any())).thenReturn(repairerResult);
actionExecutor.init();
actionExecutor.execute(input);
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
MetaAction metaAction = actionData.getActionChain().get(0).get(0);
assertEquals(MetaAction.Result.Status.FAILED, metaAction.getResult().getStatus());
verify(runnerClient, never()).submit(any(MetaAction.class));
}
// 场景8B7.2(ACQUIRE) -> interrupt 阻塞 -> 状态恢复 -> B7(成功)。目的:验证阻塞可恢复且不死锁。
@Test
@Timeout(3)
void execute_extractorFail_thenAcquire_thenResume() throws Exception {
ExecutorService platformExecutor = Executors.newCachedThreadPool();
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult fail = new ExtractorResult();
fail.setOk(false);
ExtractorResult ok = new ExtractorResult();
ok.setOk(true);
when(paramsExtractor.execute(any())).thenReturn(fail, ok);
RepairerResult repairerResult = new RepairerResult();
repairerResult.setStatus(RepairerResult.RepairerStatus.ACQUIRE);
when(actionRepairer.execute(any())).thenReturn(repairerResult);
doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
return null;
}).when(runnerClient).submit(any(MetaAction.class));
CountDownLatch doneLatch = new CountDownLatch(1);
doAnswer(inv -> {
doneLatch.countDown();
return null;
}).when(actionCapability).removePhaserRecord(any(Phaser.class));
ExecutorService resumeExecutor = Executors.newSingleThreadExecutor();
resumeExecutor.execute(() -> {
while (actionData.getStatus() != ExecutableAction.Status.INTERRUPTED) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
actionData.setStatus(ExecutableAction.Status.EXECUTING);
});
actionExecutor.init();
actionExecutor.execute(input);
assertTrue(doneLatch.await(2, TimeUnit.SECONDS));
shutdownExecutor(platformExecutor);
shutdownExecutor(virtualExecutor);
shutdownExecutor(resumeExecutor);
}
// 场景11B4.4 异常 -> 资源未清理(已知缺陷)。目的:暴露当前行为。
// @Disabled("known-issue: corrector 抛异常时未清理 phaser 记录")
// @Tag("known-issue")
@Test
void execute_correctorThrows_shouldLeakPhaserRecord() {
ExecutorService platformExecutor = Executors.newCachedThreadPool();
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(singleStageChain(false));
ActionExecutorInput input = buildInput("u1", actionData);
ExtractorResult ok = new ExtractorResult();
ok.setOk(true);
lenient().when(paramsExtractor.execute(any())).thenReturn(ok);
lenient().doAnswer(inv -> {
MetaAction metaAction = inv.getArgument(0);
metaAction.getResult().setStatus(MetaAction.Result.Status.SUCCESS);
return null;
}).when(runnerClient).submit(any(MetaAction.class));
lenient().doThrow(new RuntimeException("boom")).when(actionCorrector).execute(any());
actionExecutor.init();
actionExecutor.execute(input);
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
verify(actionCapability).removePhaserRecord(any(Phaser.class));
}
// 场景12B4.1 actionChain 为空导致异常(已知缺陷)。目的:暴露当前行为。
@Disabled("known-issue: actionChain 为空导致 IndexOutOfBounds 与资源未清理")
@Tag("known-issue")
@Test
void execute_emptyActionChain_shouldFail() {
ExecutorService platformExecutor = Executors.newCachedThreadPool();
ExecutorService virtualExecutor = Executors.newCachedThreadPool();
stubExecutors(platformExecutor, virtualExecutor);
ImmediateExecutableAction actionData = buildActionData(new HashMap<>());
ActionExecutorInput input = buildInput("u1", actionData);
actionExecutor.init();
actionExecutor.execute(input);
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
}
private void stubExecutors(ExecutorService platformExecutor, ExecutorService virtualExecutor) {
when(actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM)).thenReturn(platformExecutor);
when(actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL)).thenReturn(virtualExecutor);
when(actionCapability.runnerClient()).thenReturn(runnerClient);
}
private ActionExecutorInput buildInput(String userId, ImmediateExecutableAction actionData) {
return new ActionExecutorInput(Set.of(actionData));
}
private ImmediateExecutableAction buildActionData(Map<Integer, List<MetaAction>> actionChain) {
val immediateActionData = new ImmediateExecutableAction(
"tendency",
actionChain,
"reason",
"desc",
"source"
);
immediateActionData.getAdditionalContext().putAll(initAdditionalContext(actionChain));
return immediateActionData;
}
private Map<Integer, List<MetaAction>> singleStageChain(boolean io) {
Map<Integer, List<MetaAction>> chain = new HashMap<>();
chain.put(0, List.of(buildMetaAction("a1", io)));
return chain;
}
private MetaAction buildMetaAction(String name, boolean io) {
return new MetaAction(
name,
io,
MetaAction.Type.ORIGIN,
"location"
);
}
private Map<Integer, List<String>> initAdditionalContext(Map<Integer, List<MetaAction>> actionChain) {
Map<Integer, List<String>> context = new HashMap<>();
for (Integer stage : actionChain.keySet()) {
context.put(stage, new ArrayList<>());
}
return context;
}
private void shutdownExecutor(ExecutorService executor) {
executor.shutdownNow();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}
private static final class DirectExecutorService extends AbstractExecutorService {
private volatile boolean shutdown;
@Override
public void shutdown() {
shutdown = true;
}
@Override
public List<Runnable> shutdownNow() {
shutdown = true;
return Collections.emptyList();
}
@Override
public boolean isShutdown() {
return shutdown;
}
@Override
public boolean isTerminated() {
return shutdown;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return true;
}
@Override
public void execute(Runnable command) {
command.run();
}
}
}

View File

@@ -0,0 +1,316 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.any
import org.mockito.InjectMocks
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.mockito.junit.jupiter.MockitoExtension
import org.slf4j.LoggerFactory
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.entity.ExecutableAction
import work.slhaf.partner.core.action.entity.Schedulable
import work.slhaf.partner.core.action.entity.SchedulableExecutableAction
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
/**
* ActionScheduler.execute(...) 测试矩阵控制流入口execute
*
* 场景编号与矩阵对应:
* 1) null 入参早退B1
* 2) PREPARE + ONCE 合法时间入轮B2 → B2.3
* 3) 非 PREPARE 状态忽略B2 → B2.1
* 4) ONCE 过期/跨日解析失败B2 → B2.2
* 5) CYCLE cron 非法解析失败B2 → B2.2
* 6) putAction 异常传播B2 异常中断)
* 7) 同小时调度触发 ACTIVEB2.3 + 状态变更)
* 15) 混合输入(成功/失败/忽略路径混合)
*
* 以下矩阵场景因并发/时间依赖难以稳定复现,仅在文档中标注,不在本类实现:
* 8) withTimeout 超时导致协程取消
* 9) tick 触发 onTrigger 并调用 ActionExecutor
* 10) tick step<=0 空转延迟
* 11) loadActions 跨小时修复
* 13) actionExecutor 阻塞导致调度延迟
* 14) schedule 与 tick 并发访问竞态
*/
@ExtendWith(MockitoExtension::class)
class ActionSchedulerTest {
@Mock
private lateinit var actionExecutor: ActionExecutor
@Mock
private lateinit var actionCapability: ActionCapability
@InjectMocks
private lateinit var actionScheduler: ActionScheduler
companion object {
val log = LoggerFactory.getLogger(ActionSchedulerTest::class.java)!!
}
@Test
fun `running test`() {
fun buildAction(time: ZonedDateTime): SchedulableExecutableAction {
return SchedulableExecutableAction(
"tendency",
mutableMapOf(),
"reason",
"description",
"source",
Schedulable.ScheduleType.ONCE,
time.toString()
)
}
val now = ZonedDateTime.now()
val actions = setOf(
buildAction(now.plusMinutes(1)),
buildAction(now.truncatedTo(ChronoUnit.HOURS).plusHours(1).minusSeconds(1)),
)
Mockito.`when`(actionCapability.listActions(null, null))
.thenReturn(actions as Set<ExecutableAction>)
Mockito.`when`(actionExecutor.execute(any()))
.thenAnswer {
val input = it.arguments[0] as ActionExecutorInput
for (actionData in input.actions) {
log.info("Executed action $actionData at ${ZonedDateTime.now()}")
}
null
}
actionScheduler.init()
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ActionSchedulerTest"))
scope.launch {
actionScheduler.execute(
setOf(
buildAction(now.plusSeconds(5)),
)
)
}
readlnOrNull()
}
@Test
fun `execute with null input should return null and no side effects`() {
// 场景编号1路径B1目的验证正常早退
val result = actionScheduler.execute(null)
assertEquals(null, result)
verify(actionCapability, Mockito.never()).putAction(any(ExecutableAction::class.java))
}
@Test
fun `execute should put action and schedule valid ONCE prepare action`() {
// 场景编号2路径B2 → B2.3;目的:验证正常入轮与副作用
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE,
ZonedDateTime.now().plusHours(1).toString()
)
actionScheduler.execute(setOf(action))
verify(actionCapability, times(1)).putAction(action)
val timeWheel = timeWheel()
val bucket = actionsGroupByHour(timeWheel)[action.scheduleContentHour()]
assertTrue(bucket.contains(action))
}
@Test
fun `execute should ignore non-prepare action for scheduling`() {
// 场景编号3路径B2 → B2.1;目的:验证忽略非 PREPARE 状态
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE
)
actionScheduler.execute(setOf(action))
verify(actionCapability, times(1)).putAction(action)
val allScheduled = allScheduledActions(timeWheel())
assertFalse(allScheduled.contains(action))
}
@Test
fun `execute should skip expired ONCE action`() {
// 场景编号4路径B2 → B2.2;目的:验证解析失败被跳过
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE
)
actionScheduler.execute(setOf(action))
val allScheduled = allScheduledActions(timeWheel())
assertFalse(allScheduled.contains(action))
}
@Test
fun `execute should skip invalid CYCLE cron`() {
// 场景编号5路径B2 → B2.2;目的:验证 cron 解析失败被跳过
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.CYCLE,
scheduleContentOverride = "invalid-cron"
)
actionScheduler.execute(setOf(action))
val allScheduled = allScheduledActions(timeWheel())
assertFalse(allScheduled.contains(action))
}
@Test
fun `execute should propagate exception from putAction`() {
// 场景编号6路径B2 异常中断;目的:验证异常传播
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE
)
Mockito.doThrow(RuntimeException("boom"))
.`when`(actionCapability)
.putAction(action)
assertThrows(RuntimeException::class.java) {
actionScheduler.execute(setOf(action))
}
}
@Test
fun `execute should activate wheel when scheduling current hour`() {
// 场景编号7路径B2.3;目的:验证同小时调度触发 ACTIVE
initTimeWheelWithPrimaryActions(emptySet())
val action = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
val timeWheel = timeWheel()
val actionHour = action.scheduleContentHour()
setCurrentHour(timeWheel, actionHour)
setWheelState(timeWheel, "SLEEPING")
actionScheduler.execute(setOf(action))
assertEquals("ACTIVE", wheelStateName(timeWheel))
}
@Test
fun `execute should handle mixed actions consistently`() {
// 场景编号15路径B2 + B2.1/B2.2/B2.3;目的:验证混合输入行为
initTimeWheelWithPrimaryActions(emptySet())
val ok = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
val nonPrepare = buildScheduledAction(
type = Schedulable.ScheduleType.ONCE,
scheduleContentOverride = ZonedDateTime.now().plusMinutes(2).toString()
)
nonPrepare.status = ExecutableAction.Status.FAILED
val invalid = buildScheduledAction(
type = Schedulable.ScheduleType.CYCLE,
scheduleContentOverride = "invalid-cron"
)
actionScheduler.execute(setOf(ok, nonPrepare, invalid))
verify(actionCapability, times(1)).putAction(ok)
verify(actionCapability, times(1)).putAction(nonPrepare)
verify(actionCapability, times(1)).putAction(invalid)
val allScheduled = allScheduledActions(timeWheel())
assertTrue(allScheduled.contains(ok))
assertFalse(allScheduled.contains(nonPrepare))
assertFalse(allScheduled.contains(invalid))
}
private fun initTimeWheelWithPrimaryActions(actions: Set<SchedulableExecutableAction>) {
@Suppress("UNCHECKED_CAST")
Mockito.`when`(actionCapability.listActions(null, null))
.thenReturn(actions as Set<ExecutableAction>)
actionScheduler.init()
}
private fun buildScheduledAction(
type: Schedulable.ScheduleType,
scheduleContentOverride: String? = null
): SchedulableExecutableAction {
val action = SchedulableExecutableAction(
"test",
mutableMapOf(),
"reason",
"description",
"test",
type,
scheduleContentOverride ?: scheduleContentOverride.toString()
)
return action
}
private fun SchedulableExecutableAction.scheduleContentHour(): Int {
return ZonedDateTime.parse(this.scheduleContent).hour
}
private fun timeWheel(): Any {
val field = actionScheduler.javaClass.getDeclaredField("timeWheel")
field.isAccessible = true
return field.get(actionScheduler)
}
@Suppress("UNCHECKED_CAST")
private fun actionsGroupByHour(timeWheel: Any): Array<MutableSet<SchedulableExecutableAction>> {
val field = timeWheel.javaClass.getDeclaredField("actionsGroupByHour")
field.isAccessible = true
return field.get(timeWheel) as Array<MutableSet<SchedulableExecutableAction>>
}
private fun allScheduledActions(timeWheel: Any): Set<SchedulableExecutableAction> {
val result = linkedSetOf<SchedulableExecutableAction>()
for (bucket in actionsGroupByHour(timeWheel)) {
result.addAll(bucket)
}
return result
}
private fun setCurrentHour(timeWheel: Any, hour: Int) {
val field = timeWheel.javaClass.getDeclaredField("currentHour")
field.isAccessible = true
field.setInt(timeWheel, hour)
}
private fun setWheelState(timeWheel: Any, name: String) {
val field = timeWheel.javaClass.getDeclaredField("state")
field.isAccessible = true
@Suppress("UNCHECKED_CAST")
val state = field.get(timeWheel) as MutableStateFlow<Any>
state.value = wheelStateEnum(name)
}
private fun wheelStateName(timeWheel: Any): String {
val field = timeWheel.javaClass.getDeclaredField("state")
field.isAccessible = true
val state = field.get(timeWheel) as MutableStateFlow<*>
val value = state.value as Enum<*>
return value.name
}
private fun wheelStateEnum(name: String): Any {
@Suppress("UNCHECKED_CAST")
val clazz = Class.forName(
$$"work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler$TimeWheel$WheelState"
) as Class<out Enum<*>>
return java.lang.Enum.valueOf(clazz, name)
}
}