From c9466f435939ae36a36e86b85b9f4d7d36f94088 Mon Sep 17 00:00:00 2001 From: slhafzjw Date: Sun, 29 Mar 2026 20:19:09 +0800 Subject: [PATCH] refactor(executor): remove legacy InteractionThreadPoolExecutor , and change related executor calling into virtual executor and CountDownLatch --- .../partner/common/monitor/DebugMonitor.java | 36 ----------- .../thread/InteractionThreadPoolExecutor.java | 60 ------------------ .../planner/evaluator/ActionEvaluator.java | 62 +++++++++++-------- .../module/memory/updater/MemoryUpdater.java | 10 ++- .../updater/summarizer/SingleSummarizer.java | 40 +++++++----- .../runtime/interaction/WebSocketGateway.java | 7 ++- 6 files changed, 70 insertions(+), 145 deletions(-) delete mode 100644 Partner-Core/src/main/java/work/slhaf/partner/common/monitor/DebugMonitor.java delete mode 100644 Partner-Core/src/main/java/work/slhaf/partner/common/thread/InteractionThreadPoolExecutor.java diff --git a/Partner-Core/src/main/java/work/slhaf/partner/common/monitor/DebugMonitor.java b/Partner-Core/src/main/java/work/slhaf/partner/common/monitor/DebugMonitor.java deleted file mode 100644 index a18672f2..00000000 --- a/Partner-Core/src/main/java/work/slhaf/partner/common/monitor/DebugMonitor.java +++ /dev/null @@ -1,36 +0,0 @@ -package work.slhaf.partner.common.monitor; - -import lombok.extern.slf4j.Slf4j; -import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; - -@Slf4j -public class DebugMonitor { - - private static DebugMonitor debugMonitor; - private InteractionThreadPoolExecutor executor; - - public static void initialize() { - debugMonitor = new DebugMonitor(); - debugMonitor.executor = InteractionThreadPoolExecutor.getInstance(); - debugMonitor.runMonitor(); - } - - public static DebugMonitor getInstance() { - if (debugMonitor == null) { - initialize(); - } - return debugMonitor; - } - - private void runMonitor() { - executor.execute(() -> { - while (true) { - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - log.error("监测线程报错?"); - } - } - }); - } -} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/common/thread/InteractionThreadPoolExecutor.java b/Partner-Core/src/main/java/work/slhaf/partner/common/thread/InteractionThreadPoolExecutor.java deleted file mode 100644 index 23ea8e97..00000000 --- a/Partner-Core/src/main/java/work/slhaf/partner/common/thread/InteractionThreadPoolExecutor.java +++ /dev/null @@ -1,60 +0,0 @@ -package work.slhaf.partner.common.thread; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -public class InteractionThreadPoolExecutor { - - private static InteractionThreadPoolExecutor interactionThreadPoolExecutor; - - private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); - - public static InteractionThreadPoolExecutor getInstance() { - if (interactionThreadPoolExecutor == null) { - interactionThreadPoolExecutor = new InteractionThreadPoolExecutor(); - } - return interactionThreadPoolExecutor; - } - - - public void invokeAll(List> tasks, int time, TimeUnit timeUnit) { - try { - executorService.invokeAll(tasks, time, timeUnit); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - public void invokeAll(List> tasks) { - try { - List> futures = executorService.invokeAll(tasks); - for (Future future : futures) { - future.get(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } - } - - public List invokeAllAndReturn(List> tasks) { - try { - List> futures = executorService.invokeAll(tasks); - List results = new ArrayList<>(); - for (Future future : futures) { - results.add(future.get()); - } - return results; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } - } - - public void execute(Runnable runnable) { - executorService.execute(runnable); - } -} diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/evaluator/ActionEvaluator.java b/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/evaluator/ActionEvaluator.java index f0009f1f..9583ce43 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/evaluator/ActionEvaluator.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/action/planner/evaluator/ActionEvaluator.java @@ -9,8 +9,8 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel; import work.slhaf.partner.api.agent.factory.component.annotation.Init; import work.slhaf.partner.api.chat.pojo.Message; -import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.cognition.BlockContent; import work.slhaf.partner.core.cognition.CognitionCapability; import work.slhaf.partner.core.cognition.ContextBlock; @@ -20,7 +20,8 @@ import work.slhaf.partner.module.action.planner.evaluator.entity.EvaluatorResult import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; public class ActionEvaluator extends AbstractAgentModule.Sub> implements ActivateModel { @@ -29,11 +30,11 @@ public class ActionEvaluator extends AbstractAgentModule.Sub execute(EvaluatorInput data) { - List> tasks = buildEvaluateTasks(data.getTendencies()); - return executor.invokeAllAndReturn(tasks); - } + List tendencies = data.getTendencies(); + CountDownLatch latch = new CountDownLatch(tendencies.size()); + List evaluatorResults = new ArrayList<>(); - private List> buildEvaluateTasks(List tendencies) { - List> list = new ArrayList<>(); for (String tendency : tendencies) { - list.add(() -> { - List messages = List.of( - cognitionCapability.contextWorkspace().resolve(List.of( - ContextBlock.VisibleDomain.ACTION, - ContextBlock.VisibleDomain.COGNITION, - ContextBlock.VisibleDomain.MEMORY - )).encodeToMessage(), - availableMetaActionContext(), - new Message(Message.Character.USER, tendency) - ); - EvaluatorResult evaluatorResult = formattedChat( - messages, - EvaluatorResult.class - ); - evaluatorResult.setTendency(tendency); - return evaluatorResult; + executor.execute(() -> { + try { + List messages = List.of( + cognitionCapability.contextWorkspace().resolve(List.of( + ContextBlock.VisibleDomain.ACTION, + ContextBlock.VisibleDomain.COGNITION, + ContextBlock.VisibleDomain.MEMORY + )).encodeToMessage(), + availableMetaActionContext(), + new Message(Message.Character.USER, tendency) + ); + EvaluatorResult evaluatorResult = formattedChat( + messages, + EvaluatorResult.class + ); + evaluatorResult.setTendency(tendency); + synchronized (evaluatorResults) { + evaluatorResults.add(evaluatorResult); + } + } finally { + latch.countDown(); + } }); } - return list; + + try { + latch.await(); + } catch (InterruptedException ignored) { + } + return evaluatorResults; } private Message availableMetaActionContext() { diff --git a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/updater/MemoryUpdater.java b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/updater/MemoryUpdater.java index be509e62..9dbb95b3 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/module/memory/updater/MemoryUpdater.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/module/memory/updater/MemoryUpdater.java @@ -10,7 +10,8 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod import work.slhaf.partner.api.agent.factory.component.annotation.Init; import work.slhaf.partner.api.agent.factory.component.annotation.InjectModule; import work.slhaf.partner.api.chat.pojo.Message; -import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; +import work.slhaf.partner.core.action.ActionCapability; +import work.slhaf.partner.core.action.ActionCore; import work.slhaf.partner.core.action.entity.Schedulable; import work.slhaf.partner.core.action.entity.StateAction; import work.slhaf.partner.core.cognition.CognitionCapability; @@ -30,6 +31,7 @@ import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowCon import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @EqualsAndHashCode(callSuper = true) @@ -47,6 +49,8 @@ public class MemoryUpdater extends AbstractAgentModule.Running, Void> implements ActivateModel { - private InteractionThreadPoolExecutor executor; + + @InjectCapability + private ActionCapability actionCapability; + + private ExecutorService executor; @Init public void init() { - this.executor = InteractionThreadPoolExecutor.getInstance(); + executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL); } @Override public Void execute(List chatMessages) { log.debug("[MemorySummarizer] 长文本摘要开始..."); - List> tasks = new ArrayList<>(); - AtomicInteger counter = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(chatMessages.size()); for (int i = 0; i < chatMessages.size(); i++) { Message chatMessage = chatMessages.get(i); if (chatMessage.getRole() == Message.Character.ASSISTANT) { String content = chatMessage.getContent(); if (chatMessage.getContent().length() > 500) { int index = i; - tasks.add(() -> { - int thisCount = counter.incrementAndGet(); - log.debug("[MemorySummarizer] 长文本摘要[{}]启动", thisCount); - String summarized = singleExecute(JSONObject.of("content", content).toString()); - chatMessages.set(index, new Message(Message.Character.ASSISTANT, summarized)); - log.debug("[MemorySummarizer] 长文本摘要[{}]完成", thisCount); - return null; + executor.execute(() -> { + try { + String summarized = singleExecute(JSONObject.of("content", content).toString()); + chatMessages.set(index, new Message(Message.Character.ASSISTANT, summarized)); + } finally { + latch.countDown(); + } }); } } } - executor.invokeAll(tasks, 30, TimeUnit.SECONDS); + try { + latch.await(); + } catch (InterruptedException ignored) { + } log.debug("[MemorySummarizer] 长文本摘要结束"); return null; } diff --git a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java b/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java index ddb781ec..bf6f2a68 100644 --- a/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java +++ b/Partner-Core/src/main/java/work/slhaf/partner/runtime/interaction/WebSocketGateway.java @@ -12,13 +12,14 @@ import work.slhaf.partner.api.agent.runtime.config.AgentConfigLoader; import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway; import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; import work.slhaf.partner.common.config.PartnerAgentConfigLoader; -import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.runtime.interaction.data.PartnerInputData; import work.slhaf.partner.runtime.interaction.data.PartnerOutputData; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Slf4j public class WebSocketGateway extends WebSocketServer implements AgentGateway { @@ -27,7 +28,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway userSessions = new ConcurrentHashMap<>(); - private final InteractionThreadPoolExecutor executor; + private final ExecutorService executor; // 记录最后一次收到Pong的时间 private final ConcurrentHashMap lastPongTimes = new ConcurrentHashMap<>(); @@ -39,7 +40,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway