refactor(executor): remove legacy InteractionThreadPoolExecutor , and change related executor calling into virtual executor and CountDownLatch

This commit is contained in:
2026-03-29 20:19:09 +08:00
parent cb09b86b23
commit c9466f4359
6 changed files with 70 additions and 145 deletions

View File

@@ -1,36 +0,0 @@
package work.slhaf.partner.common.monitor;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
@Slf4j
public class DebugMonitor {
private static DebugMonitor debugMonitor;
private InteractionThreadPoolExecutor executor;
public static void initialize() {
debugMonitor = new DebugMonitor();
debugMonitor.executor = InteractionThreadPoolExecutor.getInstance();
debugMonitor.runMonitor();
}
public static DebugMonitor getInstance() {
if (debugMonitor == null) {
initialize();
}
return debugMonitor;
}
private void runMonitor() {
executor.execute(() -> {
while (true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error("监测线程报错?");
}
}
});
}
}

View File

@@ -1,60 +0,0 @@
package work.slhaf.partner.common.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class InteractionThreadPoolExecutor {
private static InteractionThreadPoolExecutor interactionThreadPoolExecutor;
private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
public static InteractionThreadPoolExecutor getInstance() {
if (interactionThreadPoolExecutor == null) {
interactionThreadPoolExecutor = new InteractionThreadPoolExecutor();
}
return interactionThreadPoolExecutor;
}
public <T> void invokeAll(List<Callable<T>> tasks, int time, TimeUnit timeUnit) {
try {
executorService.invokeAll(tasks, time, timeUnit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public <T> void invokeAll(List<Callable<T>> tasks) {
try {
List<Future<T>> futures = executorService.invokeAll(tasks);
for (Future<T> future : futures) {
future.get();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
public <T> List<T> invokeAllAndReturn(List<Callable<T>> tasks) {
try {
List<Future<T>> futures = executorService.invokeAll(tasks);
List<T> results = new ArrayList<>();
for (Future<T> future : futures) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
public void execute(Runnable runnable) {
executorService.execute(runnable);
}
}

View File

@@ -9,8 +9,8 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod
import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel; import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.factory.component.annotation.Init; import work.slhaf.partner.api.agent.factory.component.annotation.Init;
import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.core.action.ActionCapability; import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.cognition.BlockContent; import work.slhaf.partner.core.cognition.BlockContent;
import work.slhaf.partner.core.cognition.CognitionCapability; import work.slhaf.partner.core.cognition.CognitionCapability;
import work.slhaf.partner.core.cognition.ContextBlock; import work.slhaf.partner.core.cognition.ContextBlock;
@@ -20,7 +20,8 @@ import work.slhaf.partner.module.action.planner.evaluator.entity.EvaluatorResult
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
public class ActionEvaluator extends AbstractAgentModule.Sub<EvaluatorInput, List<EvaluatorResult>> implements ActivateModel { public class ActionEvaluator extends AbstractAgentModule.Sub<EvaluatorInput, List<EvaluatorResult>> implements ActivateModel {
@@ -29,11 +30,11 @@ public class ActionEvaluator extends AbstractAgentModule.Sub<EvaluatorInput, Lis
@InjectCapability @InjectCapability
private CognitionCapability cognitionCapability; private CognitionCapability cognitionCapability;
private InteractionThreadPoolExecutor executor; private ExecutorService executor;
@Init @Init
public void init() { public void init() {
executor = InteractionThreadPoolExecutor.getInstance(); executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
} }
/** /**
@@ -44,14 +45,13 @@ public class ActionEvaluator extends AbstractAgentModule.Sub<EvaluatorInput, Lis
*/ */
@Override @Override
public List<EvaluatorResult> execute(EvaluatorInput data) { public List<EvaluatorResult> execute(EvaluatorInput data) {
List<Callable<EvaluatorResult>> tasks = buildEvaluateTasks(data.getTendencies()); List<String> tendencies = data.getTendencies();
return executor.invokeAllAndReturn(tasks); CountDownLatch latch = new CountDownLatch(tendencies.size());
} List<EvaluatorResult> evaluatorResults = new ArrayList<>();
private List<Callable<EvaluatorResult>> buildEvaluateTasks(List<String> tendencies) {
List<Callable<EvaluatorResult>> list = new ArrayList<>();
for (String tendency : tendencies) { for (String tendency : tendencies) {
list.add(() -> { executor.execute(() -> {
try {
List<Message> messages = List.of( List<Message> messages = List.of(
cognitionCapability.contextWorkspace().resolve(List.of( cognitionCapability.contextWorkspace().resolve(List.of(
ContextBlock.VisibleDomain.ACTION, ContextBlock.VisibleDomain.ACTION,
@@ -66,10 +66,20 @@ public class ActionEvaluator extends AbstractAgentModule.Sub<EvaluatorInput, Lis
EvaluatorResult.class EvaluatorResult.class
); );
evaluatorResult.setTendency(tendency); evaluatorResult.setTendency(tendency);
return evaluatorResult; synchronized (evaluatorResults) {
evaluatorResults.add(evaluatorResult);
}
} finally {
latch.countDown();
}
}); });
} }
return list;
try {
latch.await();
} catch (InterruptedException ignored) {
}
return evaluatorResults;
} }
private Message availableMetaActionContext() { private Message availableMetaActionContext() {

View File

@@ -10,7 +10,8 @@ import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentMod
import work.slhaf.partner.api.agent.factory.component.annotation.Init; import work.slhaf.partner.api.agent.factory.component.annotation.Init;
import work.slhaf.partner.api.agent.factory.component.annotation.InjectModule; import work.slhaf.partner.api.agent.factory.component.annotation.InjectModule;
import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.Schedulable; import work.slhaf.partner.core.action.entity.Schedulable;
import work.slhaf.partner.core.action.entity.StateAction; import work.slhaf.partner.core.action.entity.StateAction;
import work.slhaf.partner.core.cognition.CognitionCapability; import work.slhaf.partner.core.cognition.CognitionCapability;
@@ -30,6 +31,7 @@ import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowCon
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@@ -47,6 +49,8 @@ public class MemoryUpdater extends AbstractAgentModule.Running<PartnerRunningFlo
private MemoryCapability memoryCapability; private MemoryCapability memoryCapability;
@InjectCapability @InjectCapability
private PerceiveCapability perceiveCapability; private PerceiveCapability perceiveCapability;
@InjectCapability
private ActionCapability actionCapability;
@InjectModule @InjectModule
private MemoryRuntime memoryRuntime; private MemoryRuntime memoryRuntime;
@@ -60,11 +64,11 @@ public class MemoryUpdater extends AbstractAgentModule.Running<PartnerRunningFlo
private DialogRollingService dialogRollingService; private DialogRollingService dialogRollingService;
private final AtomicBoolean updating = new AtomicBoolean(false); private final AtomicBoolean updating = new AtomicBoolean(false);
private InteractionThreadPoolExecutor executor; private ExecutorService executor;
@Init @Init
public void init() { public void init() {
executor = InteractionThreadPoolExecutor.getInstance(); executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
registerScheduledUpdater(); registerScheduledUpdater();
} }

View File

@@ -3,51 +3,57 @@ package work.slhaf.partner.module.memory.updater.summarizer;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule; import work.slhaf.partner.api.agent.factory.component.abstracts.AbstractAgentModule;
import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel; import work.slhaf.partner.api.agent.factory.component.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.factory.component.annotation.Init; import work.slhaf.partner.api.agent.factory.component.annotation.Init;
import work.slhaf.partner.api.chat.pojo.Message; import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor; import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
public class SingleSummarizer extends AbstractAgentModule.Sub<List<Message>, Void> implements ActivateModel { public class SingleSummarizer extends AbstractAgentModule.Sub<List<Message>, Void> implements ActivateModel {
private InteractionThreadPoolExecutor executor;
@InjectCapability
private ActionCapability actionCapability;
private ExecutorService executor;
@Init @Init
public void init() { public void init() {
this.executor = InteractionThreadPoolExecutor.getInstance(); executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
} }
@Override @Override
public Void execute(List<Message> chatMessages) { public Void execute(List<Message> chatMessages) {
log.debug("[MemorySummarizer] 长文本摘要开始..."); log.debug("[MemorySummarizer] 长文本摘要开始...");
List<Callable<Void>> tasks = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(chatMessages.size());
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < chatMessages.size(); i++) { for (int i = 0; i < chatMessages.size(); i++) {
Message chatMessage = chatMessages.get(i); Message chatMessage = chatMessages.get(i);
if (chatMessage.getRole() == Message.Character.ASSISTANT) { if (chatMessage.getRole() == Message.Character.ASSISTANT) {
String content = chatMessage.getContent(); String content = chatMessage.getContent();
if (chatMessage.getContent().length() > 500) { if (chatMessage.getContent().length() > 500) {
int index = i; int index = i;
tasks.add(() -> { executor.execute(() -> {
int thisCount = counter.incrementAndGet(); try {
log.debug("[MemorySummarizer] 长文本摘要[{}]启动", thisCount);
String summarized = singleExecute(JSONObject.of("content", content).toString()); String summarized = singleExecute(JSONObject.of("content", content).toString());
chatMessages.set(index, new Message(Message.Character.ASSISTANT, summarized)); chatMessages.set(index, new Message(Message.Character.ASSISTANT, summarized));
log.debug("[MemorySummarizer] 长文本摘要[{}]完成", thisCount); } finally {
return null; latch.countDown();
}
}); });
} }
} }
} }
executor.invokeAll(tasks, 30, TimeUnit.SECONDS); try {
latch.await();
} catch (InterruptedException ignored) {
}
log.debug("[MemorySummarizer] 长文本摘要结束"); log.debug("[MemorySummarizer] 长文本摘要结束");
return null; return null;
} }

View File

@@ -12,13 +12,14 @@ import work.slhaf.partner.api.agent.runtime.config.AgentConfigLoader;
import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway; import work.slhaf.partner.api.agent.runtime.interaction.AgentGateway;
import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter; import work.slhaf.partner.api.agent.runtime.interaction.AgentInteractionAdapter;
import work.slhaf.partner.common.config.PartnerAgentConfigLoader; import work.slhaf.partner.common.config.PartnerAgentConfigLoader;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.runtime.interaction.data.PartnerInputData; import work.slhaf.partner.runtime.interaction.data.PartnerInputData;
import work.slhaf.partner.runtime.interaction.data.PartnerOutputData; import work.slhaf.partner.runtime.interaction.data.PartnerOutputData;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
public class WebSocketGateway extends WebSocketServer implements AgentGateway<PartnerInputData, PartnerOutputData, PartnerRunningFlowContext> { public class WebSocketGateway extends WebSocketServer implements AgentGateway<PartnerInputData, PartnerOutputData, PartnerRunningFlowContext> {
@@ -27,7 +28,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<Pa
@ToString.Exclude @ToString.Exclude
private final ConcurrentHashMap<String, WebSocket> userSessions = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, WebSocket> userSessions = new ConcurrentHashMap<>();
private final InteractionThreadPoolExecutor executor; private final ExecutorService executor;
// 记录最后一次收到Pong的时间 // 记录最后一次收到Pong的时间
private final ConcurrentHashMap<WebSocket, Long> lastPongTimes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<WebSocket, Long> lastPongTimes = new ConcurrentHashMap<>();
@@ -39,7 +40,7 @@ public class WebSocketGateway extends WebSocketServer implements AgentGateway<Pa
private WebSocketGateway(int port) { private WebSocketGateway(int port) {
super(new InetSocketAddress(port)); super(new InetSocketAddress(port));
this.setReuseAddr(true); this.setReuseAddr(true);
this.executor = InteractionThreadPoolExecutor.getInstance(); this.executor = Executors.newSingleThreadExecutor();
} }
public void launch() { public void launch() {