refactor(action): replace HashMap with ConcurrentHashMap for thread-safe MetaAction storage

This commit is contained in:
2025-12-19 23:30:27 +08:00
parent 1f5509c17d
commit 9762739138
7 changed files with 16 additions and 14 deletions

View File

@@ -17,6 +17,7 @@ import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
@@ -53,7 +54,7 @@ public class ActionCore extends PartnerCore<ActionCore> {
/** /**
* 已存在的行动程序,键格式为‘<MCP-ServerName>::<Tool-Name>’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息 * 已存在的行动程序,键格式为‘<MCP-ServerName>::<Tool-Name>’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息
*/ */
private final Map<String, MetaActionInfo> existedMetaActions = new HashMap<>(); private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
private final List<PhaserRecord> phaserRecords = new ArrayList<>(); private final List<PhaserRecord> phaserRecords = new ArrayList<>();
private RunnerClient runnerClient; private RunnerClient runnerClient;

View File

@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +36,7 @@ import static work.slhaf.partner.common.Constant.Path.TMP_ACTION_DIR_LOCAL;
@Slf4j @Slf4j
public class LocalRunnerClient extends RunnerClient { public class LocalRunnerClient extends RunnerClient {
public LocalRunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) { public LocalRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String actionWatchPath) {
super(existedMetaActions, executor); super(existedMetaActions, executor);
ActionWatchService watchService = new ActionWatchService(actionWatchPath); ActionWatchService watchService = new ActionWatchService(actionWatchPath);
watchService.launch(); watchService.launch();

View File

@@ -33,6 +33,7 @@ import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -41,7 +42,7 @@ import java.util.function.Function;
@Slf4j @Slf4j
public abstract class RunnerClient { public abstract class RunnerClient {
protected final Map<String, MetaActionInfo> existedMetaActions; protected final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
protected final ExecutorService executor; protected final ExecutorService executor;
protected final Map<String, McpSyncClient> mcpClients = new HashMap<>(); protected final Map<String, McpSyncClient> mcpClients = new HashMap<>();
protected final Map<String, McpStatelessAsyncServer> localMcpServers = new HashMap<>(); protected final Map<String, McpStatelessAsyncServer> localMcpServers = new HashMap<>();
@@ -49,7 +50,7 @@ public abstract class RunnerClient {
/** /**
* ActionCore 将注入虚拟线程池 * ActionCore 将注入虚拟线程池
*/ */
public RunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor) { public RunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor) {
this.existedMetaActions = existedMetaActions; this.existedMetaActions = existedMetaActions;
this.executor = executor; this.executor = executor;
setupShutdownHook(); setupShutdownHook();
@@ -99,11 +100,9 @@ public abstract class RunnerClient {
for (McpSchema.Tool tool : tools) { for (McpSchema.Tool tool : tools) {
MetaActionInfo info = buildMetaActionInfo(tool); MetaActionInfo info = buildMetaActionInfo(tool);
String actionKey = id + "::" + tool.name(); String actionKey = id + "::" + tool.name();
synchronized (existedMetaActions) {
existedMetaActions.put(actionKey, info); existedMetaActions.put(actionKey, info);
} }
} }
}
private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) { private static @NotNull MetaActionInfo buildMetaActionInfo(McpSchema.Tool tool) {
MetaActionInfo info = new MetaActionInfo(); MetaActionInfo info = new MetaActionInfo();

View File

@@ -6,7 +6,7 @@ import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo; import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
@@ -25,7 +25,7 @@ import java.util.concurrent.ExecutorService;
*/ */
public class SandboxRunnerClient extends RunnerClient { public class SandboxRunnerClient extends RunnerClient {
public SandboxRunnerClient(Map<String, MetaActionInfo> existedMetaActions, ExecutorService executor) { // 连接沙盒执行器(websocket) public SandboxRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor) { // 连接沙盒执行器(websocket)
super(existedMetaActions, executor); super(existedMetaActions, executor);
} }

View File

@@ -7,8 +7,7 @@ import work.slhaf.partner.core.action.runner.LocalRunnerClient;
import work.slhaf.partner.core.action.runner.RunnerClient; import work.slhaf.partner.core.action.runner.RunnerClient;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -39,7 +38,7 @@ public class SystemTest {
@Test @Test
void localRunnerClientTest() { void localRunnerClientTest() {
Map<String, MetaActionInfo> existedMetaActions = new HashMap<>(); ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
RunnerClient client = new LocalRunnerClient(existedMetaActions, executor, null); RunnerClient client = new LocalRunnerClient(existedMetaActions, executor, null);
JSONObject res = client.listSysDependencies(); JSONObject res = client.listSysDependencies();

View File

@@ -7,6 +7,7 @@ import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionType; import work.slhaf.partner.core.action.entity.MetaActionType;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class LocalRunnerClientTest { public class LocalRunnerClientTest {
@@ -15,7 +16,7 @@ public class LocalRunnerClientTest {
@BeforeAll @BeforeAll
static void beforeAll() { static void beforeAll() {
runnerClient = new LocalRunnerClient(Map.of(), Executors.newVirtualThreadPerTaskExecutor(), "/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action/data"); runnerClient = new LocalRunnerClient(new ConcurrentHashMap<>(), Executors.newVirtualThreadPerTaskExecutor(), "/home/slhaf/Projects/IdeaProjects/Projects/Partner/Partner-Main/src/test/java/resources/action/data");
} }
@Test @Test

View File

@@ -15,6 +15,7 @@ import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class RunnerClientTest { public class RunnerClientTest {
@@ -92,7 +93,7 @@ public class RunnerClientTest {
private static class TestRunnerClient extends RunnerClient { private static class TestRunnerClient extends RunnerClient {
public TestRunnerClient() { public TestRunnerClient() {
super(Map.of(), Executors.newVirtualThreadPerTaskExecutor()); super(new ConcurrentHashMap<>(), Executors.newVirtualThreadPerTaskExecutor());
} }
@Override @Override