203 Commits

Author SHA1 Message Date
f6afe21b43 Merge branch 'feature/ActionModule'
# Conflicts:
#	.gitignore
2026-02-09 21:22:26 +08:00
d381a97731 refactor(ActionScheduler): add debug log for hour-change trigger scan 2026-02-09 21:12:38 +08:00
940beb2587 test(ActionScheduler): add test 2026-02-09 21:05:08 +08:00
69d9f04f11 fix(ActionScheduler): stabilize wheel tick pacing and run trigger scan before hour/day refresh 2026-02-09 21:04:48 +08:00
e2bd9eb0af fix(ActionScheduler): enqueue same-hour actions in wheel and add scheduling debug logs 2026-02-09 21:19:40 +08:00
9ec03c4c95 fix(ActionScheduler): include previous tick in trigger scan and tighten next execution filtering 2026-02-09 21:01:22 +08:00
ecbbbc9954 refactor(ActionScheduler): include tick in wheel stop debug log 2026-02-09 20:56:45 +08:00
a5d26769e8 fix(ActionScheduler): skip trigger callback when tick has no actions 2026-02-09 20:54:35 +08:00
2db1bdf3e9 refactor(ActionScheduler): add debug log for action execution 2026-02-09 20:50:36 +08:00
656d6b65e3 refactor(ActionScheduler): add debug logs for wheel start/stop, wait window, and action loading 2026-02-09 20:41:01 +08:00
7c46f1d1ff fix(ActionScheduler): remove triggered hour actions by uuid to avoid removeAll mismatch 2026-02-09 20:03:24 +08:00
406b4250aa refactor(ActionScheduler): correct actions loading logic in hour/day updating 2026-02-09 20:03:10 +08:00
eab3d00fe8 refactor(ActionScheduler): remove useless delay in TimeWheel#wheel 2026-02-09 20:02:26 +08:00
d47e9fbf95 fix(ActionScheduler): initialize wheel tick baseline before launch to avoid check-to-wheel startup drift 2026-02-09 17:29:32 +08:00
4b77f26e7b refactor(ActionScheduler): capture current hour once and reuse it for
day/hour rollover checks
2026-02-09 16:37:46 +08:00
650f9b27a1 fix(ActionScheduler): use checkThenExecute current hour consistently and trigger wheel tasks outside lock 2026-02-09 15:56:12 +08:00
9f479c5f6f fix(ActionScheduler): unify time check/loading under checkThenExecute and guard wheel loop with launch-hour consistency 2026-02-09 14:57:13 +08:00
227c735667 fix(ActionScheduler): make TimeWheel load scheduled actions dynamically instead of using init snapshot 2026-02-09 00:13:36 +08:00
b05b665960 fix(ActionScheduler): reload day/hour action buckets on time changes via checkTimeAndLoad, and reorganize functions 2026-02-09 00:03:21 +08:00
882ec43f2b fix(ActionScheduler): make scheduling thread-safe with Mutex and cancel scheduler/time wheel scopes on shutdown 2026-02-08 23:07:03 +08:00
7cb565fd1b fix(ActionScheduler): use withTimeoutOrNull when waiting for ACTIVE state, to avoid exception leading to wheel stopped 2026-02-08 21:56:35 +08:00
84b96b6645 test(ActionScheduler): remove unused actionExecutor mock 2026-02-08 21:52:16 +08:00
2169376062 test(ActionScheduler): add unit test for ActionScheduler 2026-02-08 21:51:57 +08:00
9bff74c8c7 fix(ActionScheduler): remove second offset when loading hour actions 2026-02-08 21:51:30 +08:00
76c9c27532 refactor(MetaAction): make result a read-only property 2026-02-08 17:22:47 +08:00
8524ca6f9f refactor(ActionData): use action.result.reset() when clearing action chain state 2026-02-08 17:22:10 +08:00
7dd2104689 refactor(MetaAction): migrate to Kotlin data class, merge MetaActionType/ResultStatus into nested enums, and update runner/action usages 2026-02-08 17:15:58 +08:00
6ba5784a7f 将 .java 重命名为 .kt 2026-02-08 17:15:58 +08:00
cdea8d6322 refactor(ActionData): migrate to Kotlin sealed class and data classes, update planner/scheduler usage 2026-02-08 16:27:44 +08:00
8ca2b9998d 将 .java 重命名为 .kt 2026-02-08 16:27:44 +08:00
d098b28f31 refactor(ActionExecutorInput): migrate to Kotlin data class 2026-02-08 15:12:10 +08:00
98e4d4cf1b 将 .java 重命名为 .kt 2026-02-08 15:12:10 +08:00
70489e57f7 chore(pom): add kotlinx-coroutines-test dependency 2026-02-08 14:34:19 +08:00
a43c87006e refactor(ActionCore): replace existing action with same UUID before putAction 2026-02-08 13:29:18 +08:00
be43b7eec6 refactor(ActionScheduler): implement Kotlin time-wheel scheduling and requeue scheduled actions after execution 2026-02-08 13:24:56 +08:00
3bc2ce839a 将 .java 重命名为 .kt 2026-02-08 13:24:56 +08:00
fe5a366527 refactor(ActionExecutor): remove userId from ActionExecutorInput and use source 2026-02-08 11:29:36 +08:00
9f724cee5d chore(pom): remove test scope from coroutines dependency 2026-02-08 11:22:47 +08:00
ad58b83020 refactor(ActionExecutor): rename actions variable for clarity 2026-02-08 11:22:30 +08:00
c9b64fec2a chore(pom): add cron-utils dependency 2026-02-07 15:36:45 +08:00
0eb4765235 refactor(ActionExecutor): use HistoryAction record and track scheduled history reset 2026-02-07 15:35:34 +08:00
050c39cbc7 refactor(ActionExecutor): correct input actions' type in ActionExecutor 2026-02-06 23:38:13 +08:00
08100aea8a refactor(ActionCore): replace prepared action APIs with generic list/put 2026-02-06 21:38:54 +08:00
2cd0774834 refactor(ActionCapability): rename listAvailableActions to listAvailableMetaActions 2026-02-06 21:05:10 +08:00
12df938d85 refactor(ActionCore): simplify handleInterventions to use ActionData 2026-02-06 20:41:08 +08:00
277c0d437f chore(ActionCore): update comment 2026-02-06 20:36:50 +08:00
6b861f4b77 fix(ActionExecutor): correct logic in ActionExecutor when actionChain is empty, which will skip execution 2026-02-05 20:40:02 +08:00
d33b6617c1 fix(ActionExecutor): support removing phaserRecord correctly when exception occurred in ActionCorrector 2026-02-05 19:40:28 +08:00
a1dcf4a6fa test(ActionExecutor): test with action failure 2026-02-05 17:05:53 +08:00
9c38719514 test(ActionExecutor): test with additionalContext appending 2026-02-05 16:58:22 +08:00
33df0fa017 test(ActionExecutor): test with virtual thread pool support 2026-02-05 16:53:38 +08:00
08bda84471 refactor(ActionCore): Specifies the minimum platform thread pool size
Context:
 The ActionExecutor needs at least 2 platform thread to support async  action execution,
 otherwise the current ActionExecutor logic cannot be executed
2026-02-05 16:49:58 +08:00
76da3c29f8 fix(ActionExecutorTest): repair stub in test 2026-02-05 16:13:21 +08:00
558b589830 refactor(ActionInterventor): redefine DTOs in ActionInterventor to adapt the actual intervention logic 2026-02-05 15:48:58 +08:00
80d7c283c5 refactor(ActionExecutor): update ActionChain execution, support executing and advancing correctly 2026-02-04 00:29:42 +08:00
b0bb40c5f0 test(ActionExecutor): add unit test for ActionExecutor 2026-02-01 19:49:21 +08:00
eec8f71096 fix(action): correct return type of method runnerClient() in ActionCapability 2026-02-01 16:43:12 +08:00
fbd30d1a96 build(maven): import Mockito related dependencies 2026-02-01 14:56:47 +08:00
346f925b66 chore(ActionExecutor): update comment 2026-01-31 23:35:17 +08:00
04e8d9e531 feat(ActionExecutor): support executing interventions in ActionExecutor 2026-01-30 20:58:12 +08:00
63d1552de2 refactor(ActionInterventor): remove InterventionHandler and related data class
Context:
Since last commit, the logic of interventions has been moved into ActionCapability,
the InterventionHandler is not needed.
2026-01-30 20:52:36 +08:00
77eb9b92a4 refactor(ActionCorrector): move intervention logic from InterventionHandler into ActionCapability 2026-01-30 20:10:01 +08:00
a1b4743eeb feat(ActionCorrector): complete corrector's executing logic 2026-01-30 19:19:48 +08:00
0768cddd2d fix(ActivateModel): correct modelSettings 2026-01-30 16:50:45 +08:00
75145cc547 chore(ActionRepairer): correct name of AssemblyHelper 2026-01-30 16:30:10 +08:00
d1ca1cda7d feature(ActionExecutor): complete CorrectorInput 2026-01-28 23:11:45 +08:00
fac6609d6b refactor(ActionExecutor): remove useless method getHistoryActionResults 2026-01-28 23:00:53 +08:00
dce8825e58 refactor(ActionExecutor): update type of history field in ActionData 2026-01-28 21:16:51 +08:00
cd641ac8dd fix(ActionExecutor): correct phaser block logic in method execute 2026-01-28 15:38:13 +08:00
5ffdab9e4a refactor(ActionExecutor): rework staged execution and runner submit
Context:
This refactor drops unnecessary method abstractions and cleans the action execution flow.
Additionally, method 'run' is renamed to 'submit' in RunnerClient, which better reflects that execution results are held in MetaAction.
2026-01-25 19:38:53 +08:00
830503eee4 chore(ActionExecutor): update comments 2026-01-23 19:23:07 +08:00
96e74ec877 test(LocalRunnerClient): add test for method run in RunnerClient 2026-01-17 11:28:21 +08:00
420d51af15 fix(LocalRunnerClient): harden doRun branches and add tests 2026-01-16 23:28:46 +08:00
8ead306b7b fix(RunnerClient): correct RunnerResponse's visibility 2026-01-16 22:17:15 +08:00
c793851107 fix(LocalRunnerClient): support cleaning non-existing MCP Servers' tools while MCP configuration files changed in CommonMcp 2026-01-16 21:48:17 +08:00
fb5cabc747 fix(LocalRunnerClient): support read MetaActionInfo according to desc files when an MCP Client with described tools registered by CommonMcp 2026-01-16 21:28:45 +08:00
c5f6c4e0ae fix(LocalRunnerClient): recover desc watcher after root deletion and expand DescMcp tests 2026-01-14 19:57:24 +08:00
200c0f3f13 fix(LocalRunnerClient): guard against null tool meta and ignore non-protocol MCP 2026-01-14 16:10:33 +08:00
fdf398b86e fix(LocalRunnerClient): close old MCP client while a new client's name is duplicated with the old one 2026-01-13 23:22:54 +08:00
774e2b6cd5 fix(LocalRunnerClient): correct abnormal deleting condition in CommonMcp 2026-01-13 23:13:52 +08:00
837a4c92d1 fix(LocalRunnerClient): treat missing action dir as invalid path during DELETE in DynamicMcp
Context:
Action directories may already be removed when DELETE events are handled.
Return null from loadFiles to signal invalid paths and lock behavior with DynamicAction watch tests.
2026-01-12 21:46:34 +08:00
ddd999d47b fix(LocalRunnerClient): prevent WatchService event loss caused by concurrent consumers
Context:
Shared WatchService with multiple watch threads caused WatchKey events to be consumed by mismatched processors, leading to missed file events.
Use isolated WatchService per WatchContext to restore correct semantics.
2026-01-12 19:46:45 +08:00
9694a022c7 chore(gitignore): update gitignore 2026-01-12 19:35:41 +08:00
31968c7076 chore(gitignore): create AGENTS.md for codex, and add it to .gitignore 2026-01-12 14:25:20 +08:00
abec141e4e fix(LocalRunnerClient): correct path creating logic in RunnerClient and its implementations 2026-01-11 16:47:14 +08:00
cdb6ae9d01 fix(LocalRunnerClient): correct method loadFiles in LocalWatchEventProcessor 2026-01-11 16:30:44 +08:00
dd8d86d3c4 chore(LocalRunnerClient): add logs to LocalRunnerClient 2026-01-11 16:27:14 +08:00
99b42620d0 refactor(LocalRunnerClient): repair paths registering order and support creating directories automatically 2026-01-11 15:01:19 +08:00
70b8335d49 feat(LocalRunnerClient): support atomic persist serialization in LocalRunnerClient 2026-01-11 14:24:34 +08:00
8ca475beeb feat(LocalRunnerClient): support registering CommonMcp 2026-01-08 22:28:12 +08:00
4f36c0dd2d feat(LocalRunnerClient): support deleting MCP configurations in CommonMcp 2026-01-08 22:23:08 +08:00
00993bd763 feat(LocalRunnerClient): support creating MCP configurations in CommonMcp 2026-01-08 22:09:14 +08:00
a0bca668cb refactor(LocalRunnerClient): support update existedMetaActions in method registerMcpClient 2026-01-08 21:48:30 +08:00
c6118c41b0 refactor(LocalRunnerClient): support loading primary fileMcpCache when CommonMcp launched 2026-01-08 21:33:39 +08:00
872d21170a feat(LocalRunnerClient): support modify and overflow events on mcp configurations in CommonMcp
Context:
Due to single file cannot present all mcp configurations, loading all MCPs at once is required.
This is compatible in both modify and overflow events.
2026-01-08 21:16:28 +08:00
44ab6cfac8 feat(LocalRunnerClient): support registering MCP clients in CommonMcp 2026-01-05 23:06:17 +08:00
ec30ac1922 refactor(LocalRunnerClient): remove tool change consumer in registerMcpClient
Context:
ExistedMetaActions' updating logic is covered by implementations of LocalWatchEventProcessor.
2026-01-03 16:34:04 +08:00
74b6d0c653 chore(RunnerClient): fix RunnerClient error usages in implementations 2026-01-03 15:49:54 +08:00
de462866b2 feat(LocalRunnerClient): support registering DescMcpServer watch service 2026-01-02 21:41:47 +08:00
4ea8926363 feat(LocalRunnerClient): support repairing description data while OVERFLOW event happened in DescMcpServer 2026-01-02 21:29:18 +08:00
04c98c7856 fix(LocalRunnerClient): support deleting descCache while *.desc.json is not available in DescMcpServer 2026-01-02 18:18:15 +08:00
0757856187 feat(LocalRunnerClient): support deleting *.desc.json in DescMcpServer 2026-01-02 17:20:12 +08:00
19ec93f248 feat(LocalRunnerClient): create modify *.desc.json in DescMcpServer 2026-01-02 16:45:01 +08:00
5877b9e80d feat(LocalRunnerClient): support modify *.desc.json in DescMcpServer 2026-01-02 16:42:56 +08:00
5db0b5fad1 feat(LocalRunnerClient): support load *.desc.json when DescMcpServer launched 2026-01-02 15:55:29 +08:00
623a86daab chore(LocalRunnerClient): update mcp servers' comments 2026-01-02 15:52:44 +08:00
64f24d3fc3 chore(LocalRunnerClient): adjust mcp servers' comments location 2026-01-02 13:38:43 +08:00
3097efe453 feat(LocalRunnerClient): support register DynamicActionMcp watch service 2026-01-02 13:29:00 +08:00
b58eeffd2f feat(LocalRunnerClient): support overflow event in DynamicActionMcpServer 2026-01-01 23:32:21 +08:00
62cec79005 refactor(LocalRunnerClient): extract duplicated action adding logic 2026-01-01 22:39:32 +08:00
03a5935107 fix(LocalRunnerClient): support deleting event for action directories in DynamicActionMcp 2026-01-01 21:29:30 +08:00
0ecaec0545 fix(LocalRunnerClient): repair loading logic of action subdirectories 2026-01-01 20:28:19 +08:00
74f2c6c950 fix(LocalRunnerClient): support creating and registering new action in
method buildCreate in DynamicActionMcp
2026-01-01 00:32:34 +08:00
f35a467ebc fix(LocalRunnerClient): support registering subdirectories in LocalWatchServiceBuild 2025-12-31 23:15:27 +08:00
64b907707a refactor(LocalRunnerClient): introduce WatchContext and decouple build/processor state 2025-12-31 23:11:15 +08:00
a6e33edc7a refactor(LocalRunnerClient): support remove action temporarily while action is not usable 2025-12-31 16:27:34 +08:00
94ef79c67d feat(LocalRunnerClient): support program deletion for DynamicActionMcp 2025-12-31 13:41:35 +08:00
a222015abb feat(LocalRunnerClient): support program modify and unify action load protocol
Context:
The method buildModify reuses AsyncToolSpecification building logic in buildLoad.
This feature unifies local action directory protocol, and refactors related logic in buildLoad.
New action directory protocol defines the file names of program and description files.
2025-12-30 20:52:32 +08:00
1c562f0e7b refactor(LocalRunnerClient): update action keys building source in
DynamicActionMcp

 Context:
 Building action keys by subdirector's name keeps unique identity for each local action.
2025-12-30 16:43:39 +08:00
89535a6b1c feat(LocalRunnerClient): add initial support for loading local action tools from filesystem
Context:
This feature supports DynamicActionMcpServer.

During initialization, directories containing a program file and a
.meta.json description are scanned and registered as MCP tools.
Tool execution is handled asynchronously via boundedElastic to avoid blocking server threads.
2025-12-29 20:46:26 +08:00
6e90bc8d67 refactor(LocalRunnerClient): co-locate system execution result 2025-12-29 18:53:41 +08:00
0e741802d1 refactor(LocalRunnerClient): consolidate MCP client transport params
Context:
Group HTTP and STDIO transport parameter variants under a sealed internal transport parameter hierarchy.
2025-12-29 18:48:53 +08:00
db3435fccf refactor(LocalRunnerClient): co-locate watch service builder internals
Context:
Group WatchService build interfaces and registry implementation into a
single internal structure for better cohesion.
2025-12-29 18:40:20 +08:00
e3294ec302 refactor(LocalRunnerClient): move system execution methods into SystemExecHelper 2025-12-29 18:26:30 +08:00
bf99e01b51 feat(LocalRunnerClient): introduce LocalWatchServiceHelper and internal implementations
Context:
This change introduces an internal scaffold to organize WatchEventHandler building logic.
2025-12-29 17:45:39 +08:00
1bd23b20c4 refactor(LocalRunnerClient): introduce DescMcpServer
Context:
This refactor supports creating descriptional files for common MCP Tools.
2025-12-29 17:35:03 +08:00
442dd55686 refactor(LocalRunnerClient): rename LocalWatchServiceRegistry 2025-12-29 14:27:00 +08:00
abe5dd5251 chore(idea): update misc.xml 2025-12-26 21:28:10 +08:00
1f737c0e29 refactor(action): reorganize constants in action module 2025-12-26 21:28:02 +08:00
d41074c814 refactor(LocalRunnerClient): replace ActionWatchService with unified watch service builder.
Context:
ActionWatchService was used to support SCRIPT and PLUGIN type actions loading from local FileSystem, this refactor allows register different paths to watch.
2025-12-25 15:41:49 +08:00
621441601a feat(LocalRunnerClient): correct method signature 2025-12-25 10:20:55 +08:00
e00d77f076 feat(LocalRunnerClient): add shutdown logic for dynamicActionMcpServer 2025-12-25 10:12:38 +08:00
d614ac0b15 feat(LocalRunnerClient): support initializing in-process dynamic action MCP Server 2025-12-24 21:36:39 +08:00
592e2604d9 refactor(mcp): move InProcessMcpTransport into Partner-Common module
Context:
Action modules in Partner-Main and SandboxRunner module rely on in-process MCP transport to support dynamically action generating.
2025-12-24 19:34:04 +08:00
dcbd2c6569 build(maven): introduce common module 2025-12-24 19:21:53 +08:00
476acb0641 refactor(LocalRunnerClient): rename McpServerParams into McpClientTrasnportParams 2025-12-22 15:02:07 +08:00
88a14f36b2 refactor(runner): relocate InProcessMcpTransport to experimental and move local MCP client logic into LocalRunnerClient
Context:
Recent changes blurred the responsibility boundary between RunnerClient and LocalRunnerClient.
This refactor moves local MCP client–specific logic into LocalRunnerClient and isolates InProcessMcpTransport and related code under the experimental package.
RunnerClient only defines indispensable methods and attributes.
2025-12-22 14:56:23 +08:00
05d1fff125 refactor(RunnerClient): remove unused MCP type enum class 2025-12-21 23:03:25 +08:00
49a4c9eb01 docs(RunnerClient): add architecture-location comment on RunnerClient 2025-12-21 22:05:46 +08:00
9e76c3e7ad refactor(SandboxRunnerClient): align doRun visibility with superclass 2025-12-19 23:34:17 +08:00
9762739138 refactor(action): replace HashMap with ConcurrentHashMap for thread-safe MetaAction storage 2025-12-19 23:30:27 +08:00
1f5509c17d refactor(RunnerClient): redesign existedMetaActions update strategy
Context:
Resource-change events cannot reliably represent tool changes.
The previous approach attempted to externalize descriptive content into files, but the meta attribute of McpSchema.Tool can provide this information.
2025-12-19 23:22:36 +08:00
ed042cfffa fix(action): correct params type in related DTOs 2025-12-19 22:57:34 +08:00
128592e23c chore(MetaActionInfo): remove unused type attribute 2025-12-19 22:47:06 +08:00
5ba36ed3e8 feat(LocalRunnerClient): support executing MetaActions via MCP type 2025-12-19 22:29:03 +08:00
4dea948f82 refactor(MetaAction): separate key attribute into name and location
Context:
This change adapts MetaAction locating to support different MetaAction types,
including loading from the local filesystem and from MCP tools.
2025-12-19 21:35:39 +08:00
dc4074715e chore(MetaAction): remove unused order attribute 2025-12-19 20:53:01 +08:00
225802c1a8 refactor(MetaActionInfo): remove key attribute and update related logic
Context:
MetaActionInfo was previously located via its own key attribute.
This is now redundant, as ActionCore already uses the key of existedMetaActions
as the single source of truth.
2025-12-19 20:41:07 +08:00
e851e33b2e feat(RunnerClient): support MCP type-based dynamic client/server registration
This allows implementations of RunnerClient to dynamically register different types of MCP service, and also provides a shutdown hook to close client/server properly.
2025-12-18 22:25:32 +08:00
cb28a5b068 feat(RunnerClient): add InProcessMcpTransport to support in-process MCP communication
Context:
This allows RunnerClient implementations to host local MCP servers without spawning another process.
2025-12-18 21:48:35 +08:00
ad58567ada chore(deps): introduce mcp dependencies 2025-12-18 17:52:15 +08:00
0eee12d685 refactor(MetaActionInfo): remove outdated constructor
Context:
Previously, MetaActionInfo comes from the local filesystem changes.
But now MCP Servers already provide a method to get information of MetaActions.
The pre- or post-dependencies are still required, for some MCP Tools cannot just be executed without additional context.
2025-12-18 17:49:52 +08:00
1e6ff1b30c chore(ActionCore): update outdated comment 2025-12-18 17:49:52 +08:00
0413fc281d chore(MetaAction): update outdated comment 2025-12-17 22:18:43 +08:00
8a7681ae31 chore(LocalRunnerClient): remove a redundant comment 2025-12-17 20:02:28 +08:00
1947f25ed6 feat(LocalRunnerClient): support executing origin actions
Context:
Origin actions are generated by DynamicActionGenerator and may optionally be
persistently serialized. This feature adds the basic execution flow for origin
actions within LocalRunnerClient.

Notes:
The current mapping between action files and their extensions is hardcoded. This should later be replaced with a configurable registry or loaded dynamically
during application startup.
2025-12-16 21:59:53 +08:00
488246525f chore(gitignore): exclude runtime data directory from version control 2025-12-16 21:39:11 +08:00
534dcd5ade fix(LocalRunnerClient): correctly capture stdout and stderr to avoid missing output in method exec 2025-12-16 21:30:51 +08:00
ad58c0cc7c refactor(LocalRunnerClient): allow injecting action watch path
Context:
The hardcoded action watch path made LocalRunnerClient difficult to test and
tightened it to a specific runtime layout. Injecting the watch path improves
testability and allows the runner to work in different runtime environments.
2025-12-16 21:02:29 +08:00
d546148d69 chore(test): organize experimental tests and test resources 2025-12-16 19:58:08 +08:00
bf2d5ac707 refactor(RunnerClient): restructure serialization and temp execution paths
Context:
Following the consolidation of action types into ORIGIN and MCP,
the serialization logic needs to be separated into dedicated methods.
These methods are invoked by DynamicActionGenerator.
2025-12-16 10:47:23 +08:00
628234f6e2 refactor(MetaActionType): redefine meta action types into MCP and ORIGIN
Context:
Previously, SCRIPT and PLUGIN were treated as separate action types,
but their semantics are already covered by MCP.
However, a generic execution path for locally generated actions is still
required, which is represented by ORIGIN.
2025-12-16 10:37:04 +08:00
4b852e0049 推进 ActionExecutor 下的‘行动生成与执行’部分
- 新增 RunnerClient 抽象类,并划分 SandboxRunnerClient、LocalRunnerClient两个子类(内容待完善)。前者负责对接 SandboxRunner 模块,后者直接使用本地作为执行环境(但不推荐)。
- 将 ActionWatchService 划为 LocalRunnerClient 的内部类,负责采用本地执行环境时,监听行动程序变化
- 完善 ActionRepairer 处的修复逻辑
- 调整 MetaAction 中路径获取逻辑

这提交方式真该调整一下了,这阶段推进容易攒太多,但又不好停手。或许阶段目标可以保留,但推进点应该可以细化🤔
2025-12-15 21:54:24 +08:00
6e3deced77 推进 ActionExecutor 下的 DynamicActionGenerator 子模块
- 完善了 DynamicActionGenerator 的大致逻辑,序列化逻辑待实现
- 补充了 PhaserRecord 中的阻塞逻辑,使用普通的线程sleep操作
- 调整了 MetaAction 中参数形式,由列表替换为 Map,便于执行时填写参数
- 完善了 DynamicActionGenerator 相关的数据类
2025-12-07 20:10:53 +08:00
6a351413a1 推进行动执行模块: 调整了 ActionExecutor 以支持行动链动态修复和参数提取; 完善了 ActionRepairer、ParamsExtractor 的主要逻辑; 完善了部分数据类的内容
- 在 ActionData 中新增 additionalContext 用于存储各个执行阶段临时修复生成的上下文,同样以执行阶段为键
- 调整 ActionExecutor 的输入参数,可传入用户标识,用于执行器调用 ActionRepairer 的修复过程
- 完善了 ActionExecutor 中行动单元的执行与修复逻辑,将支持正常状态推进执行、触发自对话时阻塞当前行动单元、所有修复方式失败时将整个行动数据标为 FAILED
- 完善了 ActionExecutor 中各个DTO的构建方法
- 完善了 ParamsExtractor 中的参数提取逻辑
- 在 PhaserRecord 中新增 interrupt 和 complete 方法,将用于后续行动单元的阻塞(ActionExecutor中)与恢复(InterventionHandler中)
- 完善了 ActionRepairer 中的修复逻辑,但自对话通道的暴露方式、DynamicActionGenerator 的具体逻辑待完善
2025-12-05 21:58:21 +08:00
ad973d4230 对 ActionExecutor 下子模块的功能分布、某些实体类进行了调整; 完善了 ActionExecutor 中的大致执行逻辑
- 梳理执行链路时发现 ActionRepairer 的能力明显超出可实现边界,故将其能力进行限定
- 新增 ActionCorrector 负责单组行动执行完毕后,根据意图和执行状况进行行动链修正
- 将 PhaserRecord 拆分为独立实体,未来将封装一部分流程控制逻辑
2025-12-02 22:35:53 +08:00
1d315a9b62 ActionExecutor 的执行流程规划完毕,具体逻辑待填充
- 调整了部分代码分布,移除了某些非必需的转发方法
- 新增几个 TODO 内容,后续工作已明确

这套调度方式看起来真的有些‘探索性质’了。实际上看起来有些像把 ReAct 的逻辑显式地进行了工程实现,不管是修复、依据状态选择行动单元生成还是阶段间针对行动单元的参数提取,在 ReAct Agent 中都是由一个智能体完成的。

但在这里,它要做的事情太多了,再加上 Partner 行动链的干预逻辑、幻觉参数又不可接受所以需要自对话或者用户干预,这些东西交给一个 ReAct 模块恐怕并不合适也不放心。所以这种显式模块划分应该更符合 Partner 行动模块的需求。

这点硬要说的话,应该还是在于‘ReAct 行为’并非 Partner 的全部吧。

不过谁知道呢,也许以后也会变,但这套至少现在看来是更能实现理想行为的
2025-12-01 19:25:21 +08:00
4e32129b31 优化行动链结构及相关组件、针对 ActionPlanner 相关组件做出调整
- 将existedMetaActions的实现由LinkedHashMap替换为HashMap,免去不必要的性能消耗
- 在 ActionCapability 中新增 listAvailableActions 方法用于获取当前存在的可用行动
- 将 ActionData 及相关类中的 LinkedHashMap 替换为普通Map,阶段并发将通过遍历key集合进行,而非针对原始行动链进行遍历
- 在 ActionPlanner 中完善行动链依赖修正逻辑,防止行动单元执行时的输入缺失
- 在 ActionEvaluator 中调整了 Prompt 构建方式
- 调整处理行动链相关代码,移除多余参数,简化方法签名
- 修正 EvaluatorResult 中行动链数据结构为Map,LLM将直接返回初始行动链,后续将加载行动数据并修复行动单元间的依赖关系
- 优化 InterventionHandler、ActionExecutor 等模块中对行动链Map的使用
2025-12-01 17:20:54 +08:00
3f59719e16 调整 MetaAction 的执行方式,将交给 ActionCapability、SandBoxRunnerClient 执行 2025-11-30 22:16:57 +08:00
c548cceec6 新增 SandboxRunner 项目子模块,该模块将在指定容器运行持久服务,与外部主进程通信,将用于后续执行JARSCRIPT两类行动类型 2025-11-30 18:41:42 +08:00
b3098310b4 完善了 ActionConfirmer 的遗漏逻辑 2025-11-30 15:16:57 +08:00
f48d559a7b 调整了 ActionInterventor 中数据构建方法的组织方式 2025-11-30 14:38:50 +08:00
14a57f0be6 推进行动干预模块,前置部分逻辑已基本完成
- 在`ActionData`中添加必要注释、新增`executingStage`字段表示当前执行阶段、移除了`WAITING`的状态类型
- 调整并修正了`ActionExecutor`中的`Phaser`阻塞逻辑
- 完善了`ActionInterventor`中`识别 -> 评估 -> 异步执行`的干预逻辑,并将干预结果以 Prompt 形式回写至流程上下文,作为主模块的已知内容
- 调整了干预模块内部的各个数据类的字段结构,适配干预流程
- 完善了`InterventionEvaluator`、`InterventionHandler`、`InterventionRecognizer`等必需的干预子模块
2025-11-29 20:56:29 +08:00
dff7b69b51 更新 README 2025-11-12 19:53:48 +08:00
d77ffd1db6 Merge remote-tracking branch 'origin/doc/architechture' into doc/architechture 2025-11-11 16:51:18 +08:00
264cdb09e5 推进行动干预模块; 接下来将进一步完善 InterventionHandler 的具体内容
- 调整相关目录为 interventor
-  调整了某些 ActionInterventor 的子模块用到的数据类结构
- 完善了 InterventionEvaluator 的具体逻辑
- 为 InterventionType 添加了注释,并新增了 CANCEL 干预类型
2025-11-11 16:11:09 +08:00
fea7f9c81f PerceiveSelector、PeiceiveUpdater 流程图制作完毕 2025-11-11 08:47:21 +08:00
a1520f117b 推进行动干预模块
- 完善了 ActionInterventor 中的具体逻辑以及不同情况下的prompt填充内容;
- 调整了 PreRunningModule 中的 getPromptDataMap 方法;
- 在 ActionCapability 中新增了检查 actionKey 是否存在的逻辑
2025-11-10 23:02:48 +08:00
ae5caf8475 更新 memory.md 2025-11-10 18:59:05 +08:00
980d9384d1 MemoryUpdater 流程图制作完毕 2025-11-08 17:33:05 +08:00
9ba0d1363a 创建了 action、memory、perceive 三类模块的流程文档; 完成了记忆模块中 MemorySelector 的流程图 2025-11-07 15:14:29 +08:00
f6d5cad5cd 更新 README 2025-11-07 13:51:30 +08:00
c3ca4145b8 推进行动干预模块
- 完善了大致的执行流程
- 明确并创建了评估与处理所需的数据类及干预类型
- 不同情况的Prompt处理结果、评估和处理的具体流程需要进一步完善
2025-11-06 22:07:27 +08:00
5419722c40 更新文档内容 2025-11-06 11:17:25 +08:00
31ebee3ded 制作了整体流程图 2025-11-06 11:14:37 +08:00
746fda1a5e 干预意图提取模块初步完成,Prompt 待制定; 在 ChatClient 中添加了默认的超时设定,超时时间后续可能需要调整。
另: 发现很多细节错误,比如“各个后置模块允许执行的条件”、“主模块出现异常时需要如何处理”、“模块Prompt的构建方式、采用格式不统一”等,需要后续进行修复或调整
2025-10-31 21:26:45 +08:00
ec4fbb7f19 行动干预足以抽离为新的前置模块,但仍属于‘行动’语义,大致框架已确立。后续实现时并发控制、各种干预的协调与触发时机需要注意。 2025-10-31 21:26:45 +08:00
f9c3cacfea 推进 ActionExecutor 相关的动态插拔式行动调度机制
- 移除先前构想的 SpecializedPartnerInputData 及相关类,无论是自反思、向用户求助还是用户主动干预,都应当通过语义识别来作用于对应行动事件,使用固定行动id的机制不足以支撑这种机制
- 在 ActionCore 中新增执行中行动的 phaser 管理逻辑
- 新增几个异常类,适用于行动数据加载的异常情况
- 新增 ActionIdentifier 负责行动干预意图的识别
-
2025-10-31 21:25:12 +08:00
e35e18f3b7 推进 ActionExecutor、确定动态插拔式行动调度的实现思路
- 在 ActionCore 中添加关闭hook,用于正确设置异常中断时执行中任务的状态
- 修正 actionPool 相关注释及用法
- 将 ActionData 中行动链字段调整为 LinkedHashMap 用于更好地支持分组并发及动态调度
- 重构 ActionExecutor 行动链执行逻辑,采用 Phaser 支持动态调度
- 扩展 InputData、Context 字段并调整 GateWay 格式化逻辑以适应特殊输入
2025-10-31 21:25:12 +08:00
83832d2060 推进 ActionExecutor、针对action core做出了一些调整
- 将 ActionWatchService 抽取为独立的类,使用构造参数传递所需内容
- ActionCore 中除了pendingAction外,将只维护一个行动池,通过用户键和STATUS区分类型
- 开始推进 ActionExecutor,但其中的同组并发、动态行动链、行动间参数对齐、参数重构等内容需要仔细考虑
2025-10-31 21:25:12 +08:00
4757425a15 推进 ActionDispatcher 模块、完善行动程序规范与加载逻辑
- 明确行动程序的存储形式与加载规则,分为执行程序和描述文件,前者负责逻辑,后者提供必要的描述性信息;
- 将 ActionInfo 重命名为 ActionData,更新相关接口和实现,增强代码一致性和可读性;
- 添加异常处理类以支持行动程序、描述信息的初始化和加载失败的场景;
- 实现行动程序目录的监控功能,支持行动程序的动态加载与管理;
- 明确了 ActionDispatcher 两个子模块的输入输出规范
2025-10-31 21:25:12 +08:00
21b3a0e846 开始推进 ActionDispatcher 模块
- ActionDispatcher 划分为 ActionScheduler 和 ActionExecutor 两个子模块,分别负责处理计划任务和即时任务
- 正式确定 Action 将以 ActionChain 的形式进行执行,也采用同组并发策略,按照 order 字段在 chain 中进行排序
- 调整了 ActionInfo 等类以适应当前的元行动类
- 对于行动能力的支持,或可考虑这几种方式: Agent自生成python脚本(必须经过验证,确认可执行且无风险)、MCP调用(需适配为Partner所支持的形式)、普通插件(在指定目录动态加载)
2025-10-31 21:25:12 +08:00
6bfa941c35 更新 README 2025-10-31 21:24:46 +08:00
456a7e04e8 更新 README 2025-10-24 17:29:55 +08:00
5864760f35 Action 模块语义缓存机制实现完毕,支持三种情况的语义缓存相关行为: 命中缓存且评估通过、命中缓存但评估未通过、未命中缓存但评估通过。将在评估过后步入主模块之前,进行异步更新操作(借助@AfterExecute注解,通过虚拟线程进入异步流程,在真正调用处使用平台线程加速计算) 2025-10-19 22:05:27 +08:00
aee6d879e9 推进 Action 模块语义缓存机制
- 完善缓存命中部分;
- 调整 ActionExtractor 以适配缓存逻辑
- 缓存更新大致框架待填充具体更新逻辑;
2025-10-18 21:56:50 +08:00
d1ea8dde79 推进 ActionExtractor 语义缓存机制: 移除了 VectorUtil,实现了 ollama、onnx runtime 两种向量客户端,通过 Agent 启动类暴露的后置启动任务加载并进行测试。 2025-10-17 11:20:11 +08:00
7094a8a68b 推进 ActionExtractor 语义缓存机制: 两种嵌入模型的连接方式测试完毕,在高性能主机上,可以通过ollama调用mxbai-embed-large这类模型,但放到4核8G香橙派3B就会出现推理时长过长,哪怕换成ONNX RUNTIME JAVA 也难以避免,但如果更换成 nomic-embed-text + ONNX RUNTIME JAVA ,仍能够拿到70左右ms的推理时长,远低于提取模型以及向量模型API的调用时长。预期可提供两种语义缓存所用的嵌入模型接入方式: 通过 http 调用 本地ollama接口; 指定 ONNX 格式的嵌入模型直接调用。 2025-10-16 23:04:41 +08:00
e78048f66d 推进 ActionExtractor: 新增语义向量计算工具;开始推进语义缓存相关;调整配置类格式 2025-10-16 15:39:38 +08:00
2f09c0cd71 推进 ActionExtractor: 完善大致逻辑,开始语义-行为缓存相关部分 2025-10-16 15:39:31 +08:00
8c43d6594f 推进 ActionPlanner: 新增行动确认机制,将与原‘提取-评估’流程并发执行; 将繁杂的装配逻辑封装在内部类ActionAssemblyHelper
# Conflicts:
#	Partner-Main/src/main/java/work/slhaf/partner/core/cache/CacheCapability.java
#	Partner-Main/src/main/java/work/slhaf/partner/core/memory/MemoryCore.java
#	Partner-Main/src/main/java/work/slhaf/partner/module/modules/memory/selector/MemorySelector.java
2025-10-16 15:39:16 +08:00
2d052442b1 推进 ActionPlanner: 添加行动短路机制,如果未提取到行动,则跳过评估子模块 2025-10-16 15:34:30 +08:00
84f7befb75 推进 ActionPlanner: 完成了 ActionPlanner 模块中的执行逻辑,同步调整了数据类中的字段。下一步将进行 ActionPlanner 子模块的开发。 2025-10-16 15:34:30 +08:00
140 changed files with 8887 additions and 213 deletions

4
.gitignore vendored
View File

@@ -54,3 +54,7 @@ build/
/config/ /config/
/data/ /data/
/generated-classes/ /generated-classes/
/.idea/copilot.data.migration.ask2agent.xml
/Partner-Main/data/
/AGENTS.md
/.serena/

6
.idea/copilot.data.migration.agent.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.ask.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AskMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.edit.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EditMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

3
.idea/encodings.xml generated
View File

@@ -3,10 +3,13 @@
<component name="Encoding"> <component name="Encoding">
<file url="file://$PROJECT_DIR$/Partner-Api/src/main/java" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Api/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Api/src/main/resources" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Api/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Main/src/main/java" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Main/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Main/src/main/java/src/main/java" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Main/src/main/java/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Main/src/main/java/src/main/resources" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Main/src/main/java/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Main/src/main/resources" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Main/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-SandboxRunner/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Test-Demo/src/main/java" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Test-Demo/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/Partner-Test-Demo/src/main/resources" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/Partner-Test-Demo/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />

19
.idea/misc.xml generated
View File

@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="EntryPointsManager"> <component name="EntryPointsManager">
<list size="14"> <list size="15">
<item index="0" class="java.lang.String" itemvalue="lombok.Data" /> <item index="0" class="java.lang.String" itemvalue="lombok.Data" />
<item index="1" class="java.lang.String" itemvalue="net.bytebuddy.implementation.bind.annotation.RuntimeType" /> <item index="1" class="java.lang.String" itemvalue="net.bytebuddy.implementation.bind.annotation.RuntimeType" />
<item index="2" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.capability.annotation.Capability" /> <item index="2" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.capability.annotation.Capability" />
@@ -11,18 +10,24 @@
<item index="6" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.capability.annotation.Coordinated" /> <item index="6" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.capability.annotation.Coordinated" />
<item index="7" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.AfterExecute" /> <item index="7" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.AfterExecute" />
<item index="8" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.AgentModule" /> <item index="8" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.AgentModule" />
<item index="9" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.BeforeExecute" /> <item index="9" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule" />
<item index="10" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.Init" /> <item index="10" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.BeforeExecute" />
<item index="11" class="java.lang.String" itemvalue="work.slhaf.partner.api.capability.annotation.CapabilityMethod" /> <item index="11" class="java.lang.String" itemvalue="work.slhaf.partner.api.agent.factory.module.annotation.Init" />
<item index="12" class="java.lang.String" itemvalue="work.slhaf.partner.api.capability.annotation.CoordinateManager" /> <item index="12" class="java.lang.String" itemvalue="work.slhaf.partner.api.capability.annotation.CapabilityMethod" />
<item index="13" class="java.lang.String" itemvalue="work.slhaf.partner.api.register.capability.annotation.Capability" /> <item index="13" class="java.lang.String" itemvalue="work.slhaf.partner.api.capability.annotation.CoordinateManager" />
<item index="14" class="java.lang.String" itemvalue="work.slhaf.partner.api.register.capability.annotation.Capability" />
</list> </list>
<writeAnnotations>
<writeAnnotation name="work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability" />
<writeAnnotation name="work.slhaf.partner.api.agent.factory.module.annotation.InjectModule" />
</writeAnnotations>
</component> </component>
<component name="ExternalStorageConfigurationManager" enabled="true" /> <component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager"> <component name="MavenProjectsManager">
<option name="originalFiles"> <option name="originalFiles">
<list> <list>
<option value="$PROJECT_DIR$/pom.xml" /> <option value="$PROJECT_DIR$/pom.xml" />
<option value="$PROJECT_DIR$/PartnerExecutor/pom.xml" />
</list> </list>
</option> </option>
</component> </component>

1
.idea/vcs.xml generated
View File

@@ -2,6 +2,5 @@
<project version="4"> <project version="4">
<component name="VcsDirectoryMappings"> <component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" /> <mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="$USER_HOME$/Projects/IdeaProjects/Projects/Partner" vcs="Git" />
</component> </component>
</project> </project>

View File

@@ -1,33 +0,0 @@
autoDetectedPackages:
- factory
- module
- work.slhaf
enableAutoDetect: true
entryDisplayConfig:
excludedPathPatterns: []
skipJsCss: true
funcDisplayConfig:
skipConstructors: false
skipFieldAccess: true
skipFieldChange: true
skipGetters: false
skipNonProjectPackages: false
skipPrivateMethods: false
skipSetters: false
ignoreSameClassCall: null
ignoreSamePackageCall: null
includedPackagePrefixes: null
includedParentClasses: null
maxColSize: 32
maxNumFirst: 12
maxNumFirstImportant: 1024
maxNumHash: 3
maxNumHashImportant: 256
maxObjectDepth: 4
maxStrSize: 4096
name: xcodemap-filter
openMainWindow: true
recordMode: manual
sourceDisplayConfig:
color: blue
startOnDebug: false

View File

@@ -25,11 +25,7 @@ public class AgentRunningFlow<C extends RunningFlowContext> {
List<MetaModule> moduleList = entry.getValue(); List<MetaModule> moduleList = entry.getValue();
for (MetaModule module : moduleList) { for (MetaModule module : moduleList) {
Future<?> future = executor.submit(() -> { Future<?> future = executor.submit(() -> {
try {
module.getInstance().execute(interactionContext); module.getInstance().execute(interactionContext);
} catch (Exception e) {
throw new AgentRuntimeException("模块执行出错: " + module.getName(), e);
}
}); });
futures.add(future); futures.add(future);
} }

View File

@@ -23,7 +23,7 @@ public interface ActivateModel {
ModelConfig modelConfig = AgentConfigManager.INSTANCE.loadModelConfig(modelKey()); ModelConfig modelConfig = AgentConfigManager.INSTANCE.loadModelConfig(modelKey());
model.setBaseMessages(withBasicPrompt() ? loadSpecificPromptAndBasicPrompt(modelKey()) : loadSpecificPrompt(modelKey())); model.setBaseMessages(withBasicPrompt() ? loadSpecificPromptAndBasicPrompt(modelKey()) : loadSpecificPrompt(modelKey()));
model.setChatClient(new ChatClient(modelConfig.getBaseUrl(), modelConfig.getApikey(), modelConfig.getModel())); model.setChatClient(new ChatClient(modelConfig.getBaseUrl(), modelConfig.getApikey(), modelConfig.getModel()));
((Module) this).setModel(model); setModel(model);
} }
default void updateModelSettings(ChatClient newChatClient) { default void updateModelSettings(ChatClient newChatClient) {

View File

@@ -7,14 +7,12 @@ import work.slhaf.partner.api.agent.factory.module.annotation.BeforeExecute;
import work.slhaf.partner.api.agent.factory.module.annotation.CoreModule; import work.slhaf.partner.api.agent.factory.module.annotation.CoreModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext; import work.slhaf.partner.api.agent.runtime.interaction.flow.entity.RunningFlowContext;
import java.io.IOException;
/** /**
* 流程执行模块基类 * 流程执行模块基类
*/ */
@Slf4j @Slf4j
public abstract class AgentRunningModule<C extends RunningFlowContext> extends Module { public abstract class AgentRunningModule<C extends RunningFlowContext> extends Module {
public abstract void execute(C context) throws IOException, ClassNotFoundException; public abstract void execute(C context);
@BeforeExecute @BeforeExecute
private void beforeLog() { private void beforeLog() {

View File

@@ -1,10 +1,12 @@
package work.slhaf.partner.api.chat; package work.slhaf.partner.api.chat;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpResponse;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.chat.constant.ChatConstant; import work.slhaf.partner.api.chat.constant.ChatConstant;
import work.slhaf.partner.api.chat.pojo.ChatBody; import work.slhaf.partner.api.chat.pojo.ChatBody;
import work.slhaf.partner.api.chat.pojo.ChatResponse; import work.slhaf.partner.api.chat.pojo.ChatResponse;
@@ -13,6 +15,7 @@ import work.slhaf.partner.api.chat.pojo.PrimaryChatResponse;
import java.util.List; import java.util.List;
@Slf4j
@Data @Data
@NoArgsConstructor @NoArgsConstructor
public class ChatClient { public class ChatClient {
@@ -34,6 +37,8 @@ public class ChatClient {
public ChatResponse runChat(List<Message> messages) { public ChatResponse runChat(List<Message> messages) {
HttpRequest request = HttpRequest.post(url); HttpRequest request = HttpRequest.post(url);
request.setConnectionTimeout(2000);
request.setReadTimeout(15000);
request.header("Content-Type", "application/json"); request.header("Content-Type", "application/json");
request.header("Authorization", "Bearer " + apikey); request.header("Authorization", "Bearer " + apikey);
@@ -53,17 +58,26 @@ public class ChatClient {
.build(); .build();
} }
HttpResponse response = request.body(JSONUtil.toJsonStr(body)).execute();
ChatResponse finalResponse; ChatResponse finalResponse;
try {
HttpResponse response = request.body(JSONUtil.toJsonStr(body)).execute();
PrimaryChatResponse primaryChatResponse = JSONUtil.toBean(response.body(), PrimaryChatResponse.class); PrimaryChatResponse primaryChatResponse = JSONUtil.toBean(response.body(), PrimaryChatResponse.class);
finalResponse = ChatResponse.builder() finalResponse = ChatResponse.builder()
.type(ChatConstant.Response.SUCCESS) .status(ChatConstant.ResponseStatus.SUCCESS)
.message(primaryChatResponse.getChoices().get(0).getMessage().getContent()) .message(primaryChatResponse.getChoices().get(0).getMessage().getContent())
.usageBean(primaryChatResponse.getUsage()) .usageBean(primaryChatResponse.getUsage())
.build(); .build();
response.close(); response.close();
} catch (IORuntimeException e) {
log.error("请求超时", e);
finalResponse = ChatResponse.builder()
.message("连接超时")
.status(ChatConstant.ResponseStatus.FAILED)
.usageBean(null)
.build();
}
return finalResponse; return finalResponse;
} }

View File

@@ -8,8 +8,7 @@ public class ChatConstant {
public static final String ASSISTANT = "assistant"; public static final String ASSISTANT = "assistant";
} }
public static class Response { public enum ResponseStatus {
public static final String SUCCESS = "success"; SUCCESS, FAILED
public static final String ERROR = "error";
} }
} }

View File

@@ -4,13 +4,14 @@ import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import work.slhaf.partner.api.chat.constant.ChatConstant;
@Data @Data
@Builder @Builder
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
public class ChatResponse { public class ChatResponse {
private String type; private ChatConstant.ResponseStatus status;
private String message; private String message;
private PrimaryChatResponse.UsageBean usageBean; private PrimaryChatResponse.UsageBean usageBean;
} }

28
Partner-Common/pom.xml Normal file
View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>work.slhaf</groupId>
<artifactId>Partner</artifactId>
<version>0.5.0</version>
</parent>
<artifactId>Partner-Common</artifactId>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.modelcontextprotocol.sdk/mcp -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>0.17.0</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@@ -0,0 +1,155 @@
package work.slhaf.partner.common.mcp;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.server.McpStatelessServerHandler;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
public final class InProcessMcpTransport implements McpClientTransport, McpStatelessServerTransport {
// 每个 transport 只处理一条 inbound 流
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound =
Sinks.many().unicast().onBackpressureBuffer();
private final AtomicBoolean clientConnected = new AtomicBoolean(false);
private final AtomicBoolean serverConnected = new AtomicBoolean(false);
/**
* 对端
*/
private volatile InProcessMcpTransport peer;
private volatile McpStatelessServerHandler serverHandler;
public record Pair(InProcessMcpTransport clientSide, InProcessMcpTransport serverSide) {
}
public static Pair pair() {
InProcessMcpTransport client = new InProcessMcpTransport();
InProcessMcpTransport server = new InProcessMcpTransport();
client.peer = server;
server.peer = client;
return new Pair(client, server);
}
/* ======================================================
* Internal receive: peer.sendMessage -> this.receive
* ====================================================== */
private void receive(McpSchema.JSONRPCMessage message) {
if (inbound.tryEmitNext(message).isFailure()) {
throw new RuntimeException("Failed to receive message: " + message);
}
}
/* ======================================================
* Client → Server sendMessage
* ====================================================== */
@Override
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
InProcessMcpTransport p = this.peer;
if (p == null) {
return Mono.error(new IllegalStateException("Transport is not linked"));
}
return Mono.fromRunnable(() -> p.receive(message));
}
/* ======================================================
* Client connect(handler) 处理 server → client 消息
* ====================================================== */
@Override
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
if (!clientConnected.compareAndSet(false, true)) {
return Mono.error(new IllegalStateException("Client already connected"));
}
return inbound.asFlux()
.concatMap(msg ->
handler.apply(Mono.just(msg))
// handler may emit response message → send back to server
.flatMap(resp -> resp != null ? sendMessage(resp) : Mono.empty())
)
.doFinally(sig -> clientConnected.set(false))
.then();
}
@Override
public void setExceptionHandler(Consumer<Throwable> handler) {
McpClientTransport.super.setExceptionHandler(handler);
}
/* ======================================================
* Server: bind stateless handler = process client → server inbound
* ====================================================== */
@Override
public void setMcpHandler(McpStatelessServerHandler handler) {
this.serverHandler = handler;
if (!serverConnected.compareAndSet(false, true)) {
throw new IllegalStateException("Server already connected");
}
// 订阅 client → server 消息
inbound.asFlux()
.concatMap(this::handleServerMessage)
.doFinally(sig -> serverConnected.set(false))
.subscribe();
}
/**
* Server 端处理 JSONRPCMessage
*/
private Mono<Void> handleServerMessage(McpSchema.JSONRPCMessage msg) {
// 创建 transport context简单实现即可
McpTransportContext ctx = key -> null;
if (msg instanceof McpSchema.JSONRPCRequest req) {
return serverHandler.handleRequest(ctx, req)
.flatMap(this::sendMessage);
}
if (msg instanceof McpSchema.JSONRPCNotification noti) {
return serverHandler.handleNotification(ctx, noti);
}
return Mono.empty();
}
/* ======================================================
* other boilerplate
* ====================================================== */
@Override
public void close() {
McpClientTransport.super.close();
}
@Override
public Mono<Void> closeGracefully() {
inbound.tryEmitComplete();
clientConnected.set(false);
serverConnected.set(false);
return Mono.empty();
}
@Override
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
return McpJsonMapper.getDefault().convertValue(data, typeRef);
}
@Override
public List<String> protocolVersions() {
return McpClientTransport.super.protocolVersions();
}
}

View File

@@ -26,7 +26,59 @@
<groupId>org.jetbrains.kotlinx</groupId> <groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId> <artifactId>kotlinx-coroutines-core</artifactId>
<version>1.10.2</version> <version>1.10.2</version>
<scope>test</scope> </dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-test</artifactId>
<version>1.10.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.nd4j/nd4j-api -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-api</artifactId>
<version>1.0.0-M2.1</version>
</dependency>
<dependency>
<groupId>com.microsoft.onnxruntime</groupId>
<artifactId>onnxruntime</artifactId>
<version>1.23.1</version>
</dependency>
<dependency>
<groupId>ai.djl.huggingface</groupId>
<artifactId>tokenizers</artifactId>
<version>0.34.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.modelcontextprotocol.sdk/mcp -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>0.17.0</version>
</dependency>
<dependency>
<groupId>work.slhaf</groupId>
<artifactId>Partner-Common</artifactId>
<version>0.5.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.20.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.20.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.2.1</version>
</dependency> </dependency>
</dependencies> </dependencies>
@@ -49,7 +101,8 @@
</goals> </goals>
<configuration> <configuration>
<transformers> <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>work.slhaf.partner.Main</mainClass> <mainClass>work.slhaf.partner.Main</mainClass>
</transformer> </transformer>
</transformers> </transformers>

View File

@@ -2,6 +2,7 @@ package work.slhaf.partner;
import work.slhaf.partner.api.agent.Agent; import work.slhaf.partner.api.agent.Agent;
import work.slhaf.partner.common.config.PartnerAgentConfigManager; import work.slhaf.partner.common.config.PartnerAgentConfigManager;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.runtime.exception.PartnerExceptionCallback; import work.slhaf.partner.runtime.exception.PartnerExceptionCallback;
import work.slhaf.partner.runtime.interaction.WebSocketGateway; import work.slhaf.partner.runtime.interaction.WebSocketGateway;
@@ -11,6 +12,7 @@ public class Main {
.setAgentConfigManager(PartnerAgentConfigManager.class) .setAgentConfigManager(PartnerAgentConfigManager.class)
.setGateway(WebSocketGateway.class) .setGateway(WebSocketGateway.class)
.setAgentExceptionCallback(PartnerExceptionCallback.class) .setAgentExceptionCallback(PartnerExceptionCallback.class)
.addAfterLaunchRunners(VectorClient::load)
.launch(); .launch();
} }
} }

View File

@@ -0,0 +1,10 @@
package work.slhaf.partner.common;
public final class Constant {
public static final class Path {
public static final String DATA = "data";
public static final String MEMORY_DATA = DATA + "/memory";
}
}

View File

@@ -4,6 +4,21 @@ import lombok.Data;
@Data @Data
public class Config { public class Config {
private int port;
private String agentId; private String agentId;
private WebSocketConfig webSocketConfig;
private VectorConfig vectorConfig;
@Data
public static class VectorConfig {
private int type;
private String ollamaEmbeddingUrl;
private String ollamaEmbeddingModel;
private String tokenizerPath;
private String embeddingModelPath;
}
@Data
public static class WebSocketConfig {
private int port;
}
} }

View File

@@ -33,8 +33,9 @@ public final class PartnerAgentConfigManager extends FileAgentConfigManager {
if (config == null || config.getAgentId() == null) { if (config == null || config.getAgentId() == null) {
throw new ConfigLoadFailedException("Partner Config Load Failed: " + COMMON_CONFIG_FILE); throw new ConfigLoadFailedException("Partner Config Load Failed: " + COMMON_CONFIG_FILE);
} }
if (config.getPort() <= 0 || config.getPort() > 65535) { int port = config.getWebSocketConfig().getPort();
throw new ConfigLoadFailedException("Invalid Websocket port: " + config.getPort()); if (port <= 0 || port > 65535) {
throw new ConfigLoadFailedException("Invalid Websocket port: " + port);
} }
} }
} }

View File

@@ -1,14 +1,9 @@
package work.slhaf.partner.common.thread; package work.slhaf.partner.common.thread;
import lombok.Getter; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Getter
public class InteractionThreadPoolExecutor { public class InteractionThreadPoolExecutor {
private static InteractionThreadPoolExecutor interactionThreadPoolExecutor; private static InteractionThreadPoolExecutor interactionThreadPoolExecutor;
@@ -33,9 +28,29 @@ public class InteractionThreadPoolExecutor {
public <T> void invokeAll(List<Callable<T>> tasks) { public <T> void invokeAll(List<Callable<T>> tasks) {
try { try {
executorService.invokeAll(tasks); List<Future<T>> futures = executorService.invokeAll(tasks);
for (Future<T> future : futures) {
future.get();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
public <T> List<T> invokeAllAndReturn(List<Callable<T>> tasks) {
try {
List<Future<T>> futures = executorService.invokeAll(tasks);
List<T> results = new ArrayList<>();
for (Future<T> future : futures) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} }
} }

View File

@@ -0,0 +1,14 @@
package work.slhaf.partner.common.util;
public class PathUtil {
public static String buildPathStr(String... path) {
StringBuilder str = new StringBuilder();
for (int i = 0; i < path.length; i++) {
str.append(path[i]);
if (i < path.length - 1) {
str.append("/");
}
}
return str.toString();
}
}

View File

@@ -0,0 +1,48 @@
package work.slhaf.partner.common.vector;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.common.vector.exception.VectorClientExecuteException;
import java.util.Map;
@Slf4j
public class OllamaVectorClient extends VectorClient {
private String ollamaEmbeddingUrl;
private String ollamaEmbeddingModel;
protected OllamaVectorClient(String url, String model) {
this.ollamaEmbeddingUrl = url;
this.ollamaEmbeddingModel = model;
compute("test");
}
@Override
protected float[] doCompute(String input) {
Map<String, String> param = Map.of("model", ollamaEmbeddingModel, "input", input);
HttpRequest request = HttpRequest.get(ollamaEmbeddingUrl).body(JSONObject.toJSONString(param));
try (HttpResponse response = request.execute()) {
if (!response.isOk())
throw new VectorClientExecuteException("嵌入模型执行出错");
String resStr = response.body();
EmbeddingModelResponse embeddingResponse = JSONObject.parseObject(resStr, EmbeddingModelResponse.class);
return embeddingResponse.getEmbeddings()[0];
} catch (Exception e) {
throw new VectorClientExecuteException("嵌入模型执行出错", e);
}
}
@Data
private static class EmbeddingModelResponse {
private String model;
private float[][] embeddings;
private long total_duration;
private long load_duration;
private int prompt_eval_count;
}
}

View File

@@ -0,0 +1,82 @@
package work.slhaf.partner.common.vector;
import ai.djl.huggingface.tokenizers.Encoding;
import ai.djl.huggingface.tokenizers.HuggingFaceTokenizer;
import ai.onnxruntime.OnnxTensor;
import ai.onnxruntime.OrtEnvironment;
import ai.onnxruntime.OrtSession;
import work.slhaf.partner.common.vector.exception.VectorClientExecuteException;
import work.slhaf.partner.common.vector.exception.VectorClientLoadFailedException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("FieldMayBeFinal")
public class OnnxVectorClient extends VectorClient {
private String tokenizerPath;
private String modelPath;
private HuggingFaceTokenizer tokenizer;
private OrtSession session;
private OrtEnvironment env;
protected OnnxVectorClient(String tokenizer, String model) {
this.tokenizerPath = tokenizer;
this.modelPath = model;
loadTokenizer();
loadModel();
compute("test");
}
private void loadModel() {
try {
env = OrtEnvironment.getEnvironment();
OrtSession.SessionOptions ops = new OrtSession.SessionOptions();
session = env.createSession(modelPath, ops);
} catch (Exception e) {
throw new VectorClientLoadFailedException("加载ONNX模型失败", e);
}
}
private void loadTokenizer() {
try {
tokenizer = HuggingFaceTokenizer.newInstance(Path.of(tokenizerPath));
} catch (Exception e) {
throw new VectorClientLoadFailedException("加载Tokenizer失败", e);
}
}
@Override
protected float[] doCompute(String input) {
try {
Encoding encode = tokenizer.encode(input);
long[] ids = encode.getIds();
long[] attentionMask = encode.getAttentionMask();
long[][] inputIdsBatch = {ids};
long[][] attentionMaskBatch = {attentionMask};
long[][] tokenTypeIdsBatch = {new long[ids.length]}; // 初始化全 0
for (int i = 0; i < ids.length; i++)
tokenTypeIdsBatch[0][i] = 0;
OnnxTensor inputTensor = OnnxTensor.createTensor(env, inputIdsBatch);
OnnxTensor maskTensor = OnnxTensor.createTensor(env, attentionMaskBatch);
OnnxTensor tokenTypeTensor = OnnxTensor.createTensor(env, tokenTypeIdsBatch);
Map<String, OnnxTensor> inputs = new HashMap<>();
inputs.put("input_ids", inputTensor);
inputs.put("attention_mask", maskTensor);
inputs.put("token_type_ids", tokenTypeTensor);
OrtSession.Result result = session.run(inputs);
OnnxTensor embeddingTensor = (OnnxTensor) result.get(0);
return embeddingTensor.getFloatBuffer().array();
} catch (Exception e) {
throw new VectorClientExecuteException("嵌入模型执行出错", e);
}
}
}

View File

@@ -0,0 +1,88 @@
package work.slhaf.partner.common.vector;
import lombok.extern.slf4j.Slf4j;
import org.nd4j.linalg.api.ndarray.INDArray;
import org.nd4j.linalg.factory.Nd4j;
import org.nd4j.linalg.ops.transforms.Transforms;
import work.slhaf.partner.api.agent.runtime.config.AgentConfigManager;
import work.slhaf.partner.common.config.Config.VectorConfig;
import work.slhaf.partner.common.config.PartnerAgentConfigManager;
import work.slhaf.partner.common.exception.ServiceLoadFailedException;
import work.slhaf.partner.common.vector.exception.VectorClientExecuteException;
import work.slhaf.partner.common.vector.exception.VectorClientLoadFailedException;
@Slf4j
public abstract class VectorClient {
public static boolean status;
public static VectorClient INSTANCE;
public static void load() {
PartnerAgentConfigManager configManager = (PartnerAgentConfigManager) AgentConfigManager.INSTANCE;
VectorConfig vectorConfig = configManager.getConfig().getVectorConfig();
int type = vectorConfig.getType();
try {
switch (type) {
case 0:
status = false;
break;
case 1:
status = true;
INSTANCE = new OllamaVectorClient(vectorConfig.getOllamaEmbeddingUrl(),
vectorConfig.getOllamaEmbeddingModel());
break;
case 2:
status = true;
INSTANCE = new OnnxVectorClient(vectorConfig.getTokenizerPath(),
vectorConfig.getEmbeddingModelPath());
break;
default:
throw new ServiceLoadFailedException(
"加载向量客户端失败! type: 0 -> 不启用语义缓存; type: 1 -> ollama; type: 2 -> ONNX RUNTIME");
}
log.info("向量客户端加载完毕");
} catch (VectorClientLoadFailedException | VectorClientExecuteException exception) {
status = false;
log.error("向量客户端加载失败", exception);
}
}
public float[] compute(String input) {
if (!status) {
return null;
}
return doCompute(input);
}
protected abstract float[] doCompute(String input);
public double compare(float[] v1, float[] v2) {
if (!status) {
return 0;
}
try (INDArray a1 = Nd4j.create(v1); INDArray a2 = Nd4j.create(v2)) {
return Transforms.cosineSim(a1, a2);
}
}
public float[] weightedAverage(float[] newVector, float[] primaryVector) {
try (INDArray primary = Nd4j.create(primaryVector);
INDArray latest = Nd4j.create(newVector)) {
// 1⃣ 计算余弦相似度
double similarity = Transforms.cosineSim(primary, latest);
// 2⃣ 根据相似度决定更新比例 α(差异越大,新输入影响越强)
double alpha = (1.0 - similarity) * 0.5;
alpha = Math.max(0.05, Math.min(alpha, 0.5));
// 3⃣ 按比例混合旧向量与新向量
INDArray updated = primary.mul(1 - alpha).add(latest.mul(alpha));
// 4⃣ 归一化结果(保持方向空间一致)
updated = updated.div(updated.norm2Number());
return updated.toFloatVector();
}
}
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.partner.common.vector.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class VectorClientExecuteException extends AgentRuntimeException {
public VectorClientExecuteException(String message) {
super(message);
}
public VectorClientExecuteException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.partner.common.vector.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class VectorClientLoadFailedException extends AgentRuntimeException {
public VectorClientLoadFailedException(String message) {
super(message);
}
public VectorClientLoadFailedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -13,10 +13,11 @@ import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import static work.slhaf.partner.common.Constant.Path.MEMORY_DATA;
@Slf4j @Slf4j
public abstract class PartnerCore<T extends PartnerCore<T>> extends PersistableObject { public abstract class PartnerCore<T extends PartnerCore<T>> extends PersistableObject {
private static final String STORAGE_DIR = "./data/memory/";
private final String id = ((PartnerAgentConfigManager) AgentConfigManager.INSTANCE).getConfig().getAgentId(); private final String id = ((PartnerAgentConfigManager) AgentConfigManager.INSTANCE).getConfig().getAgentId();
public PartnerCore() throws IOException, ClassNotFoundException { public PartnerCore() throws IOException, ClassNotFoundException {
@@ -53,7 +54,7 @@ public abstract class PartnerCore<T extends PartnerCore<T>> extends PersistableO
public void serialize() throws IOException { public void serialize() throws IOException {
//先写入到临时文件,如果正常写入则覆盖原文件 //先写入到临时文件,如果正常写入则覆盖原文件
Path filePath = getFilePath(id + "-temp"); Path filePath = getFilePath(id + "-temp");
Files.createDirectories(Path.of(STORAGE_DIR)); Files.createDirectories(Path.of(MEMORY_DATA));
try { try {
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(filePath.toFile())); ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(filePath.toFile()));
oos.writeObject(this); oos.writeObject(this);
@@ -78,12 +79,12 @@ public abstract class PartnerCore<T extends PartnerCore<T>> extends PersistableO
} }
private Path getFilePath(String s) { private Path getFilePath(String s) {
return Paths.get(STORAGE_DIR, s + "-" + getCoreKey() + ".memory"); return Paths.get(MEMORY_DATA, s + "-" + getCoreKey() + ".memory");
} }
private void createStorageDirectory() { private void createStorageDirectory() {
try { try {
Files.createDirectories(Paths.get(STORAGE_DIR)); Files.createDirectories(Paths.get(MEMORY_DATA));
} catch (IOException e) { } catch (IOException e) {
log.error("[{}]创建存储目录失败: {}", getCoreKey(), e.getMessage()); log.error("[{}]创建存储目录失败: {}", getCoreKey(), e.getMessage());
} }

View File

@@ -0,0 +1,59 @@
package work.slhaf.partner.core.action;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;
import work.slhaf.partner.api.agent.factory.capability.annotation.Capability;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
@Capability(value = "action")
public interface ActionCapability {
void putAction(@NonNull ActionData actionData);
Set<ActionData> listActions(@Nullable ActionData.ActionStatus actionStatus, @Nullable String source);
List<ActionData> popPendingAction(String userId);
List<ActionData> listPendingAction(String userId);
void putPendingActions(String userId, ActionData actionData);
List<String> selectTendencyCache(String input);
void updateTendencyCache(CacheAdjustData data);
ExecutorService getExecutor(ActionCore.ExecutorType type);
PhaserRecord putPhaserRecord(Phaser phaser, ActionData actionData);
void removePhaserRecord(Phaser phaser);
List<PhaserRecord> listPhaserRecords();
PhaserRecord getPhaserRecord(String tendency, String source);
MetaAction loadMetaAction(@NonNull String actionKey);
MetaActionInfo loadMetaActionInfo(@NonNull String actionKey);
Map<String, MetaActionInfo> listAvailableMetaActions();
boolean checkExists(String... actionKeys);
RunnerClient runnerClient();
void handleInterventions(List<MetaIntervention> interventions, ActionData data);
}

View File

@@ -0,0 +1,446 @@
package work.slhaf.partner.core.action;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jetbrains.annotations.Nullable;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityCore;
import work.slhaf.partner.api.agent.factory.capability.annotation.CapabilityMethod;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.PartnerCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.action.entity.cache.ActionCacheData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.action.exception.ActionDataNotFoundException;
import work.slhaf.partner.core.action.exception.MetaActionNotFoundException;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.action.runner.SandboxRunnerClient;
import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@SuppressWarnings("FieldMayBeFinal")
@CapabilityCore(value = "action")
@Slf4j
public class ActionCore extends PartnerCore<ActionCore> {
/**
* 持久行动池
*/
private CopyOnWriteArraySet<ActionData> actionPool = new CopyOnWriteArraySet<>();
/**
* 待确认任务以userId区分不同用户因为需要跨请求确认
*/
private HashMap<String, List<ActionData>> pendingActions = new HashMap<>();
/**
* 语义缓存与行为倾向映射
*/
private List<ActionCacheData> actionCache = new ArrayList<>();
private final Lock cacheLock = new ReentrantLock();
// 由于当前的执行器逻辑实现,平台线程池大小不得小于 2这里规定为最小为 4
private final ExecutorService platformExecutor = Executors
.newFixedThreadPool(Math.max(Runtime.getRuntime().availableProcessors(), 4));
private final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
/**
* 已存在的行动程序,键格式为‘<MCP-ServerName>::<Tool-Name>’,值为 MCP Server 通过 Resources 相关渠道传递的行动程序元信息
*/
private final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions = new ConcurrentHashMap<>();
private final List<PhaserRecord> phaserRecords = new ArrayList<>();
private RunnerClient runnerClient;
public ActionCore() throws IOException, ClassNotFoundException {
// TODO 通过 AgentConfigManager指定采用何种 runnerClient
runnerClient = new SandboxRunnerClient(existedMetaActions, virtualExecutor);
setupShutdownHook();
}
private void setupShutdownHook() {
// 将执行中的行动状态置为失败
val executingActionSet = listActions(ActionData.ActionStatus.EXECUTING, null);
for (ActionData actionData : executingActionSet) {
actionData.setStatus(ActionData.ActionStatus.FAILED);
actionData.setResult("由于系统中断而失败");
}
}
@CapabilityMethod
public void putAction(@NonNull ActionData actionData) {
actionPool.removeIf(data -> data.getUuid().equals(actionData.getUuid())); // 用来应对 ScheduledActionData 的重新排列
actionPool.add(actionData);
}
@CapabilityMethod
public Set<ActionData> listActions(@Nullable ActionData.ActionStatus actionStatus, @Nullable String source) {
return actionPool.stream()
.filter(actionData -> actionStatus == null || actionData.getStatus().equals(actionStatus))
.filter(actionData -> source == null || actionData.getSource().equals(source))
.collect(Collectors.toSet());
}
@CapabilityMethod
public synchronized void putPendingActions(String userId, ActionData actionData) {
pendingActions.computeIfAbsent(userId, k -> {
List<ActionData> temp = new ArrayList<>();
temp.add(actionData);
return temp;
});
}
@CapabilityMethod
public synchronized List<ActionData> popPendingAction(String userId) {
List<ActionData> infos = pendingActions.get(userId);
pendingActions.remove(userId);
return infos;
}
@CapabilityMethod
public List<ActionData> listPendingAction(String userId) {
return pendingActions.get(userId);
}
/**
* 计算输入内容的语义向量,根据与{@link ActionCacheData#getInputVector()}的相似度挑取缓存,后续将根据评估结果来更新计数
*
* @param input 本次输入内容
* @return 命中的行为倾向集合
*/
@CapabilityMethod
public List<String> selectTendencyCache(String input) {
if (!VectorClient.status) {
return null;
}
VectorClient vectorClient = VectorClient.INSTANCE;
// 计算本次输入的向量
float[] vector = vectorClient.compute(input);
if (vector == null)
return null;
// 与现有缓存比对,将匹配到的收集并返回
return actionCache.parallelStream()
.filter(ActionCacheData::isActivated)
.filter(data -> {
double compared = vectorClient.compare(vector, data.getInputVector());
return compared > data.getThreshold();
})
.map(ActionCacheData::getTendency)
.collect(Collectors.toList());
}
@CapabilityMethod
public void updateTendencyCache(CacheAdjustData data) {
VectorClient vectorClient = VectorClient.INSTANCE;
List<CacheAdjustMetaData> list = data.getMetaDataList();
String input = data.getInput();
float[] inputVector = vectorClient.compute(input);
List<CacheAdjustMetaData> matchAndPassed = new ArrayList<>();
List<CacheAdjustMetaData> matchNotPassed = new ArrayList<>();
List<CacheAdjustMetaData> notMatchPassed = new ArrayList<>();
for (CacheAdjustMetaData metaData : list) {
if (metaData.isHit() && metaData.isPassed()) {
matchAndPassed.add(metaData);
} else if (metaData.isHit()) {
matchNotPassed.add(metaData);
} else if (!metaData.isPassed()) {
notMatchPassed.add(metaData);
}
}
platformExecutor.execute(() -> adjustMatchAndPassed(matchAndPassed, inputVector, input, vectorClient));
platformExecutor.execute(() -> adjustMatchNotPassed(matchNotPassed, vectorClient));
platformExecutor.execute(() -> adjustNotMatchPassed(notMatchPassed, inputVector, input, vectorClient));
}
@CapabilityMethod
public ExecutorService getExecutor(ExecutorType type) {
return switch (type) {
case VIRTUAL -> virtualExecutor;
case PLATFORM -> platformExecutor;
};
}
@CapabilityMethod
public Map<String, MetaActionInfo> listAvailableActions() {
return existedMetaActions;
}
@CapabilityMethod
public synchronized PhaserRecord putPhaserRecord(Phaser phaser, ActionData actionData) {
PhaserRecord record = new PhaserRecord(phaser, actionData);
phaserRecords.add(record);
return record;
}
@CapabilityMethod
public synchronized void removePhaserRecord(Phaser phaser) {
PhaserRecord remove = null;
for (PhaserRecord record : phaserRecords) {
if (record.phaser().equals(phaser)) {
remove = record;
}
}
if (remove != null) {
phaserRecords.remove(remove);
}
}
@CapabilityMethod
public PhaserRecord getPhaserRecord(String tendency, String source) {
for (PhaserRecord record : phaserRecords) {
ActionData data = record.actionData();
if (data.getTendency().equals(tendency) && data.getSource().equals(source)) {
return record;
}
}
throw new ActionDataNotFoundException("未找到对应的 Phaser 记录: tendency=" + tendency + ", source=" + source);
}
@CapabilityMethod
public MetaAction loadMetaAction(@NonNull String actionKey) {
MetaActionInfo metaActionInfo = existedMetaActions.get(actionKey);
if (metaActionInfo == null) {
throw new MetaActionNotFoundException("未找到对应的行动程序信息" + actionKey);
}
String[] split = actionKey.split("::");
if (split.length < 2) {
throw new MetaActionNotFoundException("未找到对应的行动程序,原因: 传入的 actionKey(" + actionKey + ") 存在异常");
}
return new MetaAction(
split[1],
metaActionInfo.isIo(),
MetaAction.Type.MCP,
split[0]
);
}
@CapabilityMethod
public List<PhaserRecord> listPhaserRecords() {
return phaserRecords;
}
@CapabilityMethod
public MetaActionInfo loadMetaActionInfo(@NonNull String actionKey) {
MetaActionInfo info = existedMetaActions.get(actionKey);
if (info == null) {
throw new MetaActionNotFoundException("未找到对应的行动程序描述信息: " + actionKey);
}
return info;
}
@CapabilityMethod
public boolean checkExists(String... actionKeys) {
return existedMetaActions.keySet().containsAll(Arrays.asList(actionKeys));
}
@CapabilityMethod
public RunnerClient runnerClient() {
return runnerClient;
}
@CapabilityMethod
public void handleInterventions(List<MetaIntervention> interventions, ActionData actionData) {
// 加载数据
if (actionData == null) {
return;
}
// 加锁确保同步
synchronized (actionData.getStatus()) {
applyInterventions(interventions, actionData);
}
}
private void applyInterventions(List<MetaIntervention> interventions, ActionData actionData) {
boolean rebuildCleanTag = false;
interventions.sort(Comparator.comparingInt(MetaIntervention::getOrder));
for (MetaIntervention intervention : interventions) {
List<MetaAction> actions = intervention.getActions()
.stream()
.map(this::loadMetaAction)
.toList();
switch (intervention.getType()) {
case InterventionType.APPEND -> handleAppend(actionData, intervention.getOrder(), actions);
case InterventionType.INSERT -> handleInsert(actionData, intervention.getOrder(), actions);
case InterventionType.DELETE -> handleDelete(actionData, intervention.getOrder(), actions);
case InterventionType.CANCEL -> handleCancel(actionData);
case InterventionType.REBUILD -> {
if (!rebuildCleanTag) {
cleanActionData(actionData);
rebuildCleanTag = true;
}
handleRebuild(actionData, intervention.getOrder(), actions);
}
}
}
}
/**
* 在未进入执行阶段的行动单元组新增新的行动
*/
private void handleAppend(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage())
return;
actionData.getActionChain().put(order, actions);
}
/**
* 在未进入执行阶段和正处于行动阶段的行动单元组插入新的行动
*/
private void handleInsert(ActionData actionData, int order, List<MetaAction> actions) {
if (order < actionData.getExecutingStage())
return;
actionData.getActionChain().computeIfAbsent(order, k -> new ArrayList<>()).addAll(actions);
}
private void handleDelete(ActionData actionData, int order, List<MetaAction> actions) {
if (order <= actionData.getExecutingStage())
return;
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
if (actionChain.containsKey(order)) {
actionChain.get(order).removeAll(actions);
if (actionChain.get(order).isEmpty()) {
actionChain.remove(order);
}
}
}
private void handleCancel(ActionData actionData) {
actionData.setStatus(ActionData.ActionStatus.FAILED);
actionData.setResult("行动取消");
}
private void handleRebuild(ActionData actionData, int order, List<MetaAction> actions) {
Map<Integer, List<MetaAction>> actionChain = actionData.getActionChain();
actionChain.put(order, actions);
}
private void cleanActionData(ActionData actionData) {
actionData.getActionChain().clear();
actionData.setExecutingStage(0);
actionData.setStatus(ActionData.ActionStatus.PREPARE);
actionData.getHistory().clear();
}
/**
* 命中缓存且评估通过时
*
* @param matchAndPassed 该类型的带调整缓存信息列表
* @param inputVector 本次输入内容的语义向量
* @param vectorClient 向量客户端
*/
private void adjustMatchAndPassed(List<CacheAdjustMetaData> matchAndPassed, float[] inputVector, String input,
VectorClient vectorClient) {
matchAndPassed.forEach(adjustData -> {
// 获取原始缓存条目
String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency);
if (primaryCacheData == null) {
return;
}
primaryCacheData.updateAfterMatchAndPassed(inputVector, vectorClient, input);
});
}
/**
* 针对命中缓存、但评估未通过的条目与输入进行处理
*
* @param matchNotPassed 该类型的带调整缓存信息列表
* @param vectorClient 向量客户端
*/
private void adjustMatchNotPassed(List<CacheAdjustMetaData> matchNotPassed, VectorClient vectorClient) {
List<ActionCacheData> toRemove = new ArrayList<>();
matchNotPassed.forEach(adjustData -> {
// 获取原始缓存条目
String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency);
if (primaryCacheData == null) {
return;
}
boolean remove = primaryCacheData.updateAfterMatchNotPassed(vectorClient);
if (remove) {
toRemove.add(primaryCacheData);
}
});
cacheLock.lock();
actionCache.removeAll(toRemove);
cacheLock.unlock();
}
/**
* 针对未命中但评估通过的缓存做出调整:
* <ol>
* <h3>如果存在缓存条目</h3>
* <li>
* 若已生效,但此时未匹配到则说明尚未生效或者阈值、向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均
* </li>
* <li>
* 若未生效,则只增加计数并带权移动平均
* </li>
* </ol>
* 如果不存在缓存条目,则新增并填充字段
*
* @param notMatchPassed 该类型的带调整缓存信息列表
* @param inputVector 本次输入内容的语义向量
* @param input 本次输入内容
* @param vectorClient 向量客户端
*/
private void adjustNotMatchPassed(List<CacheAdjustMetaData> notMatchPassed, float[] inputVector, String input,
VectorClient vectorClient) {
notMatchPassed.forEach(adjustData -> {
// 获取原始缓存条目
String tendency = adjustData.getTendency();
ActionCacheData primaryCacheData = selectCacheData(tendency);
float[] tendencyVector = vectorClient.compute(tendency);
if (primaryCacheData == null) {
actionCache.add(new ActionCacheData(tendency, tendencyVector, inputVector, input));
return;
}
primaryCacheData.updateAfterNotMatchPassed(input, inputVector, tendencyVector, vectorClient);
});
}
private ActionCacheData selectCacheData(String tendency) {
for (ActionCacheData actionCacheData : actionCache) {
if (actionCacheData.getTendency().equals(tendency)) {
return actionCacheData;
}
}
log.warn("[{}] 未找到行为倾向[{}]对应的缓存条目,可能是代码逻辑存在错误", getCoreKey(), tendency);
return null;
}
@Override
protected String getCoreKey() {
return "action-core";
}
public enum ExecutorType {
VIRTUAL, PLATFORM
}
}

View File

@@ -1,11 +0,0 @@
package work.slhaf.partner.core.action.entity;
import lombok.Data;
@Data
public class ActionData {
private String key;
private String[] array;
private String reason;
private String description;
}

View File

@@ -0,0 +1,143 @@
package work.slhaf.partner.core.action.entity
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction
import java.time.ZonedDateTime
import java.util.*
/**
* 行动模块传递的行动数据包含行动uuid、倾向、状态、行动链、结果、发起原因、行动描述等信息。
*/
sealed class ActionData {
/**
* 行动ID
*/
val uuid: String = UUID.randomUUID().toString()
/**
* 行动倾向
*/
abstract val tendency: String
/**
* 行动状态
*/
var status: ActionStatus = ActionStatus.PREPARE
/**
* 行动链
*/
abstract val actionChain: MutableMap<Int, MutableList<MetaAction>>
/**
* 行动阶段(当前阶段)
*/
var executingStage: Int = 0
/**
* 行动结果
*/
lateinit var result: String
val history: MutableMap<Int, MutableList<HistoryAction>> = mutableMapOf()
/**
* 修复上下文
*/
val additionalContext: MutableMap<Int, MutableList<String>> = mutableMapOf()
/**
* 行动原因
*/
abstract val reason: String
/**
* 行动描述
*/
abstract val description: String
/**
* 行动来源
*/
abstract val source: String
enum class ActionStatus {
/**
* 执行成功
*/
SUCCESS,
/**
* 执行失败
*/
FAILED,
/**
* 执行中
*/
EXECUTING,
/**
* 暂时中断
*/
INTERRUPTED,
/**
* 预备执行
*/
PREPARE
}
}
/**
* 计划行动数据类,继承自{@link ActionData},扩展了属性{@link ScheduledActionData#type}和{@link ScheduledActionData#scheduleContent},用于标识计划类型(单次还是周期性任务)和计划内容
*/
data class ScheduledActionData(
override val tendency: String,
override val actionChain: MutableMap<Int, MutableList<MetaAction>>,
override val reason: String,
override val description: String,
override val source: String,
val scheduleType: ScheduleType,
val scheduleContent: String,
) : ActionData() {
val scheduleHistories = ArrayList<ScheduleHistory>()
fun recordAndReset() {
val newHistory = ScheduleHistory(ZonedDateTime.now(), result, history.toMap())
scheduleHistories.add(newHistory)
additionalContext.clear()
executingStage = 0
for (entry in actionChain) {
for (action in entry.value) {
action.params.clear()
action.result.reset()
}
}
status = ActionStatus.PREPARE
}
enum class ScheduleType {
CYCLE,
ONCE
}
data class ScheduleHistory(
val endTime: ZonedDateTime,
val result: String,
val history: Map<Int, List<HistoryAction>>
)
}
/**
* 即时行动数据类
*/
data class ImmediateActionData(
override val tendency: String,
override val actionChain: MutableMap<Int, MutableList<MetaAction>>,
override val reason: String,
override val description: String,
override val source: String,
) : ActionData()

View File

@@ -0,0 +1,10 @@
package work.slhaf.partner.core.action.entity;
import lombok.Data;
@Data
public class ActionFileMetaData {
private String content;
private String name;
private String ext;
}

View File

@@ -1,5 +0,0 @@
package work.slhaf.partner.core.action.entity;
public enum ActionStatus {
SUCCESS, FAILED, EXECUTING, WAITING
}

View File

@@ -0,0 +1,15 @@
package work.slhaf.partner.core.action.entity;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import java.util.List;
@Data
public class GeneratedData {
private List<String> dependencies;
private String code;
private String codeType;
private boolean serialize;
private JSONObject responseSchema;
}

View File

@@ -1,5 +1,5 @@
package work.slhaf.partner.core.action.entity; package work.slhaf.partner.core.action.entity;
public enum ActionType { public class McpData {
IMMEDIATE, PLANNING
} }

View File

@@ -0,0 +1,73 @@
package work.slhaf.partner.core.action.entity
/**
* 行动链中的单一元素,封装了调用外部行动程序的必要信息与结果容器,可被[work.slhaf.partner.core.action.ActionCapability]执行
*/
data class MetaAction(
/**
* 行动name用于标识行动程序
*/
val name: String,
/**
* 是否IO密集用于决定使用何种线程池
*/
val io: Boolean = false,
/**
* 行动程序类型,可分为 MCP、ORIGIN 两种,前者对应读取到的 MCP Tool、后者对应生成的临时行动程序
*/
val type: Type,
/**
* 当类型为 MCP 时,该字段对应相应 MCP Client 注册时生成的 id;
* 当类型为 ORIGIN 时,该字段对应相应的磁盘路径字符串
*/
val location: String,
) {
/**
* 行动程序可接受的参数,由调用处设置
*/
val params: MutableMap<String, Any> = mutableMapOf()
/**
* 行动结果,包括执行状态和相应内容(执行结果或者错误信息)
*/
val result = Result()
val key: String
/**
* actionKey 将由 location+name 共同定位
*
* @return actionKey
*/
get() = "$location::$name"
class Result {
var status = Status.WAITING
var data: String? = null
fun reset() {
status = Status.WAITING
data = null
}
enum class Status {
SUCCESS,
FAILED,
WAITING
}
}
enum class Type {
/**
* 将调用的 MCP 工具,可包括远程、本地任意服务
*/
MCP,
/**
* 适用于‘临时生成’的行动程序,在生成后根据序列化选项及执行情况,进行持久化
*/
ORIGIN
}
}

View File

@@ -1,13 +1,25 @@
package work.slhaf.partner.core.action.entity; package work.slhaf.partner.core.action.entity;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data; import lombok.Data;
import java.time.LocalDateTime; import java.util.List;
import java.util.Map;
@Data @Data
public class MetaActionInfo { public class MetaActionInfo {
private ActionData actionData; private boolean io;
private ActionStatus status;
private String Result; private Map<String, Object> params;
private LocalDateTime dateTime; private String description;
private List<String> tags;
private List<String> preActions;
private List<String> postActions;
/**
* 是否严格依赖前置行动的成功执行若为true且前置行动失败则不执行该行动后置任务多为触发式。默认即执行。
*/
private boolean strictDependencies;
private JSONObject responseSchema;
} }

View File

@@ -0,0 +1,33 @@
package work.slhaf.partner.core.action.entity;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import java.util.concurrent.Phaser;
public record PhaserRecord(Phaser phaser, ActionData actionData) {
public void fail() {
actionData.setStatus(ActionStatus.FAILED);
}
/**
* 负责将 ActionData 的状态设置为 INTERRUPTED
* 同时循环检查进行阻塞
*/
public void interrupt() {
actionData.setStatus(ActionStatus.INTERRUPTED);
while (actionData().getStatus() == ActionStatus.INTERRUPTED) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
}
}
/**
* 将状态重新设置为 EXECUTING ,恢复 interrupt 阻塞状态
*/
public void complete() {
actionData().setStatus(ActionStatus.EXECUTING);
}
}

View File

@@ -0,0 +1,181 @@
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;
import work.slhaf.partner.common.vector.VectorClient;
import java.util.ArrayList;
import java.util.List;
@Data
public class ActionCacheData {
private boolean activated = false;
private int inputMatchCount = 1;
private float[] inputVector;
private float[] tendencyVector;
private String tendency;
private double threshold = 0.75;
private List<String> validSamples = new ArrayList<>();
private int failedCount = 0;
private Type type = Type.PRIMARY;
public ActionCacheData(String tendency, float[] tendencyVector, float[] inputVector, String input) {
this.tendency = tendency;
this.inputVector = inputVector;
this.tendencyVector = tendencyVector;
this.validSamples.add(input);
}
/**
* 命中缓存且评估通过时,根据输入内容的语义向量与现有的输入语义向量进行带权移动平均,以相似度为权重,同时降低失败计数,为零时置为上一级缓存类型{@link ActionCacheData.Type}
*
* @param inputVector 本次输入内容对应的语义向量
* @param vectorClient 向量客户端
* @param input 本次输入内容
*/
public synchronized void updateAfterMatchAndPassed(float[] inputVector, VectorClient vectorClient, String input) {
updateInputVector(inputVector, vectorClient);
addValidSample(input);
reduceFailedCount();
updateType();
addInputMatchCount();
}
private void updateType() {
if (this.failedCount == 0) {
this.type = switch (type) {
case PRIMARY, REBUILD_V1 -> ActionCacheData.Type.PRIMARY;
case REBUILD_V2 -> ActionCacheData.Type.REBUILD_V1;
case REBUILD_V3 -> ActionCacheData.Type.REBUILD_V2;
};
}
}
private void reduceFailedCount() {
this.failedCount = Math.max(this.failedCount - 1, 0);
}
private void addValidSample(String input) {
if (this.validSamples.size() == 12) {
this.validSamples.removeFirst();
}
this.validSamples.add(input);
}
private void updateInputVector(float[] inputVector, VectorClient vectorClient) {
this.inputVector = vectorClient.weightedAverage(inputVector, this.inputVector);
}
/**
* 针对命中缓存、但评估未通过的条目与输入进行处理: 增加失败计数(必要时重建并更新类型等级)、调高阈值(0.02),由于缓存匹配但评估未通过,所以不进行带权移动平均
*
* @param vectorClient 向量客户端
* @return 是否需要删除(已在REBUILD_V3状态且达到最大误判次数的)
*/
public synchronized boolean updateAfterMatchNotPassed(VectorClient vectorClient) {
adjustThreshold();
addFailedCount();
if (this.failedCount < 3) {
return false;
}
if (this.type == Type.REBUILD_V3) {
return true;
}
rebuildAndSwitchType(vectorClient);
return false;
}
private void rebuildAndSwitchType(VectorClient vectorClient) {
this.type = switch (this.type) {
case PRIMARY -> {
//样本顺序反转后,以全部样本重建
this.validSamples = this.validSamples.reversed();
rebuildWithSamples(vectorClient);
yield Type.REBUILD_V1;
}
case REBUILD_V1 -> {
//截取后一半样本,反转后以此重建
List<String> temp = this.validSamples.subList(this.validSamples.size() / 2, this.validSamples.size());
this.validSamples = temp.reversed();
rebuildWithSamples(vectorClient);
yield Type.REBUILD_V2;
}
case REBUILD_V2 -> {
//截取后四分之一样本,反转后以此重建
List<String> temp = this.validSamples.subList(this.validSamples.size() / 4, this.validSamples.size());
this.validSamples = temp.reversed();
rebuildWithSamples(vectorClient);
yield Type.REBUILD_V3;
}
case REBUILD_V3 -> null;
};
//阈值减0.05,防止重建后一直升高
this.threshold = Math.max(this.threshold - 0.05, 0.75);
this.failedCount = 0;
}
private void rebuildWithSamples(VectorClient vectorClient) {
for (int i = 0; i < this.validSamples.size(); i++) {
String sample = this.validSamples.get(i);
if (i == 0) {
this.inputVector = vectorClient.compute(sample);
} else {
float[] newSampleVector = vectorClient.compute(sample);
this.inputVector = vectorClient.weightedAverage(this.inputVector, newSampleVector);
}
}
}
private void addFailedCount() {
this.failedCount = Math.min(this.failedCount + 1, 3);
}
private void adjustThreshold() {
double newThreshold = this.threshold + 0.03;
this.threshold = Math.min(newThreshold, 0.95);
}
/**
* 针对未命中但评估通过的已存在缓存做出调整:
* <ol>
* <li>
* 若已生效,但此时未匹配到则说明阈值或者向量{@link ActionCacheData#getInputVector()}存在问题,调低阈值,同时带权移动平均
* </li>
* <li>
* 若未生效,则只增加计数并带权移动平均
* </li>
* </ol>
*
* @param input 本次输入内容
* @param inputVector 本次输入内容对应的语义向量
* @param tendencyVector 本次倾向对应的语义向量
* @param vectorClient 向量客户端
*/
public synchronized void updateAfterNotMatchPassed(String input, float[] inputVector, float[] tendencyVector, VectorClient vectorClient) {
if (this.activated) {
reduceThreshold();
this.inputVector = vectorClient.weightedAverage(inputVector, this.inputVector);
} else {
addValidSample(input);
this.tendencyVector = vectorClient.weightedAverage(tendencyVector, this.tendencyVector);
addInputMatchCount();
}
}
private void reduceThreshold() {
double newThreshold = this.threshold - 0.02;
this.threshold = Math.max(newThreshold, 0.75);
}
private void addInputMatchCount() {
this.inputMatchCount += 1;
if (inputMatchCount >= 6) {
this.activated = true;
}
}
public enum Type {
PRIMARY, REBUILD_V1, REBUILD_V2, REBUILD_V3
}
}

View File

@@ -0,0 +1,11 @@
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;
import java.util.List;
@Data
public class CacheAdjustData {
private String input;
private List<CacheAdjustMetaData> metaDataList;
}

View File

@@ -0,0 +1,10 @@
package work.slhaf.partner.core.action.entity.cache;
import lombok.Data;
@Data
public class CacheAdjustMetaData {
private String tendency;
private boolean passed;
private boolean hit;
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionDataNotFoundException extends AgentRuntimeException {
public ActionDataNotFoundException(String message) {
super(message);
}
public ActionDataNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentLaunchFailedException;
public class ActionInitFailedException extends AgentLaunchFailedException {
public ActionInitFailedException(String message, Throwable cause) {
super(message, cause);
}
public ActionInitFailedException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionLoadFailedException extends AgentRuntimeException {
public ActionLoadFailedException(String message) {
super(message);
}
public ActionLoadFailedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionSerializeFailedException extends AgentRuntimeException {
public ActionSerializeFailedException(String message) {
super(message);
}
public ActionSerializeFailedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.core.action.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class MetaActionNotFoundException extends AgentRuntimeException {
public MetaActionNotFoundException(String message) {
super(message);
}
public MetaActionNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,107 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONObject;
import io.modelcontextprotocol.server.McpStatelessAsyncServer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jetbrains.annotations.Nullable;
import work.slhaf.partner.core.action.entity.ActionFileMetaData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaAction.Result;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.action.exception.ActionInitFailedException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import static work.slhaf.partner.common.Constant.Path.DATA;
import static work.slhaf.partner.common.util.PathUtil.buildPathStr;
/**
* 执行客户端抽象类
* <br/>
* 只负责暴露序列化、执行等相应接口,具体逻辑交给下游实现
* <br/>
* 默认存在两类实现,{@link LocalRunnerClient} 和 {@link SandboxRunnerClient}
* <ol>
* LocalRunnerClient:
* <li>
* 对应本地运行环境,可在本地启动 MCP 客户端将 RunnerClient 暴露的能力接口转发至本地 MCP Client 并执行
* </li>
* SandboxRunnerClient:
* <li>
* 对应沙盒运行环境,该 Client 仅作为沙盒环境的客户端,不持有额外能力,仅保持远端连接已存在行动的内容更新
* </li>
* </ol>
*/
@Slf4j
public abstract class RunnerClient {
protected final String ACTION_PATH;
protected final ConcurrentHashMap<String, MetaActionInfo> existedMetaActions;
protected final ExecutorService executor;
//TODO 仍可提供内部 MCP但调用方式需要结合 AgentContext来获取否则生命周期不合
protected McpStatelessAsyncServer innerMcpServer;
/**
* ActionCore 将注入虚拟线程池
*/
public RunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor, @Nullable String baseActionPath) {
this.existedMetaActions = existedMetaActions;
this.executor = executor;
baseActionPath = baseActionPath == null ? DATA : baseActionPath;
this.ACTION_PATH = buildPathStr(baseActionPath, "action");
createPath(ACTION_PATH);
}
/**
* 执行行动程序
*/
public void submit(MetaAction metaAction) {
// 获取已存在行动列表
Result result = metaAction.getResult();
if (!result.getStatus().equals(Result.Status.WAITING)) {
return;
}
RunnerResponse response = doRun(metaAction);
result.setData(response.getData());
result.setStatus(response.isOk() ? Result.Status.SUCCESS : Result.Status.FAILED);
}
protected abstract RunnerResponse doRun(MetaAction metaAction);
public abstract String buildTmpPath(String actionKey, String codeType);
public abstract void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException;
public abstract void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData);
protected void createPath(String pathStr) {
val path = Path.of(pathStr);
try {
Files.createDirectory(path);
} catch (IOException e) {
if (!Files.exists(path)) {
throw new ActionInitFailedException("目录创建失败: " + pathStr, e);
}
}
}
/**
* 列出执行环境下的系统依赖情况
*/
public abstract JSONObject listSysDependencies();
@Data
public static class RunnerResponse {
private boolean ok;
private String data;
}
}

View File

@@ -0,0 +1,57 @@
package work.slhaf.partner.core.action.runner;
import com.alibaba.fastjson2.JSONObject;
import work.slhaf.partner.core.action.entity.ActionFileMetaData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
/**
* 基于 Http 与 WebSocket 的沙盒执行器客户端,负责:
* <ul>
* <li>
* 发送行动单元数据
* </li>
* <li>
* 实时更新获取已存在行动列表
* </li>
* <li>
* 向传入的 MetaAction 回写执行结果
* </li>
* </ul>
*/
public class SandboxRunnerClient extends RunnerClient {
public SandboxRunnerClient(ConcurrentHashMap<String, MetaActionInfo> existedMetaActions, ExecutorService executor) { // 连接沙盒执行器(websocket)
super(existedMetaActions, executor, null);
}
protected RunnerResponse doRun(MetaAction metaAction) {
// 调用沙盒执行器
return null;
}
@Override
public JSONObject listSysDependencies() {
return null;
}
@Override
public String buildTmpPath(String actionKey, String codeType) {
throw new UnsupportedOperationException("Unimplemented method 'buildTmpPath'");
}
@Override
public void tmpSerialize(MetaAction tempAction, String code, String codeType) throws IOException {
throw new UnsupportedOperationException("Unimplemented method 'tmpSerialize'");
}
@Override
public void persistSerialize(MetaActionInfo metaActionInfo, ActionFileMetaData fileMetaData) {
throw new UnsupportedOperationException("Unimplemented method 'persistSerialize'");
}
}

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.locks.Lock;
public interface CognationCapability { public interface CognationCapability {
List<Message> getChatMessages(); List<Message> getChatMessages();
void setChatMessages(List<Message> chatMessages);
void cleanMessage(List<Message> messages); void cleanMessage(List<Message> messages);
Lock getMessageLock(); Lock getMessageLock();
void addMetaMessage(String userId, MetaMessage metaMessage); void addMetaMessage(String userId, MetaMessage metaMessage);

View File

@@ -63,11 +63,6 @@ public class CognationCore extends PartnerCore<CognationCore> {
return currentMemoryId; return currentMemoryId;
} }
@CapabilityMethod
public void setChatMessages(List<Message> chatMessages) {
this.chatMessages = chatMessages;
}
@CapabilityMethod @CapabilityMethod
public void cleanMessage(List<Message> messages) { public void cleanMessage(List<Message> messages) {
messageLock.lock(); messageLock.lock();

View File

@@ -5,7 +5,6 @@ import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.core.memory.pojo.MemoryResult; import work.slhaf.partner.core.memory.pojo.MemoryResult;
import work.slhaf.partner.core.memory.pojo.MemorySlice; import work.slhaf.partner.core.memory.pojo.MemorySlice;
import java.io.IOException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
@@ -45,7 +44,7 @@ public interface MemoryCapability {
MemoryResult selectMemory(String topicPathStr); MemoryResult selectMemory(String topicPathStr);
MemoryResult selectMemory(LocalDate date) throws IOException, ClassNotFoundException; MemoryResult selectMemory(LocalDate date);
void insertSlice(MemorySlice memorySlice, String topicPath); void insertSlice(MemorySlice memorySlice, String topicPath);

View File

@@ -198,9 +198,9 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
//尝试更新缓存 //尝试更新缓存
updateCache(topicPath, memoryResult); updateCache(topicPath, memoryResult);
} catch (Exception e) { } catch (Exception e) {
log.error("[CoordinatedManager] selectMemory error: ", e); log.error("[{}] selectMemory error: ", getCoreKey(), e);
log.error("[CoordinatedManager] 路径: {}", topicPathStr); log.error("[{}] 路径: {}", getCoreKey(), topicPathStr);
log.error("[CoordinatedManager] 主题树: {}", getTopicTree()); log.error("[{}] 主题树: {}", getCoreKey(), getTopicTree());
memoryResult = new MemoryResult(); memoryResult = new MemoryResult();
memoryResult.setRelatedMemorySliceResult(new ArrayList<>()); memoryResult.setRelatedMemorySliceResult(new ArrayList<>());
memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>()); memoryResult.setMemorySliceResult(new CopyOnWriteArrayList<>());
@@ -211,7 +211,7 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
@CapabilityMethod @CapabilityMethod
public void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices) { public void updateActivatedSlices(String userId, List<EvaluatedSlice> memorySlices) {
cache.activatedSlices.put(userId, memorySlices); cache.activatedSlices.put(userId, memorySlices);
log.debug("[CoordinatedManager] 已更新激活切片, userId: {}", userId); log.debug("[{}] 已更新激活切片, userId: {}", getCoreKey(), userId);
} }
@CapabilityMethod @CapabilityMethod
@@ -488,7 +488,7 @@ public class MemoryCore extends PartnerCore<MemoryCore> {
return targetParentNode; return targetParentNode;
} }
public void updateCacheCounter(List<String> topicPath) { private void updateCacheCounter(List<String> topicPath) {
ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter; ConcurrentHashMap<List<String>, Integer> memoryNodeCacheCounter = cache.memoryNodeCacheCounter;
if (memoryNodeCacheCounter.containsKey(topicPath)) { if (memoryNodeCacheCounter.containsKey(topicPath)) {
Integer tempCount = memoryNodeCacheCounter.get(topicPath); Integer tempCount = memoryNodeCacheCounter.get(topicPath);

View File

@@ -5,7 +5,7 @@ import lombok.EqualsAndHashCode;
import work.slhaf.partner.api.common.entity.PersistableObject; import work.slhaf.partner.api.common.entity.PersistableObject;
import java.io.Serial; import java.io.Serial;
import java.util.HashMap; import java.util.Map;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@@ -15,5 +15,5 @@ public class AppendPromptData extends PersistableObject {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private String moduleName; private String moduleName;
private HashMap<String,String> appendedPrompt; private Map<String, String> appendedPrompt;
} }

View File

@@ -3,14 +3,12 @@ package work.slhaf.partner.module.common.module;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningModule;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.io.IOException;
public abstract class PostRunningModule extends AgentRunningModule<PartnerRunningFlowContext> { public abstract class PostRunningModule extends AgentRunningModule<PartnerRunningFlowContext> {
@Override @Override
public final void execute(PartnerRunningFlowContext context) throws IOException, ClassNotFoundException { public final void execute(PartnerRunningFlowContext context) {
boolean trigger = context.getModuleContext().getExtraContext().getBoolean("post_process_trigger"); boolean trigger = context.getModuleContext().getExtraContext().getBoolean("post_process_trigger");
if (!trigger) { if (!trigger && relyOnMessage()) {
return; return;
} }
doExecute(context); doExecute(context);
@@ -18,4 +16,5 @@ public abstract class PostRunningModule extends AgentRunningModule<PartnerRunnin
public abstract void doExecute(PartnerRunningFlowContext context); public abstract void doExecute(PartnerRunningFlowContext context);
protected abstract boolean relyOnMessage();
} }

View File

@@ -5,8 +5,7 @@ import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunn
import work.slhaf.partner.module.common.entity.AppendPromptData; import work.slhaf.partner.module.common.entity.AppendPromptData;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.io.IOException; import java.util.Map;
import java.util.HashMap;
/** /**
* 前置模块抽象类 * 前置模块抽象类
@@ -16,7 +15,7 @@ public abstract class PreRunningModule extends AgentRunningModule<PartnerRunning
private synchronized void setAppendedPrompt(PartnerRunningFlowContext context) { private synchronized void setAppendedPrompt(PartnerRunningFlowContext context) {
AppendPromptData data = new AppendPromptData(); AppendPromptData data = new AppendPromptData();
data.setModuleName(moduleName()); data.setModuleName(moduleName());
HashMap<String, String> map = getPromptDataMap(context.getUserId()); Map<String, String> map = getPromptDataMap(context);
data.setAppendedPrompt(map); data.setAppendedPrompt(map);
context.setAppendedPrompt(data); context.setAppendedPrompt(data);
} }
@@ -25,7 +24,7 @@ public abstract class PreRunningModule extends AgentRunningModule<PartnerRunning
context.getCoreContext().addActiveModule(moduleName()); context.getCoreContext().addActiveModule(moduleName());
} }
protected abstract HashMap<String, String> getPromptDataMap(String userId); protected abstract Map<String, String> getPromptDataMap(PartnerRunningFlowContext context);
/** /**
* 用于在CoreModule接收到的模块Prompt中标识模块名称 * 用于在CoreModule接收到的模块Prompt中标识模块名称
@@ -33,13 +32,12 @@ public abstract class PreRunningModule extends AgentRunningModule<PartnerRunning
protected abstract String moduleName(); protected abstract String moduleName();
@Override @Override
public final void execute(PartnerRunningFlowContext context) throws IOException, ClassNotFoundException { public final void execute(PartnerRunningFlowContext context) {
doExecute(context); // 子类实现差异化逻辑 doExecute(context); // 子类实现差异化逻辑
setAppendedPrompt(context); // 通用逻辑 setAppendedPrompt(context); // 通用逻辑
setActiveModule(context); // 通用逻辑 setActiveModule(context); // 通用逻辑
} }
protected abstract void doExecute(PartnerRunningFlowContext context) throws IOException, ClassNotFoundException; protected abstract void doExecute(PartnerRunningFlowContext context);
} }

View File

@@ -0,0 +1,70 @@
package work.slhaf.partner.module.modules.action.dispatcher;
import lombok.val;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.ImmediateActionData;
import work.slhaf.partner.core.action.entity.ScheduledActionData;
import work.slhaf.partner.module.common.module.PostRunningModule;
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput;
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@AgentModule(name = "action_dispatcher", order = 7)
public class ActionDispatcher extends PostRunningModule {
@InjectCapability
private ActionCapability actionCapability;
@InjectModule
private ActionExecutor actionExecutor;
@InjectModule
private ActionScheduler actionScheduler;
private ExecutorService executor;
@Init
public void init() {
executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
}
@Override
public void doExecute(PartnerRunningFlowContext context) {
// 只需要处理prepared action因为pending action在用户确认后也将变为prepared action
// 将PLANNING action放入时间轮中IMMEDIATE action直接进入并发执行流
// 对于将触发的PLANNING
// action理想做法是将执行工具做成执行链的形式模型的自对话流程、是否通知用户都做成与普通工具同等的通用可选能力避免绑定固定流程
executor.execute(() -> {
String userId = context.getUserId();
val preparedActions = actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId);
// 分类成PLANNING和IMMEDIATE两类
Set<ScheduledActionData> scheduledActions = new HashSet<>();
Set<ImmediateActionData> immediateActions = new HashSet<>();
for (ActionData preparedAction : preparedActions) {
if (preparedAction instanceof ScheduledActionData actionInfo) {
scheduledActions.add(actionInfo);
} else if (preparedAction instanceof ImmediateActionData actionInfo) {
immediateActions.add(actionInfo);
}
}
actionExecutor.execute(new ActionExecutorInput(immediateActions));
actionScheduler.execute(scheduledActions);
});
}
@Override
protected boolean relyOnMessage() {
return false;
}
}

View File

@@ -0,0 +1,52 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONObject;
import lombok.val;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.CorrectorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.CorrectorResult;
/**
* 负责在单组行动执行后,根据行动意图与结果检查后续行动是否符合目的,必要时直接调整行动链,或发起自对话请求进行干预
*/
@AgentSubModule
public class ActionCorrector extends AgentRunningSubModule<CorrectorInput, CorrectorResult> implements ActivateModel {
@Override
public CorrectorResult execute(CorrectorInput input) {
val prompt = buildPrompt(input);
val chatResponse = singleChat(prompt);
return JSONObject.parseObject(chatResponse.getMessage(), CorrectorResult.class);
}
private String buildPrompt(CorrectorInput input) {
val prompt = new JSONObject();
prompt.put("[行动来源]", input.getSource());
prompt.put("[行动倾向]", input.getTendency());
prompt.put("[行动描述]", input.getDescription());
prompt.put("[行动原因]", input.getReason());
val messages = prompt.putArray("[近期对话]");
messages.addAll(input.getRecentMessages());
val memory = prompt.putArray("[已激活记忆]");
memory.addAll(input.getActivatedSlices());
val history = prompt.putArray("[已执行情况]");
history.addAll(input.getHistory());
return prompt.toJSONString();
}
@Override
public String modelKey() {
return "action_corrector";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,322 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.ActionData.ActionStatus;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.*;
import work.slhaf.partner.module.modules.action.dispatcher.scheduler.ActionScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@AgentSubModule
public class ActionExecutor extends AgentRunningSubModule<ActionExecutorInput, Void> {
@InjectCapability
private ActionCapability actionCapability;
@InjectCapability
private MemoryCapability memoryCapability;
@InjectCapability
private CognationCapability cognationCapability;
@InjectModule
private ParamsExtractor paramsExtractor;
@InjectModule
private ActionRepairer actionRepairer;
@InjectModule
private ActionCorrector actionCorrector;
@InjectModule
private ActionScheduler actionScheduler;
private ExecutorService virtualExecutor;
private ExecutorService platformExecutor;
private RunnerClient runnerClient;
private final AssemblyHelper assemblyHelper = new AssemblyHelper();
@Init
public void init() {
virtualExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
platformExecutor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
runnerClient = actionCapability.runnerClient();
}
/**
* 执行行动
*
* @param input ActionExecutor 输入内容
* @return 无返回,执行结果回写至 input 内部携带的 actionData 中
*/
@Override
public Void execute(ActionExecutorInput input) {
val actions = input.getActions();
// 异步执行所有行动
for (ActionData actionData : actions) {
platformExecutor.execute(() -> {
val source = actionData.getSource();
if (actionData.getStatus() != ActionStatus.PREPARE) {
return;
}
val actionChain = actionData.getActionChain();
if (actionChain.isEmpty()) {
actionData.setStatus(ActionStatus.FAILED);
actionData.setResult("行动链为空");
return;
}
// 注册执行中行动
val phaser = new Phaser();
val phaserRecord = actionCapability.putPhaserRecord(phaser, actionData);
actionData.setStatus(ActionStatus.EXECUTING);
// 开始执行
val stageCursor = new Object() {
int stageCount;
boolean executingStageUpdated;
boolean stageCountUpdated;
void init() {
stageCount = 0;
executingStageUpdated = false;
stageCountUpdated = false;
update();
}
void requestAdvance() {
if (!stageCountUpdated) {
stageCount++;
stageCountUpdated = true;
}
if (stageCount < actionChain.size() && !executingStageUpdated) {
update();
executingStageUpdated = true;
}
}
boolean next() {
executingStageUpdated = false;
stageCountUpdated = false;
return stageCount < actionChain.size();
}
void update() {
val orderList = new ArrayList<>(actionChain.keySet());
orderList.sort(Integer::compareTo);
actionData.setExecutingStage(orderList.get(stageCount));
}
};
stageCursor.init();
do {
val metaActions = actionChain.get(actionData.getExecutingStage());
val listeningRecord = executeAndListening(metaActions, phaserRecord, source);
phaser.awaitAdvance(listeningRecord.phase());
// synchronized 同步防止 accepting 循环间、phase guard 判定后发生 stage 推进
// 导致新行动的 phaser 投放阶段错乱无法阻塞的场景
// 该 synchronized 将阶段推进与 accepting 监听 loop 捆绑为互斥的原子事件,避免了细粒度的 phaser 阶段竞态问题
synchronized (listeningRecord.accepting()) {
listeningRecord.accepting().set(false);
// 立即尝试推进,本次推进中,如果前方仍有未执行 stage将执行一次阶段推进
stageCursor.requestAdvance();
}
try {
// 针对行动链进行修正,修正需要传入执行历史、行动目标等内容
// 如果后续运行 corrector 触发频率较高,可考虑增加重试机制
val correctorInput = assemblyHelper.buildCorrectorInput(actionData, source);
val correctorResult = actionCorrector.execute(correctorInput);
actionCapability.handleInterventions(correctorResult.getMetaInterventionList(), actionData);
} catch (Exception ignored) {
}
// 第二次尝试进行阶段推进,本次负责补充上一次在不存在 stage时但 corrector 执行期间发生了 actionChain 的插入事件
// 如果第一次已经推进完毕,本次将会跳过
stageCursor.requestAdvance();
} while (stageCursor.next());
// 结束
actionCapability.removePhaserRecord(phaser);
if (actionData.getStatus() != ActionStatus.FAILED) {
// 如果是 ScheduledActionData, 则重置 ActionData 内容,记录执行历史与最终结果
if (actionData instanceof ScheduledActionData scheduledActionData) {
scheduledActionData.recordAndReset();
actionScheduler.execute(Set.of(scheduledActionData));
} else {
actionData.setStatus(ActionStatus.SUCCESS);
}
}
});
}
return null;
}
private MetaActionsListeningRecord executeAndListening(List<MetaAction> metaActions, PhaserRecord phaserRecord, String source) {
AtomicBoolean accepting = new AtomicBoolean(true);
AtomicInteger cursor = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
val phaser = phaserRecord.phaser();
val phase = phaser.register();
platformExecutor.execute(() -> {
boolean first = true;
while (accepting.get()) {
synchronized (accepting) {
MetaAction next = null;
synchronized (metaActions) {
if (cursor.get() < metaActions.size()) {
next = metaActions.get(cursor.getAndIncrement());
}
}
if (next == null) {
Thread.onSpinWait();
continue;
}
if (phaser.getPhase() != phase) {
metaActions.remove(next);
log.warn("行动阶段已推进,丢弃该行动: {}", next);
continue;
}
ExecutorService executor = next.getIo() ? virtualExecutor : platformExecutor;
executor.execute(buildMataActionTask(next, phaserRecord, source));
if (first) {
phaser.arriveAndDeregister();
latch.countDown();
first = false;
}
}
}
});
try {
// 确保执行一次,防止没来得及注册任务就已经结束
latch.await();
} catch (InterruptedException ignored) {
}
return new MetaActionsListeningRecord(accepting, phase);
}
private Runnable buildMataActionTask(MetaAction metaAction, PhaserRecord phaserRecord, String source) {
val phaser = phaserRecord.phaser();
phaser.register();
return () -> {
val actionKey = metaAction.getKey();
try {
val result = metaAction.getResult();
do {
val actionData = phaserRecord.actionData();
val executingStage = actionData.getExecutingStage();
val historyActionResults = actionData.getHistory().get(executingStage);
val additionalContext = actionData.getAdditionalContext().get(executingStage);
val extractorInput = assemblyHelper.buildExtractorInput(metaAction, source, historyActionResults, additionalContext);
val extractorResult = paramsExtractor.execute(extractorInput);
if (extractorResult.isOk()) {
metaAction.getParams().putAll(extractorResult.getParams());
runnerClient.submit(metaAction);
val historyAction = new HistoryAction(actionKey, actionCapability.loadMetaActionInfo(actionKey).getDescription(), metaAction.getResult().getData());
actionData.getHistory()
.computeIfAbsent(executingStage, integer -> new ArrayList<>())
.add(historyAction);
} else {
val repairerInput = assemblyHelper.buildRepairerInput(historyActionResults, metaAction, source);
val repairerResult = actionRepairer.execute(repairerInput);
switch (repairerResult.getStatus()) {
// 如果本次修复被认为成功,则将补充的信息添加至 additionalContext
case RepairerResult.RepairerStatus.OK -> {
additionalContext.addAll(repairerResult.getFixedData());
result.setStatus(MetaAction.Result.Status.WAITING);
}
// 此处的修复失败来自系统内部的执行失败:其余方式均不可行时将回退至当前分支
case RepairerResult.RepairerStatus.FAILED -> {
result.setStatus(MetaAction.Result.Status.FAILED);
result.setData("行动执行失败");
}
// 此处对应已在 repairer 内发起外部请求,故在此处进行阻塞
case RepairerResult.RepairerStatus.ACQUIRE -> {
phaserRecord.interrupt();
result.setStatus(MetaAction.Result.Status.WAITING);
}
}
}
} while (result.getStatus().equals(MetaAction.Result.Status.WAITING));
} catch (Exception e) {
log.error("Action executing failed: {}", actionKey, e);
} finally {
phaser.arriveAndDeregister();
}
};
}
private record MetaActionsListeningRecord(AtomicBoolean accepting, int phase) {
}
@SuppressWarnings("InnerClassMayBeStatic")
private class AssemblyHelper {
private AssemblyHelper() {
}
private RepairerInput buildRepairerInput(List<HistoryAction> historyActionsResults, MetaAction action, String userId) {
RepairerInput input = new RepairerInput();
MetaActionInfo metaActionInfo = actionCapability.loadMetaActionInfo(action.getKey());
input.setHistoryActionResults(historyActionsResults);
input.setParams(metaActionInfo.getParams());
input.setRecentMessages(cognationCapability.getChatMessages());
input.setActionDescription(metaActionInfo.getDescription());
input.setUserId(userId);
return input;
}
private ExtractorInput buildExtractorInput(MetaAction action, String source, List<HistoryAction> historyActionResults,
List<String> additionalContext) {
ExtractorInput input = new ExtractorInput();
input.setEvaluatedSlices(memoryCapability.getActivatedSlices(source));
input.setRecentMessages(cognationCapability.getChatMessages());
input.setMetaActionInfo(actionCapability.loadMetaActionInfo(action.getKey()));
input.setHistoryActionResults(historyActionResults);
input.setAdditionalContext(additionalContext);
return input;
}
private CorrectorInput buildCorrectorInput(ActionData actionData, String source) {
return CorrectorInput.builder()
.tendency(actionData.getTendency())
.source(actionData.getSource())
.reason(actionData.getReason())
.description(actionData.getDescription())
.history(actionData.getHistory().get(actionData.getExecutingStage()))
.status(actionData.getStatus())
.recentMessages(cognationCapability.getChatMessages())
.activatedSlices(memoryCapability.getActivatedSlices(source))
.build();
}
}
}

View File

@@ -0,0 +1,228 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.entity.MetaAction.Result;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.RepairerResult.RepairerStatus;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 负责识别行动链的修复
* <ol>
* <li>
* 可通过协调 {@link DynamicActionGenerator} 生成新的行动单元并调用,获取所需的参数信息(必要时持久化);
* </li>
* <li>
* 也可以直接调用已存在的行动程序获取信息;
* </li>
* <li>
* 如果上述都无法满足,将发起自对话借助干预模块进行操作或者借助自对话通道向用户发起沟通请求,该请求的目的一般为行动程序生成/调用指导或者用户侧的信息补充,后续还需要再走一遍参数修复流程
* </li>
* </ol>
*/
@Slf4j
@AgentSubModule
public class ActionRepairer extends AgentRunningSubModule<RepairerInput, RepairerResult> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
@InjectCapability
private CognationCapability cognationCapability;
@InjectModule
private DynamicActionGenerator dynamicActionGenerator;
private final AssemblyHelper assemblyHelper = new AssemblyHelper();
private RunnerClient runnerClient;
@Init
void init() {
runnerClient = actionCapability.runnerClient();
}
@Override
public RepairerResult execute(RepairerInput data) {
RepairerResult result;
try {
String prompt = assemblyHelper.buildPrompt(data, null);
ChatResponse response = this.singleChat(prompt);
RepairerData repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class);
result = switch (repairerData.getRepairerType()) {
case ACTION_GENERATION ->
handleActionGeneration(JSONObject.parseObject(repairerData.getData(), GeneratorInput.class));
case ACTION_INVOCATION -> handleActionInvocation(
JSONObject.parseObject(repairerData.getData(), new TypeReference<List<String>>() {
}));
case USER_INTERACTION -> handleUserInteraction(repairerData.getData());
};
if (!repairerData.getRepairerType().equals(RepairerType.USER_INTERACTION)
&& result.getStatus().equals(RepairerResult.RepairerStatus.FAILED)) {
log.warn("常规行动修复失败,将尝试自对话通道");
prompt = assemblyHelper.buildPrompt(data, "常规行动修复失败,请尝试通过自对话通道获取必要的信息以完成行动参数的修复");
response = this.singleChat(prompt);
repairerData = JSONObject.parseObject(response.getMessage(), RepairerData.class);
handleUserInteraction(repairerData.getData());
}
} catch (Exception e) {
result = new RepairerResult();
result.setStatus(RepairerStatus.FAILED);
}
return result;
}
/**
* 负责根据输入内容进行行动单元的参数信息修复
*
* @param generatorInput 生成的行动单元参考内容,最好包含行动单元的执行逻辑
* @return 修复后的行动单元结果
*/
private RepairerResult handleActionGeneration(GeneratorInput generatorInput) {
RepairerResult result = new RepairerResult();
GeneratorResult generatorResult = dynamicActionGenerator.execute(generatorInput);
MetaAction tempAction = generatorResult.getTempAction();
if (tempAction == null) {
result.setStatus(RepairerStatus.FAILED);
return result;
}
runnerClient.submit(tempAction);
// 根据 tempAction 的执行状态设置修复结果
Result actionResult = tempAction.getResult();
if (actionResult.getStatus() != MetaAction.Result.Status.SUCCESS) {
result.setStatus(RepairerStatus.FAILED);
return result;
}
result.setStatus(RepairerStatus.OK);
result.getFixedData().add(actionResult.getData());
return result;
}
/**
* 负责根据输入内容进行行动单元的参数信息修复
*
* @param actionKeys 需要调用的行动单元Key列表
* @return 修复后的行动单元结果
*/
private RepairerResult handleActionInvocation(List<String> actionKeys) {
RepairerResult result = new RepairerResult();
CountDownLatch latch = new CountDownLatch(actionKeys.size());
ExecutorService virtual = actionCapability.getExecutor(ExecutorType.VIRTUAL);
ExecutorService platform = actionCapability.getExecutor(ExecutorType.PLATFORM);
ExecutorService executor;
AtomicInteger failedCount = new AtomicInteger(0);
for (String key : actionKeys) {
MetaAction action = actionCapability.loadMetaAction(key);
executor = action.getIo() ? virtual : platform;
executor.execute(() -> {
try {
runnerClient.submit(action);
result.getFixedData().add(action.getResult().getData());
} catch (Exception e) {
log.error("行动单元执行失败: {}", key, e);
failedCount.incrementAndGet();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (Exception e) {
log.warn("CountDownLatch 已中断");
}
if (actionKeys.size() - failedCount.get() > 0) {
result.setStatus(RepairerStatus.OK);
} else {
result.setStatus(RepairerStatus.FAILED);
}
return result;
}
private RepairerResult handleUserInteraction(String acquireContent) {
RepairerResult result = new RepairerResult();
result.setStatus(RepairerStatus.ACQUIRE);
// 发送自对话请求
return result;
}
@Override
public String modelKey() {
return "action_repairer";
}
@Override
public boolean withBasicPrompt() {
return false;
}
@SuppressWarnings("InnerClassMayBeStatic")
@Data
private class RepairerData {
private RepairerType repairerType;
private String data;
}
private enum RepairerType {
ACTION_GENERATION,
ACTION_INVOCATION,
USER_INTERACTION
}
@SuppressWarnings("InnerClassMayBeStatic")
private class AssemblyHelper {
private AssemblyHelper() {
}
private String buildPrompt(RepairerInput data, String specialInstruction) {
JSONObject prompt = new JSONObject();
JSONObject actionData = prompt.putObject("[本次行动信息]");
actionData.put("[行动描述]", data.getActionDescription());
JSONObject actionParamsData = actionData.putObject("[行动参数说明]");
actionParamsData.putAll(data.getParams());
JSONArray historyData = prompt.putArray("[历史行动执行结果]");
data.getHistoryActionResults().forEach(historyAction -> {
JSONObject historyItem = new JSONObject();
historyItem.put("[行动Key]", historyAction.actionKey());
historyItem.put("[行动描述]", historyAction.description());
historyItem.put("[行动结果]", historyAction.result());
historyData.add(historyItem);
});
JSONArray messageData = prompt.putArray("[最近消息列表]");
messageData.addAll(data.getRecentMessages());
if (specialInstruction != null) {
prompt.put("[特殊指令]", specialInstruction);
}
return prompt.toString();
}
}
}

View File

@@ -0,0 +1,91 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONObject;
import lombok.val;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.common.util.ExtractUtil;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.entity.GeneratedData;
import work.slhaf.partner.core.action.entity.MetaAction;
import work.slhaf.partner.core.action.runner.RunnerClient;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.GeneratorResult;
/**
* 负责依据输入内容生成可执行的动态行动单元,并选择是否持久化至 SandboxRunner 容器内
*/
@AgentSubModule
public class DynamicActionGenerator extends AgentRunningSubModule<GeneratorInput, GeneratorResult>
implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
private RunnerClient runnerClient;
@Init
void init() {
runnerClient = actionCapability.runnerClient();
}
@Override
public GeneratorResult execute(GeneratorInput input) {
GeneratorResult result = new GeneratorResult();
try {
// 由于 SCRIPT 类型程序都是在 SandboxRunner 内部的磁盘上加载然后执行的,
// 所以此处的输入内容也只需要指定输入参数、临时key、是否持久化即可路径将按照指定规则统一构建不可交给LLM生成
String prompt = buildPrompt(input);
// 响应结果需要包含几个特殊数据: 依赖项、代码内容、是否序列化、响应数据释义
ChatResponse response = this.singleChat(prompt);
GeneratedData generatorData = JSONObject
.parseObject(ExtractUtil.extractJson(response.getMessage()), GeneratedData.class);
val location = runnerClient.buildTmpPath(input.getActionName(), generatorData.getCodeType());
MetaAction tempAction = new MetaAction(
input.getActionName(),
true,
MetaAction.Type.ORIGIN,
location
);
// 将临时行动单元序列化至临时文件夹,并设置程序路径、放置在队列中,等待执行状态变化,并根据序列化选项选择是否补充 MetaActionInfo 并持久序列化
// 通过 ActionCapability 暴露的接口序列化至临时文件夹同时返回Path对象并设置。队列建议交给 SandboxRunner
// 持有,包括监听与序列化线程
runnerClient.tmpSerialize(tempAction, generatorData.getCode(), generatorData.getCodeType());
if (generatorData.isSerialize()) {
waitingSerialize();
}
result.setTempAction(tempAction);
} catch (Exception e) {
result.setTempAction(null);
}
return result;
}
private void waitingSerialize() {
throw new UnsupportedOperationException("Unimplemented method 'waitingSerialize'");
}
private String buildPrompt(GeneratorInput data) {
JSONObject prompt = new JSONObject();
prompt.put("[行动描述]", data.getDescription());
// prompt.putObject("[行动参数]").putAll(data.getParams());
prompt.putObject("[行动参数描述]").putAll(data.getParamsDescription());
return prompt.toString();
}
@Override
public String modelKey() {
return "dynamic_generator";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,75 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorInput;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ExtractorResult;
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.HistoryAction;
import java.util.HashMap;
import java.util.List;
/**
* 负责依据输入内容进行行动单元的参数信息提取
*/
@Slf4j
@AgentSubModule
public class ParamsExtractor extends AgentRunningSubModule<ExtractorInput, ExtractorResult> implements ActivateModel {
@Override
public ExtractorResult execute(ExtractorInput input) {
String prompt = buildPrompt(input);
ChatResponse response = this.singleChat(prompt);
ExtractorResult result;
try {
result = JSONObject.parseObject(response.getMessage(), ExtractorResult.class);
} catch (Exception e) {
log.error("ParamsExtractor解析结果失败返回内容{}", response.getMessage(), e);
result = new ExtractorResult();
result.setOk(false);
result.setParams(new HashMap<>());
}
return result;
}
private String buildPrompt(ExtractorInput input) {
JSONObject prompt = new JSONObject();
JSONObject actionData = prompt.putObject("[本次行动信息]");
MetaActionInfo actionInfo = input.getMetaActionInfo();
actionData.put("[行动描述]", actionInfo.getDescription());
actionData.put("[行动参数说明]", actionInfo.getParams());
JSONArray historyData = prompt.putArray("[历史行动执行结果]");
List<HistoryAction> historyActions = input.getHistoryActionResults();
for (HistoryAction historyAction : historyActions) {
JSONObject historyItem = new JSONObject();
historyItem.put("[行动Key]", historyAction.actionKey());
historyItem.put("[行动描述]", historyAction.description());
historyItem.put("[行动结果]", historyAction.result());
historyData.add(historyItem);
}
JSONArray messageData = prompt.putArray("[最近消息列表]");
messageData.addAll(input.getRecentMessages());
return prompt.toString();
}
@Override
public String modelKey() {
return "params_extractor";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,5 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity
import work.slhaf.partner.core.action.entity.ActionData
data class ActionExecutorInput(val actions: Set<ActionData>)

View File

@@ -0,0 +1,24 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Builder;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
@Data
@Builder
public class CorrectorInput {
private String tendency;
private String source;
private String reason;
private String description;
private List<HistoryAction> history;
private ActionData.ActionStatus status;
private List<Message> recentMessages;
private List<EvaluatedSlice> activatedSlices;
}

View File

@@ -0,0 +1,11 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import java.util.List;
@Data
public class CorrectorResult {
private List<MetaIntervention> metaInterventionList;
}

View File

@@ -0,0 +1,32 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.MetaActionInfo;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
@Data
public class ExtractorInput {
/**
* 目标 MetaActionInfo
*/
private MetaActionInfo metaActionInfo;
/**
* 可参考的记忆切片
*/
private List<EvaluatedSlice> evaluatedSlices;
/**
* 历史行动执行结果
*/
private List<HistoryAction> historyActionResults;
/**
* 最近的消息列表
*/
private List<Message> recentMessages;
/**
* 额外的上下文信息(可来自修复器等)
*/
private List<String> additionalContext;
}

View File

@@ -0,0 +1,11 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import java.util.Map;
@Data
public class ExtractorResult {
private boolean ok;
private Map<String, Object> params;
}

View File

@@ -0,0 +1,13 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import java.util.Map;
@Data
public class GeneratorInput {
private String actionName;
private Map<String, Object> params;
private String description;
private Map<String, String> paramsDescription;
}

View File

@@ -0,0 +1,9 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.MetaAction;
@Data
public class GeneratorResult {
private MetaAction tempAction;
}

View File

@@ -0,0 +1,4 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
public record HistoryAction(String actionKey, String description, String result) {
}

View File

@@ -0,0 +1,17 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import java.util.List;
import java.util.Map;
@Data
public class RepairerInput {
private String userId;
private List<Message> recentMessages;
private Map<String, Object> params;
private String actionDescription;
private List<HistoryAction> historyActionResults;
}

View File

@@ -0,0 +1,30 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.entity;
import lombok.Data;
import java.util.List;
/**
* 行动修复结果,包含行动状态和修复后的参数
*/
@Data
public class RepairerResult {
private RepairerStatus status;
private List<String> fixedData;
public enum RepairerStatus {
/**
* 成功修复: 携带修复后参数; 此种情况对应 Repairer 通过某种方式获取到了完整的参数(调用额外的行动)
*/
OK,
/**
* 发送了自对话请求干预行动,这类一般是补充信息或者提供行动指导,后续必须再步入修复进程,但需要设置层级
*/
ACQUIRE,
/**
* 修复失败(简单修复、自对话通道均出现错误,正常情况不应该出现)
*/
FAILED
}
}

View File

@@ -0,0 +1,14 @@
package work.slhaf.partner.module.modules.action.dispatcher.executor.exception;
import work.slhaf.partner.api.agent.runtime.exception.AgentRuntimeException;
public class ActionExecutingFailedException extends AgentRuntimeException {
public ActionExecutingFailedException(String message) {
super(message);
}
public ActionExecutingFailedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,400 @@
package work.slhaf.partner.module.modules.action.dispatcher.scheduler
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinition
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.slf4j.LoggerFactory
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule
import work.slhaf.partner.api.agent.factory.module.annotation.Init
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule
import work.slhaf.partner.core.action.ActionCapability
import work.slhaf.partner.core.action.entity.ActionData
import work.slhaf.partner.core.action.entity.ScheduledActionData
import work.slhaf.partner.module.modules.action.dispatcher.executor.ActionExecutor
import work.slhaf.partner.module.modules.action.dispatcher.executor.entity.ActionExecutorInput
import java.io.Closeable
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.stream.Collectors
import kotlin.jvm.optionals.getOrNull
@AgentSubModule
class ActionScheduler : AgentRunningSubModule<Set<ScheduledActionData>, Void>() {
@InjectCapability
private lateinit var actionCapability: ActionCapability
@InjectModule
private lateinit var actionExecutor: ActionExecutor
private lateinit var timeWheel: TimeWheel
private val schedulerScope =
CoroutineScope(Dispatchers.Default + SupervisorJob() + CoroutineName("ActionScheduler"))
companion object {
private val log = LoggerFactory.getLogger(ActionScheduler::class.java)
}
@Init
fun init() {
val listScheduledActions: () -> Set<ScheduledActionData> = {
actionCapability.listActions(null, null)
.stream()
.filter { it is ScheduledActionData }
.map { it as ScheduledActionData }
.collect(Collectors.toSet())
}
val onTrigger: (Set<ScheduledActionData>) -> Unit = { actionExecutor.execute(ActionExecutorInput(it)) }
timeWheel = TimeWheel(listScheduledActions, onTrigger)
setupShutdownHook()
}
private fun setupShutdownHook() {
Runtime.getRuntime().addShutdownHook(Thread {
timeWheel.close()
schedulerScope.cancel()
})
}
override fun execute(scheduledActionDataSet: Set<ScheduledActionData>?): Void? {
schedulerScope.launch {
scheduledActionDataSet?.run {
for (scheduledActionData in scheduledActionDataSet) {
log.debug("New action to schedule: {}", scheduledActionData)
actionCapability.putAction(scheduledActionData)
timeWheel.schedule(scheduledActionData)
}
}
}
return null
}
private class TimeWheel(
val listScheduledActions: () -> Set<ScheduledActionData>,
val onTrigger: (toTrigger: Set<ScheduledActionData>) -> Unit
) : Closeable {
private val actionsGroupByHour = Array<MutableSet<ScheduledActionData>>(24) { mutableSetOf() }
private val wheel = Array<MutableSet<ScheduledActionData>>(60 * 60) { mutableSetOf() }
private var recordHour: Int = -1
private var recordDay: Int = -1
private val state = MutableStateFlow(WheelState.SLEEPING)
private val wheelActionsLock = Mutex()
private val timeWheelScope = CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TimeWheel"))
private val cronDefinition: CronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)
private val cronParser: CronParser = CronParser(cronDefinition)
/**
* 根据 primaryActions 建立时间轮,并只加载当天任务,同时启动 tick 线程
*/
init {
// 启动时间轮
launchWheel()
}
suspend fun schedule(actionData: ScheduledActionData) {
if (actionData.status != ActionData.ActionStatus.PREPARE) {
return
}
checkThenExecute {
val parseToZonedDateTime = parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
it
) ?: run {
logFailedStatus(actionData)
return@checkThenExecute
}
log.debug("Action next execution time: {}", parseToZonedDateTime)
val hour = parseToZonedDateTime.hour
actionsGroupByHour[hour].add(actionData)
log.debug("Action scheduled at {}", hour)
if (it.hour == hour) {
val wheelOffset = parseToZonedDateTime.minute * 60 + parseToZonedDateTime.second
wheel[wheelOffset].add(actionData)
state.value = WheelState.ACTIVE
log.debug("Action scheduled at wheel offset {}", wheelOffset)
}
}
}
private fun launchWheel() {
fun collectToTrigger(tick: Int, previousTick: Int, triggerHour: Int): Set<ScheduledActionData>? {
if (tick > previousTick) {
val toTrigger = mutableSetOf<ScheduledActionData>()
for (i in previousTick..tick) {
val bucket = wheel[i]
if (bucket.isNotEmpty()) {
toTrigger.addAll(bucket)
val bucketUuids = bucket.asSequence().map { it.uuid }.toHashSet()
actionsGroupByHour[triggerHour].removeIf { it.uuid in bucketUuids }
bucket.clear() // 避免重复触发
}
}
return toTrigger
}
return null
}
suspend fun CoroutineScope.wheel(launchingTime: ZonedDateTime, primaryTickAdvanceTime: Long) {
val launchingHour = launchingTime.hour
var tick = launchingTime.minute * 60 + launchingTime.second
// 让节拍器从“启动时刻的下一秒”开始(避免立即 step=0
var nextTickNanos = primaryTickAdvanceTime + 1_000_000_000L
while (isActive) {
// 1) 计算落后多少秒:至少 1正常推进也可能 >1追赶
val now0 = System.nanoTime()
val lagNanos = now0 - nextTickNanos
val step = if (lagNanos < 0) 1 else (lagNanos / 1_000_000_000L).toInt() + 1
val previousTick = tick
tick = (tick + step).coerceAtMost(wheel.lastIndex)
// 2) 推进节拍器:按“理论秒”前进 step 次
nextTickNanos += step.toLong() * 1_000_000_000L
var shouldBreak = false
var toTrigger: Set<ScheduledActionData>? = null
checkThenExecute(false) {
if (it.hour != launchingHour) {
shouldBreak = true
toTrigger = collectToTrigger(wheel.lastIndex, previousTick, launchingHour)
log.debug(
"Hour changed, previousTick: {}, tick: {}, toTriggerSize: {}",
previousTick,
tick,
toTrigger?.size
)
return@checkThenExecute
}
toTrigger = collectToTrigger(tick, previousTick, launchingHour)
if (tick >= wheel.lastIndex || actionsGroupByHour[launchingHour].isEmpty()) {
state.value = WheelState.SLEEPING
shouldBreak = true
return@checkThenExecute
}
}
toTrigger?.takeIf { it.isNotEmpty() }?.let {
onTrigger(it)
log.debug("Executing action at hour {} tick {}", launchingHour, tick)
}
if (shouldBreak) {
log.debug("Wheel stopped at tick {}", tick)
break
}
// 3) 精确睡到下一次理论 tick用最新 nanoTime
val now1 = System.nanoTime()
val sleepNanos = nextTickNanos - now1
if (sleepNanos > 0) {
delay(sleepNanos / 1_000_000L) // 毫秒级 delay 足够;剩余 nanos 不必忙等
}
}
}
suspend fun wait(currentTime: ZonedDateTime) {
val nextHour = currentTime.truncatedTo(ChronoUnit.HOURS).plusHours(1)
val seconds = Duration.between(
currentTime, nextHour
).toMillis()
// withTimeoutOrNull 内部已处理 seconds 小于 0 的情况
log.debug("Start waiting {} ms at {}, target time: {}", seconds, currentTime, nextHour)
withTimeoutOrNull(seconds) {
state.first { it == WheelState.ACTIVE }
}
log.debug("Waiting ended at {}", ZonedDateTime.now())
}
timeWheelScope.launch {
while (isActive) {
// 判断是否该步入下一小时
var shouldWait: Boolean? = null
var currentTime: ZonedDateTime? = null
var primaryTickAdvanceTime: Long? = null
checkThenExecute {
currentTime = it
shouldWait = actionsGroupByHour[it.hour].isEmpty()
// 由于 wheel 的启动时间可能存在延迟,而时内推进由 nanoTime 保证不会漏发,
// 正常的时序结束又由 tick 是否触顶、当前时是否存在额外任务触发,
// 而启动时无触发保障,此时一并初始化 tick 推进时间,足以应对 check 与 wheel 间的这段时间间隔
primaryTickAdvanceTime = System.nanoTime()
}
// 如果该时无任务则等待,插入事件可提前唤醒
if (shouldWait!!) {
// 计算距离下一小时的时间,等待
currentTime?.let { wait(it) }
continue
}
// 唤醒进行时间轮循环
wheel(currentTime!!, primaryTickAdvanceTime!!)
}
}
}
suspend fun checkThenExecute(finallyToExecute: Boolean = true, then: (currentTime: ZonedDateTime) -> Unit) =
wheelActionsLock.withLock {
fun loadActions(
source: Set<ScheduledActionData>,
now: ZonedDateTime,
load: (latestExecutingTime: ZonedDateTime, actionData: ScheduledActionData) -> Unit,
repair: () -> Unit
) {
val runLoading = {
for (actionData in source) {
val nextExecutingTime =
parseToZonedDateTime(
actionData.scheduleType,
actionData.scheduleContent,
now
) ?: run {
logFailedStatus(actionData)
continue
}
load(nextExecutingTime, actionData)
}
}
repair()
runLoading()
}
fun loadHourActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutionTime, actionData ->
val secondsTime = latestExecutionTime.minute * 60 + latestExecutionTime.second
wheel[secondsTime].add(actionData)
log.debug("Action loaded to hour: {}", actionData)
}
val repair: () -> Unit = {
for (set in wheel) {
set.clear()
}
}
loadActions(actionsGroupByHour[currentTime.hour], currentTime, load, repair)
}
fun loadDayActions(currentTime: ZonedDateTime) {
val load: (ZonedDateTime, ScheduledActionData) -> Unit = { latestExecutingTime, actionData ->
actionsGroupByHour[latestExecutingTime.hour].add(actionData)
log.debug("Action loaded to day: {}", actionData)
}
val repair: () -> Unit = {
for (set in actionsGroupByHour) {
set.clear()
}
}
loadActions(listScheduledActions(), currentTime, load, repair)
}
fun refreshIfNeeded(now: ZonedDateTime) {
val d = now.dayOfMonth
val h = now.hour
if (d != recordDay) {
recordDay = d
recordHour = h
loadDayActions(now)
loadHourActions(now)
} else if (h != recordHour) {
recordHour = h
loadHourActions(now)
}
}
val now = ZonedDateTime.now()
if (finallyToExecute) {
refreshIfNeeded(now)
then(now)
} else {
then(now)
refreshIfNeeded(now)
}
}
private fun parseToZonedDateTime(
scheduleType: ScheduledActionData.ScheduleType,
scheduleContent: String,
now: ZonedDateTime
): ZonedDateTime? {
return when (scheduleType) {
ScheduledActionData.ScheduleType.CYCLE
-> {
val cron = try {
cronParser.parse(scheduleContent).validate()
} catch (_: Exception) {
return null
}
val executionTime = ExecutionTime.forCron(cron)
executionTime.nextExecution(now).getOrNull()
}
ScheduledActionData.ScheduleType.ONCE -> {
val executionTime = try {
ZonedDateTime.parse(scheduleContent)
} catch (_: Exception) {
return null
}
if (executionTime.plusSeconds(1).isBefore(now) || executionTime.dayOfMonth != now.dayOfMonth)
null
else
executionTime
}
}
}
private fun logFailedStatus(actionData: ScheduledActionData) {
log.warn(
"行动未加载uuid: {}, source: {}, tendency: {}, scheduleContent: {}",
actionData.uuid,
actionData.source,
actionData.tendency,
actionData.scheduleContent,
)
}
override fun close() {
timeWheelScope.cancel()
}
private enum class WheelState {
ACTIVE,
SLEEPING,
}
}
}

View File

@@ -0,0 +1,251 @@
package work.slhaf.partner.module.modules.action.interventor;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.val;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.action.entity.PhaserRecord;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.module.common.module.PreRunningModule;
import work.slhaf.partner.module.modules.action.interventor.entity.InterventionType;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import work.slhaf.partner.module.modules.action.interventor.evaluator.InterventionEvaluator;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorInput;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult.EvaluatedInterventionData;
import work.slhaf.partner.module.modules.action.interventor.recognizer.InterventionRecognizer;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerInput;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerResult;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* 负责识别潜在的行动干预信息,作用于正在进行或已存在的行动池中内容
*/
@AgentModule(name = "action_identifier", order = 2)
public class ActionInterventor extends PreRunningModule implements ActivateModel {
@InjectModule
private InterventionRecognizer interventionRecognizer;
@InjectModule
private InterventionEvaluator interventionEvaluator;
@InjectCapability
private ActionCapability actionCapability;
@InjectCapability
private CognationCapability cognationCapability;
@InjectCapability
private MemoryCapability memoryCapability;
private final AssemblyHelper assemblyHelper = new AssemblyHelper();
private final PromptHelper promptHelper = new PromptHelper();
/**
* 键: 本次调用uuid
* 值本次调用对应的prompt
*/
private final Map<String, Map<String, String>> interventionPrompt = new HashMap<>();
@Override
protected void doExecute(PartnerRunningFlowContext context) {
// 综合当前正在进行的行动链信息、用户交互历史、激活的记忆切片,尝试识别出是否存在行动干预意图
// 首先通过recognizer进行快速意图识别识别成功则步入评估阶段评估成功则直接作用于目标行动链
// 进行快速意图识别时必须结合近期对话与进行中行动链情况
// 干预意图识别
String uuid = context.getUuid();
String userId = context.getUserId();
RecognizerResult recognizerResult = interventionRecognizer
.execute(assemblyHelper.buildRecognizerInput(userId, context.getInput())); // 此处的输入内容携带了所有 PhaserRecord
if (!recognizerResult.isOk()) {
promptHelper.setupNoInterventionPrompt(uuid);
return;
}
// 干预意图评估
EvaluatorResult evaluatorResult = interventionEvaluator
.execute(assemblyHelper.buildEvaluatorInput(recognizerResult, userId));
List<EvaluatedInterventionData> executingDataList = evaluatorResult.getExecutingDataList();
List<EvaluatedInterventionData> preparedDataList = evaluatorResult.getPreparedDataList();
// 意图评估结果处理
if (evaluatorResult.isOk()) {
// 对存在异常ActionKey的评估结果列表进行过滤
invalidActionKeysFilter(executingDataList);
invalidActionKeysFilter(preparedDataList);
// 同步写入prompt异步处理干预行为异步在处理流程中体现
promptHelper.setupInterventionPrompt(uuid, executingDataList, preparedDataList);
handleInterventions(executingDataList, recognizerResult.getExecutingInterventions());
handleInterventions(preparedDataList, recognizerResult.getPreparedInterventions());
} else {
promptHelper.setupInterventionIgnoredPrompt(uuid, executingDataList, preparedDataList);
}
}
private void handleInterventions(List<EvaluatedInterventionData> interventionDataList, Map<String, ActionData> interventionDataMap) {
val executor = actionCapability.getExecutor(ActionCore.ExecutorType.PLATFORM);
executor.execute(() -> {
for (EvaluatedInterventionData interventionData : interventionDataList) {
// 此处拿到的为 ActionData 或者 PhaserRecord, 来自 Recognizer 的封装
val data = interventionDataMap.get(interventionData.getTendency());
actionCapability.handleInterventions(interventionData.getMetaInterventionList(), data);
}
});
}
private void invalidActionKeysFilter(List<EvaluatedInterventionData> interventions) {
List<EvaluatedInterventionData> toRemove = new ArrayList<>();
for (EvaluatedInterventionData intervention : interventions) {
List<MetaIntervention> interventionData = intervention.getMetaInterventionList();
List<String> actions = new ArrayList<>();
for (MetaIntervention metaData : interventionData) {
actions.addAll(metaData.getActions());
}
// 如果存在异常行动key则可视为该评估结果存在问题直接忽略该结果
if (!actionCapability.checkExists(actions.toArray(String[]::new))) {
toRemove.add(intervention);
}
// 针对 REBUILD 类型进行特殊校验, REBUILD 类型必须满足所有 MetaIntervention 的类型均为 REBUILD
if (!checkRebuildType(interventionData)) {
toRemove.add(intervention);
}
}
interventions.removeAll(toRemove);
}
private boolean checkRebuildType(List<MetaIntervention> interventionData) {
boolean hasRebuild = false;
for (MetaIntervention meta : interventionData) {
if (meta.getType() == InterventionType.REBUILD) {
hasRebuild = true;
} else if (hasRebuild) {
// 已经存在REBUILD类型但又发现了非REBUILD类型不合法
return false;
}
}
return true;
}
@Override
public String modelKey() {
return "action_identifier";
}
@Override
public boolean withBasicPrompt() {
return false;
}
@Override
protected Map<String, String> getPromptDataMap(PartnerRunningFlowContext context) {
return interventionPrompt.remove(context.getUuid());
}
@Override
protected String moduleName() {
return "[行动干预识别模块]";
}
private final class AssemblyHelper {
private AssemblyHelper() {
}
private RecognizerInput buildRecognizerInput(String userId, String input) {
RecognizerInput recognizerInput = new RecognizerInput();
recognizerInput.setInput(input);
recognizerInput.setUserDialogMapStr(memoryCapability.getUserDialogMapStr(userId));
// 参考的对话列表大小或需调整
recognizerInput.setRecentMessages(cognationCapability.getChatMessages());
recognizerInput.setExecutingActions(actionCapability.listPhaserRecords().stream().map(PhaserRecord::actionData).toList());
recognizerInput.setPreparedActions(actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId).stream().toList());
return recognizerInput;
}
private EvaluatorInput buildEvaluatorInput(RecognizerResult recognizerResult, String userId) {
EvaluatorInput input = new EvaluatorInput();
input.setExecutingInterventions(recognizerResult.getExecutingInterventions());
input.setPreparedInterventions(recognizerResult.getPreparedInterventions());
input.setRecentMessages(cognationCapability.getChatMessages());
input.setActivatedSlices(memoryCapability.getActivatedSlices(userId));
return input;
}
}
private final class PromptHelper {
private PromptHelper() {
}
private void setupInterventionIgnoredPrompt(String uuid, List<EvaluatedInterventionData> executingDataList, List<EvaluatedInterventionData> preparedDataList) {
List<EvaluatedInterventionData> total = Stream.concat(executingDataList.stream(), preparedDataList.stream()).toList();
JSONArray reasons = new JSONArray();
for (EvaluatedInterventionData data : total) {
JSONObject reason = reasons.addObject();
reason.put("[干预倾向]", data.getTendency());
reason.put("[未采用原因]", data.getDescription());
}
synchronized (interventionPrompt) {
interventionPrompt.put(uuid, Map.of(
"[识别状态] <是否识别到干预已存在行动的意图>", "识别到,但都未采用",
"[忽略原因] <各个意图被忽略的原因>", reasons.toString(),
"[干预行动] <将对已存在行动做出的行为>", "无行为"));
}
}
private void setupInterventionPrompt(String uuid, List<EvaluatedInterventionData> executingDataList,
List<EvaluatedInterventionData> preparedDataList) {
JSONArray contents = new JSONArray();
List<EvaluatedInterventionData> temp = Stream.concat(executingDataList.stream(), preparedDataList.stream()).toList();
for (EvaluatedInterventionData data : temp) {
if (!data.isOk()) {
continue;
}
String tendency = data.getTendency();
JSONObject newElement = contents.addObject();
newElement.put("[干预倾向]", tendency);
JSONArray changes = newElement.putArray("[行动链变动情况]");
for (MetaIntervention intervention : data.getMetaInterventionList()) {
JSONObject change = changes.addObject();
change.put("[干预类型]", intervention.getType());
change.put("[干预序号]", intervention.getOrder());
change.putArray("[干预内容]").addAll(intervention.getActions());
}
}
synchronized (interventionPrompt) {
interventionPrompt.put(uuid, Map.of(
"[识别状态] <是否识别到干预已存在行动的意图>", "识别到,将采用",
"[干预内容] <将对已存在行动做出的行为>", contents.toString()));
}
}
private void setupNoInterventionPrompt(String uuid) {
interventionPrompt.put(uuid, Map.of(
"[识别状态] <是否识别到干预已存在行动的意图>", "未识别到干预意图",
"[干预行动] <将对已存在行动做出的行为>", "无行动"));
}
}
}

View File

@@ -0,0 +1,28 @@
package work.slhaf.partner.module.modules.action.interventor.entity;
public enum InterventionType {
/**
* 追加行动: 追加至指定行动链序列之后才执行
*/
APPEND,
/**
* 插入行动: 指定行动链序列执行过程中即时新增并执行
*/
INSERT,
/**
* 重建行动: 重建指定行动链序列之后的所有行动内容
*/
REBUILD,
/**
* 删除行动: 删除指定行动链序列上的指定行动单元
*/
DELETE,
/**
* 取消行动链: 中断并取消指定行动链的执行
*/
CANCEL
}

View File

@@ -0,0 +1,21 @@
package work.slhaf.partner.module.modules.action.interventor.entity;
import lombok.Data;
import java.util.List;
@Data
public class MetaIntervention {
/**
* 干预数据类型
*/
private InterventionType type;
/**
* 干预数据对应的行动链序列
*/
private int order;
/**
* 干预数据所需的行动key列表
*/
private List<String> actions;
}

View File

@@ -0,0 +1,99 @@
package work.slhaf.partner.module.modules.action.interventor.evaluator;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore.ExecutorType;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorInput;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult;
import work.slhaf.partner.module.modules.action.interventor.evaluator.entity.EvaluatorResult.EvaluatedInterventionData;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@Slf4j
@AgentSubModule
public class InterventionEvaluator extends AgentRunningSubModule<EvaluatorInput, EvaluatorResult>
implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
/**
* 基于干预意图、记忆切片、交互上下文、已有行动程序综合评估,尝试评估并选取出合适的行动程序,交付给 ActionInterventor
*/
@Override
public EvaluatorResult execute(EvaluatorInput input) {
// 获取必须数据
ExecutorService executor = actionCapability.getExecutor(ExecutorType.VIRTUAL);
Map<String, ActionData> executingInterventions = input.getExecutingInterventions();
Map<String, ActionData> preparedInterventions = input.getPreparedInterventions();
CountDownLatch latch = new CountDownLatch(executingInterventions.size() + preparedInterventions.size());
// 创建结果容器
EvaluatorResult result = new EvaluatorResult();
List<EvaluatedInterventionData> executingDataList = result.getExecutingDataList();
List<EvaluatedInterventionData> preparedDataList = result.getPreparedDataList();
// 并发评估
evaluateIntervention(executingDataList, executingInterventions, input, executor, latch);
evaluateIntervention(preparedDataList, preparedInterventions, input, executor, latch);
try {
latch.await();
} catch (InterruptedException e) {
log.warn("CountDownLatch阻塞已中断");
}
return result;
}
private void evaluateIntervention(List<EvaluatedInterventionData> evaluatedDataList, Map<String, ActionData> interventionMap, EvaluatorInput input, ExecutorService executor, CountDownLatch latch) {
interventionMap.forEach((tendency, actionData) -> executor.execute(() -> {
try {
String prompt = buildPrompt(input.getRecentMessages(), input.getActivatedSlices(), actionData, tendency);
ChatResponse response = this.singleChat(prompt);
EvaluatedInterventionData evaluatedData = JSONObject.parseObject(response.getMessage(),
EvaluatedInterventionData.class);
synchronized (evaluatedDataList) {
evaluatedDataList.add(evaluatedData);
}
} catch (Exception e) {
log.error("干预意图评估出错: {}", tendency, e);
} finally {
latch.countDown();
}
}));
}
private String buildPrompt(List<Message> recentMessages, List<EvaluatedSlice> activatedSlices,
ActionData actionData, String tendency) {
JSONObject json = new JSONObject();
json.put("干预倾向", tendency);
json.putArray("近期对话").addAll(recentMessages);
json.putArray("参考记忆").addAll(activatedSlices);
json.put("将干预的行动", JSONObject.toJSONString(actionData));
return json.toJSONString();
}
@Override
public String modelKey() {
return "intervention_evaluator";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,17 @@
package work.slhaf.partner.module.modules.action.interventor.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
import java.util.Map;
@Data
public class EvaluatorInput {
private Map<String, ActionData> executingInterventions;
private Map<String, ActionData> preparedInterventions;
private List<EvaluatedSlice> activatedSlices;
private List<Message> recentMessages;
}

View File

@@ -0,0 +1,33 @@
package work.slhaf.partner.module.modules.action.interventor.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.module.modules.action.interventor.entity.MetaIntervention;
import java.util.List;
/**
* 干预倾向评估结果包含评估通过的倾向文本、对行动链的行为、指定操作的行动单元key、未通过的原因
*/
@Data
public class EvaluatorResult {
/**
* 是否存在通过的干预倾向
*/
private boolean ok;
private List<EvaluatedInterventionData> executingDataList;
private List<EvaluatedInterventionData> preparedDataList;
@Data
public static class EvaluatedInterventionData {
/**
* 是否通过
*/
private boolean ok;
private String tendency;
/**
* 描述信息(包括通过、失败原因)
*/
private String description;
private List<MetaIntervention> metaInterventionList;
}
}

View File

@@ -0,0 +1,102 @@
package work.slhaf.partner.module.modules.action.interventor.recognizer;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.MetaRecognizerResult;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerInput;
import work.slhaf.partner.module.modules.action.interventor.recognizer.entity.RecognizerResult;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@Slf4j
@AgentSubModule
public class InterventionRecognizer extends AgentRunningSubModule<RecognizerInput, RecognizerResult> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
@Override
public RecognizerResult execute(RecognizerInput input) {
// 获取必须数据
ExecutorService executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
List<ActionData> executingActions = input.getExecutingActions();
List<ActionData> preparedActions = input.getPreparedActions();
CountDownLatch countDownLatch = new CountDownLatch(executingActions.size() + preparedActions.size());
// 创建结果容器
RecognizerResult recognizerResult = new RecognizerResult();
Map<String, ActionData> executingInterventions = recognizerResult.getExecutingInterventions();
Map<String, ActionData> preparedInterventions = recognizerResult.getPreparedInterventions();
// 执行识别操作
recognizeIntervention(executingInterventions, executingActions, executor, input, countDownLatch);
recognizeIntervention(preparedInterventions, preparedActions, executor, input, countDownLatch);
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.warn("CountDownLatch阻塞已中断");
}
return recognizerResult;
}
private void recognizeIntervention(Map<String, ActionData> interventionsMap, List<ActionData> actions, ExecutorService executor, RecognizerInput input, CountDownLatch latch) {
for (ActionData data : actions) {
executor.execute(() -> {
try {
String prompt = buildPrompt(data, input);
ChatResponse response = this.singleChat(prompt);
MetaRecognizerResult result = JSONObject.parseObject(response.getMessage(), MetaRecognizerResult.class);
if (result.isOk()) {
synchronized (interventionsMap) {
interventionsMap.put(result.getIntervention(), data);
}
}
} catch (Exception e) {
log.error("LLM干预意图提取出错", e);
} finally {
latch.countDown();
}
});
}
}
private String buildPrompt(ActionData actionData, RecognizerInput input) {
JSONObject json = new JSONObject();
JSONObject actionInfo = json.putObject("行动信息");
actionInfo.put("行动倾向", actionData.getTendency());
actionInfo.put("行动原因", actionData.getReason());
actionInfo.put("行动描述", actionData.getDescription());
actionInfo.put("行动状态", actionData.getStatus());
actionInfo.put("行动来源", actionData.getSource());
JSONObject interactionInfo = json.putObject("交互信息");
interactionInfo.put("用户输入", input.getInput());
interactionInfo.put("当前对话", input.getRecentMessages());
interactionInfo.put("近期对话", input.getUserDialogMapStr());
return json.toString();
}
@Override
public String modelKey() {
return "intervention_recognizer";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,9 @@
package work.slhaf.partner.module.modules.action.interventor.recognizer.entity;
import lombok.Data;
@Data
public class MetaRecognizerResult {
private boolean ok;
private String intervention;
}

View File

@@ -0,0 +1,22 @@
package work.slhaf.partner.module.modules.action.interventor.recognizer.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import java.util.List;
@Data
public class RecognizerInput {
private String input;
private List<Message> recentMessages;
/**
* 当前用户对应的近两日对话缓存
*/
private String userDialogMapStr;
/**
* 正在执行的行动-Phaser记录列表在Recognizer中结合本次输入并发评估(考虑到不同行动链之间对LLM的影响)
*/
private List<ActionData> executingActions;
private List<ActionData> preparedActions;
}

View File

@@ -0,0 +1,29 @@
package work.slhaf.partner.module.modules.action.interventor.recognizer.entity;
import lombok.Data;
import work.slhaf.partner.core.action.entity.ActionData;
import java.util.HashMap;
import java.util.Map;
@Data
public class RecognizerResult {
private boolean ok;
/**
* <h4>将被干预的‘执行中行动’</h4>
* key: 干预倾向
* <br/>
* value: 干预倾向将作用的行动数据
*/
private Map<String, ActionData> executingInterventions = new HashMap<>();
/**
* <h4>将被干预的‘等待中行动’</h4>
* key: 干预倾向
* <br/>
* value: 干预倾向将作用的行动数据
*/
private Map<String, ActionData> preparedInterventions = new HashMap<>();
}

View File

@@ -1,46 +1,340 @@
package work.slhaf.partner.module.modules.action.planner; package work.slhaf.partner.module.modules.action.planner;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jetbrains.annotations.NotNull;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule; import work.slhaf.partner.api.agent.factory.module.annotation.InjectModule;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.common.vector.VectorClient;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.*;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustData;
import work.slhaf.partner.core.action.entity.cache.CacheAdjustMetaData;
import work.slhaf.partner.core.cognation.CognationCapability;
import work.slhaf.partner.core.memory.MemoryCapability;
import work.slhaf.partner.core.perceive.PerceiveCapability;
import work.slhaf.partner.module.common.module.PreRunningModule; import work.slhaf.partner.module.common.module.PreRunningModule;
import work.slhaf.partner.module.modules.action.planner.confirmer.ActionConfirmer;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult;
import work.slhaf.partner.module.modules.action.planner.evaluator.ActionEvaluator; import work.slhaf.partner.module.modules.action.planner.evaluator.ActionEvaluator;
import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorInput;
import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorResult;
import work.slhaf.partner.module.modules.action.planner.extractor.ActionExtractor; import work.slhaf.partner.module.modules.action.planner.extractor.ActionExtractor;
import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorInput; import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorInput;
import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorResult;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.io.IOException; import java.util.*;
import java.util.HashMap; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* 负责针对本次输入生成基础的行动建议,是否执行由主模型判断。 * 负责针对本次输入生成基础的行动计划,在主模型传达意愿后,执行行动或者放入计划池
*/ */
@AgentModule(name = "task_planner",order = 2) @Slf4j
@AgentModule(name = "action_planner", order = 2)
public class ActionPlanner extends PreRunningModule { public class ActionPlanner extends PreRunningModule {
@InjectCapability
private CognationCapability cognationCapability;
@InjectCapability
private ActionCapability actionCapability;
@InjectCapability
private PerceiveCapability perceiveCapability;
@InjectCapability
private MemoryCapability memoryCapability;
@InjectModule @InjectModule
private ActionEvaluator actionEvaluator; private ActionEvaluator actionEvaluator;
@InjectModule @InjectModule
private ActionExtractor actionExtractor; private ActionExtractor actionExtractor;
@InjectModule
private ActionConfirmer actionConfirmer;
@Override private ExecutorService executor;
protected void doExecute(PartnerRunningFlowContext context) throws IOException, ClassNotFoundException {
ExtractorInput extractorInput = getExtractorInput(context);
}
private ExtractorInput getExtractorInput(PartnerRunningFlowContext context) { private final ActionAssemblyHelper assemblyHelper = new ActionAssemblyHelper();
ExtractorInput input = new ExtractorInput();
input.setInput(context.getInput()); @Init
input.setRecentMessages(); public void init() {
return input; executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
} }
@Override @Override
protected HashMap<String, String> getPromptDataMap(String userId) { protected void doExecute(PartnerRunningFlowContext context) {
try {
List<Callable<Void>> tasks = new ArrayList<>();
addConfirmTask(tasks, context);
addNewActionTask(tasks, context);
executor.invokeAll(tasks);
} catch (Exception e) {
log.error("执行异常", e);
}
}
/**
* 新的提取与评估任务
*
* @param tasks 并发任务列表
* @param context 流程上下文
*/
private void addNewActionTask(List<Callable<Void>> tasks, PartnerRunningFlowContext context) {
tasks.add(() -> {
ExtractorInput extractorInput = assemblyHelper.buildExtractorInput(context);
ExtractorResult extractorResult = actionExtractor.execute(extractorInput);
if (extractorResult.getTendencies().isEmpty()) {
return null; return null;
} }
EvaluatorInput evaluatorInput = assemblyHelper.buildEvaluatorInput(extractorResult, context.getUserId());
List<EvaluatorResult> evaluatorResults = actionEvaluator.execute(evaluatorInput); // 并发操作均为访问
putActionData(evaluatorResults, context);
updateTendencyCache(evaluatorResults, context.getInput(), extractorResult);
return null;
});
}
private void updateTendencyCache(List<EvaluatorResult> evaluatorResults, String input,
ExtractorResult extractorResult) {
if (!VectorClient.status) {
return;
}
executor.execute(() -> {
CacheAdjustData data = new CacheAdjustData();
List<CacheAdjustMetaData> list = new ArrayList<>();
List<String> hitTendencies = extractorResult.getTendencies();
for (EvaluatorResult result : evaluatorResults) {
CacheAdjustMetaData metaData = new CacheAdjustMetaData();
metaData.setTendency(result.getTendency());
metaData.setPassed(result.isOk());
metaData.setHit(hitTendencies.contains(result.getTendency()));
list.add(metaData);
}
data.setMetaDataList(list);
data.setInput(input);
actionCapability.updateTendencyCache(data);
});
}
/**
* 待确认行动的判断任务
*
* @param tasks 并发任务列表
* @param context 流程上下文
*/
private void addConfirmTask(List<Callable<Void>> tasks, PartnerRunningFlowContext context) {
tasks.add(() -> {
ConfirmerInput confirmerInput = assemblyHelper.buildConfirmerInput(context);
ConfirmerResult result = actionConfirmer.execute(confirmerInput);
setupConfirmedActionInfo(context, result);
return null;
});
}
private void setupConfirmedActionInfo(PartnerRunningFlowContext context, ConfirmerResult result) {
// TODO 需考虑未确认任务的失效或者拒绝时机在action core中实现
List<String> uuids = result.getUuids();
if (uuids == null) {
return;
}
List<ActionData> pendingActions = actionCapability.popPendingAction(context.getUserId());
for (ActionData actionData : pendingActions) {
if (uuids.contains(actionData.getUuid())) {
actionCapability.putAction(actionData);
}
}
}
private void putActionData(List<EvaluatorResult> evaluatorResults, PartnerRunningFlowContext context) {
for (EvaluatorResult evaluatorResult : evaluatorResults) {
ActionData actionData = assemblyHelper.buildActionData(evaluatorResult, context.getUserId());
if (evaluatorResult.isNeedConfirm()) {
actionCapability.putPendingActions(context.getUserId(), actionData);
} else {
actionCapability.putAction(actionData);
}
}
}
@Override
protected Map<String, String> getPromptDataMap(PartnerRunningFlowContext context) {
HashMap<String, String> map = new HashMap<>();
String userId = context.getUserId();
setupPendingActions(map, userId);
setupPreparedActions(map, userId);
return map;
}
private void setupPendingActions(HashMap<String, String> map, String userId) {
List<ActionData> actionData = actionCapability.listPendingAction(userId);
if (actionData == null || actionData.isEmpty()) {
map.put("[待确认行动] <等待用户确认的行动信息>", "无待确认行动");
return;
}
for (int i = 0; i < actionData.size(); i++) {
map.put("[待确认行动 " + (i + 1) + " ] <等待用户确认的行动信息>", generateActionStr(actionData.get(i)));
}
}
private void setupPreparedActions(HashMap<String, String> map, String userId) {
val preparedActions = actionCapability.listActions(ActionData.ActionStatus.PREPARE, userId).stream().toList();
if (preparedActions.isEmpty()) {
map.put("[预备行动] <预备执行或放入计划池的行动信息>", "无预备行动");
return;
}
for (int i = 0; i < preparedActions.size(); i++) {
map.put("[预备行动 " + (i + 1) + " ] <预备执行或放入计划池的行动信息>", generateActionStr(preparedActions.get(i)));
}
}
private String generateActionStr(ActionData actionData) {
return "<行动倾向>" + " : " + actionData.getTendency() +
"<行动原因>" + " : " + actionData.getReason() +
"<工具描述>" + " : " + actionData.getDescription();
}
@Override @Override
protected String moduleName() { protected String moduleName() {
return "task_planner"; return "[行动模块]";
}
private final class ActionAssemblyHelper {
private ActionAssemblyHelper() {
}
private ExtractorInput buildExtractorInput(PartnerRunningFlowContext context) {
ExtractorInput input = new ExtractorInput();
input.setInput(context.getInput());
List<Message> chatMessages = cognationCapability.getChatMessages();
List<Message> recentMessages = new ArrayList<>();
if (chatMessages.size() > 5) {
recentMessages.addAll(chatMessages.subList(chatMessages.size() - 5, chatMessages.size() - 1));
} else if (chatMessages.size() > 1) {
recentMessages.addAll(chatMessages.subList(0, chatMessages.size() - 1));
}
input.setRecentMessages(recentMessages);
return input;
}
private EvaluatorInput buildEvaluatorInput(ExtractorResult extractorResult, String userId) {
EvaluatorInput input = new EvaluatorInput();
input.setTendencies(extractorResult.getTendencies());
input.setUser(perceiveCapability.getUser(userId));
input.setRecentMessages(cognationCapability.getChatMessages());
input.setActivatedSlices(memoryCapability.getActivatedSlices(userId));
return input;
}
private ActionData buildActionData(EvaluatorResult evaluatorResult, String userId) {
Map<Integer, List<MetaAction>> actionChain = getActionChain(evaluatorResult);
return switch (evaluatorResult.getType()) {
case PLANNING -> new ScheduledActionData(
evaluatorResult.getTendency(),
actionChain,
evaluatorResult.getReason(),
evaluatorResult.getDescription(),
userId,
evaluatorResult.getScheduleType(),
evaluatorResult.getScheduleContent()
);
case IMMEDIATE -> new ImmediateActionData(
evaluatorResult.getTendency(),
actionChain,
evaluatorResult.getReason(),
evaluatorResult.getDescription(),
userId
);
};
}
private @NotNull Map<Integer, List<MetaAction>> getActionChain(EvaluatorResult evaluatorResult) {
Map<Integer, List<MetaAction>> actionChain = new HashMap<>();
Map<Integer, List<String>> primaryActionChain = evaluatorResult.getPrimaryActionChain();
fixDependencies(primaryActionChain);
primaryActionChain.forEach((order, actionKeys) -> {
List<MetaAction> metaActions = actionKeys.stream()
.map(actionKey -> actionCapability.loadMetaAction(actionKey))
.toList();
actionChain.put(order, metaActions);
});
return actionChain;
}
private void fixDependencies(Map<Integer, List<String>> primaryActionChain) {
// 先将 primaryActionChain 的节点序号修正为从1开始依次增大
fixOrder(primaryActionChain);
List<Integer> fixedOrders = new ArrayList<>(primaryActionChain.keySet().stream().toList());
AtomicBoolean fixed = new AtomicBoolean(false);
do {
Set<Integer> tempOrders = new HashSet<>();
fixedOrders.sort(Integer::compareTo);
for (Integer fixedOrder : fixedOrders) {
int lastOrder = fixedOrder - 1;
List<String> actionKeys = primaryActionChain.get(fixedOrder);
for (String actionKey : actionKeys) {
// 根据 actionKey 加载行动信息,并检查是否存在必需前置依赖
MetaActionInfo metaActionInfo = actionCapability.loadMetaActionInfo(actionKey);
List<String> preActions = metaActionInfo.getPreActions();
boolean preActionsExist = preActions != null && !preActions.isEmpty();
if (!preActionsExist) {
continue;
}
if (!metaActionInfo.isStrictDependencies()) {
continue;
}
if (checkDependenciesExist(lastOrder, preActions, primaryActionChain)) {
continue;
}
// 如果存在前置依赖,则将其放置在当前order之前的位置,
// 放置位置优先选择已存在的上一节点,如果不存在(行动链的头节点时)则需要向行动链新增order
// 不需要检查行动链的当前节点的已存在 Action 是否为新 Action 的依赖项,因为这些 Action 实际来自 LLM
// 的评估结果,并非作为依赖项存在
fixed.set(true);
List<String> actionsInChain = primaryActionChain.computeIfAbsent(lastOrder,
list -> new ArrayList<>());
preActions = new ArrayList<>(preActions);
preActions.removeAll(actionsInChain);
actionsInChain.addAll(preActions);
tempOrders.add(lastOrder);
}
}
fixedOrders.clear();
fixedOrders.addAll(tempOrders);
} while (fixed.getAndSet(false));
}
private void fixOrder(Map<Integer, List<String>> primaryActionChain) {
Map<Integer, List<String>> tempChain = new HashMap<>(primaryActionChain);
primaryActionChain.clear();
int chainSize = tempChain.size();
for (int i = 0; i < chainSize; i++) {
primaryActionChain.put(i, tempChain.get(i));
}
}
private boolean checkDependenciesExist(int lastOrder, List<String> preActions,
Map<Integer, List<String>> primaryActionChain) {
if (!primaryActionChain.containsKey(lastOrder)) {
return false;
}
List<String> existActions = primaryActionChain.get(lastOrder);
//noinspection SlowListContainsAll
return existActions.containsAll(preActions);
}
private ConfirmerInput buildConfirmerInput(PartnerRunningFlowContext context) {
ConfirmerInput confirmerInput = new ConfirmerInput();
confirmerInput.setInput(context.getInput());
List<ActionData> pendingActions = actionCapability.listPendingAction(context.getUserId());
confirmerInput.setActionData(pendingActions);
return confirmerInput;
}
} }
} }

View File

@@ -0,0 +1,90 @@
package work.slhaf.partner.module.modules.action.planner.confirmer;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.action.ActionCore;
import work.slhaf.partner.core.action.entity.ActionData;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerInput;
import work.slhaf.partner.module.modules.action.planner.confirmer.entity.ConfirmerResult;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import static work.slhaf.partner.common.util.ExtractUtil.extractJson;
@Slf4j
@AgentSubModule
public class ActionConfirmer extends AgentRunningSubModule<ConfirmerInput, ConfirmerResult> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
@Override
public ConfirmerResult execute(ConfirmerInput data) {
List<ActionData> actionDataList = data.getActionData();
ExecutorService executor = actionCapability.getExecutor(ActionCore.ExecutorType.VIRTUAL);
CountDownLatch latch = new CountDownLatch(actionDataList.size());
ConfirmerResult result = new ConfirmerResult();
List<String> uuids = result.getUuids();
for (ActionData actionData : actionDataList) {
executor.execute(() -> {
try {
String prompt = buildPrompt(actionData, data.getInput(), data.getRecentMessages());
ChatResponse response = this.singleChat(prompt);
JSONObject tempResult = JSONObject.parseObject(extractJson(response.getMessage()));
if (tempResult.getBoolean("confirmed")) {
actionData.setStatus(ActionData.ActionStatus.PREPARE);
synchronized (uuids) {
uuids.add(actionData.getUuid());
}
}
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
log.warn("CountDownLatch阻塞已中断");
}
return result;
}
private String buildPrompt(ActionData data, String input, List<Message> recentMessages) {
JSONObject prompt = new JSONObject();
prompt.put("[用户输入]", input);
JSONObject actionData = prompt.putObject("[行动数据]");
actionData.put("[行动倾向]", data.getTendency());
actionData.put("[行动原因]", data.getReason());
actionData.put("[行动来源]", data.getSource());
actionData.put("[行动描述]", data.getDescription());
JSONArray messageData = prompt.putArray("[近期对话]");
messageData.addAll(recentMessages);
return prompt.toString();
}
@Override
public String modelKey() {
return "action-confirmer";
}
@Override
public boolean withBasicPrompt() {
return false;
}
}

View File

@@ -0,0 +1,14 @@
package work.slhaf.partner.module.modules.action.planner.confirmer.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.action.entity.ActionData;
import java.util.List;
@Data
public class ConfirmerInput {
private String input;
private List<ActionData> actionData;
private List<Message> recentMessages;
}

View File

@@ -0,0 +1,11 @@
package work.slhaf.partner.module.modules.action.planner.confirmer.entity;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class ConfirmerResult {
private List<String> uuids = new ArrayList<>();
}

View File

@@ -1,18 +1,96 @@
package work.slhaf.partner.module.modules.action.planner.evaluator; package work.slhaf.partner.module.modules.action.planner.evaluator;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.factory.module.annotation.Init;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.common.thread.InteractionThreadPoolExecutor;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorBatchInput;
import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorInput; import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorInput;
import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorResult; import work.slhaf.partner.module.modules.action.planner.evaluator.entity.EvaluatorResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@AgentSubModule @AgentSubModule
public class ActionEvaluator extends AgentRunningSubModule<EvaluatorInput, EvaluatorResult> implements ActivateModel { public class ActionEvaluator extends AgentRunningSubModule<EvaluatorInput, List<EvaluatorResult>> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
private InteractionThreadPoolExecutor executor;
@Init
public void init() {
executor = InteractionThreadPoolExecutor.getInstance();
}
/**
* 对输入的行为倾向进行评估,并根据评估结果,对缓存做出调整
*
* @param data 评估输入内容,包含提取/命中缓存的行动倾向、近几条聊天记录,正在生效的记忆切片内容
* @return 评估结果集合
*/
@Override @Override
public EvaluatorResult execute(EvaluatorInput data) { public List<EvaluatorResult> execute(EvaluatorInput data) {
List<EvaluatorBatchInput> batchInputs = buildEvaluatorBatchInput(data);
List<Callable<EvaluatorResult>> tasks = getTasks(batchInputs);
return executor.invokeAllAndReturn(tasks);
}
return null; private List<Callable<EvaluatorResult>> getTasks(List<EvaluatorBatchInput> batchInputs) {
List<Callable<EvaluatorResult>> list = new ArrayList<>();
for (EvaluatorBatchInput batchInput : batchInputs) {
list.add(() -> {
ChatResponse response = this.singleChat(buildPrompt(batchInput));
EvaluatorResult evaluatorResult = JSONObject.parseObject(response.getMessage(), EvaluatorResult.class);
evaluatorResult.setTendency(batchInput.getTendency());
return evaluatorResult;
});
}
return list;
}
private List<EvaluatorBatchInput> buildEvaluatorBatchInput(EvaluatorInput data) {
List<EvaluatorBatchInput> list = new ArrayList<>();
for (String tendency : data.getTendencies()) {
EvaluatorBatchInput temp = new EvaluatorBatchInput();
BeanUtil.copyProperties(data, temp);
temp.setTendency(tendency);
Map<String, String> availableActions = new HashMap<>();
actionCapability.listAvailableMetaActions().forEach((key, info) -> availableActions.put(key, info.getDescription()));
temp.setAvailableActions(availableActions);
list.add(temp);
}
return list;
}
private String buildPrompt(EvaluatorBatchInput batchInput) {
JSONObject prompt = new JSONObject();
prompt.put("[行动倾向]", batchInput.getTendency());
JSONArray memoryData = prompt.putArray("[相关记忆切片]");
for (EvaluatedSlice evaluatedSlice : batchInput.getActivatedSlices()) {
JSONObject memory = memoryData.addObject();
memory.put("[日期]", evaluatedSlice.getDate());
memory.put("[摘要]", evaluatedSlice.getSummary());
}
JSONObject availableActionData = prompt.putObject("[可用行动单元]");
availableActionData.putAll(batchInput.getAvailableActions());
return prompt.toString();
} }
@Override @Override

View File

@@ -0,0 +1,16 @@
package work.slhaf.partner.module.modules.action.planner.evaluator.entity;
import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import java.util.List;
import java.util.Map;
@Data
public class EvaluatorBatchInput {
private List<Message> recentMessages;
private List<EvaluatedSlice> activatedSlices;
private Map<String, String> availableActions;
private String tendency;
}

View File

@@ -1,11 +1,16 @@
package work.slhaf.partner.module.modules.action.planner.evaluator.entity; package work.slhaf.partner.module.modules.action.planner.evaluator.entity;
import lombok.Data; import lombok.Data;
import work.slhaf.partner.api.chat.pojo.Message;
import work.slhaf.partner.core.memory.pojo.EvaluatedSlice;
import work.slhaf.partner.core.perceive.pojo.User;
import java.util.List; import java.util.List;
@Data @Data
public class EvaluatorInput { public class EvaluatorInput {
private List<String> recentMessages; private List<Message> recentMessages;
private String tendency; private User user;
private List<EvaluatedSlice> activatedSlices;
private List<String> tendencies;
} }

View File

@@ -1,13 +1,24 @@
package work.slhaf.partner.module.modules.action.planner.evaluator.entity; package work.slhaf.partner.module.modules.action.planner.evaluator.entity;
import lombok.Data; import lombok.Data;
import work.slhaf.partner.core.action.entity.ActionData; import work.slhaf.partner.core.action.entity.ScheduledActionData;
import work.slhaf.partner.core.action.entity.ActionType;
import java.util.List;
import java.util.Map;
@Data @Data
public class EvaluatorResult { public class EvaluatorResult {
private boolean ok; private boolean ok;
private boolean needConfirm;
private ActionType type; private ActionType type;
private String typeInfo; private String scheduleContent;
private ActionData actionData; private ScheduledActionData.ScheduleType scheduleType;
private Map<Integer, List<String>> primaryActionChain;
private String tendency;
private String reason;
private String description;
public enum ActionType {
IMMEDIATE, PLANNING
}
} }

View File

@@ -1,18 +1,44 @@
package work.slhaf.partner.module.modules.action.planner.extractor; package work.slhaf.partner.module.modules.action.planner.extractor;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import work.slhaf.partner.api.agent.factory.capability.annotation.InjectCapability;
import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule; import work.slhaf.partner.api.agent.factory.module.annotation.AgentSubModule;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.ActivateModel;
import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule; import work.slhaf.partner.api.agent.runtime.interaction.flow.abstracts.AgentRunningSubModule;
import work.slhaf.partner.api.chat.pojo.ChatResponse;
import work.slhaf.partner.core.action.ActionCapability;
import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorInput; import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorInput;
import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorResult; import work.slhaf.partner.module.modules.action.planner.extractor.entity.ExtractorResult;
import java.util.List;
@Slf4j
@AgentSubModule @AgentSubModule
public class ActionExtractor extends AgentRunningSubModule<ExtractorInput, ExtractorResult> implements ActivateModel { public class ActionExtractor extends AgentRunningSubModule<ExtractorInput, ExtractorResult> implements ActivateModel {
@InjectCapability
private ActionCapability actionCapability;
@Override @Override
public ExtractorResult execute(ExtractorInput data) { public ExtractorResult execute(ExtractorInput data) {
ExtractorResult result = new ExtractorResult();
List<String> tendencyCache = actionCapability.selectTendencyCache(data.getInput());
if (tendencyCache != null && !tendencyCache.isEmpty()) {
result.setTendencies(tendencyCache);
return result;
}
return null; for (int i = 0; i < 3; i++) {
try {
ChatResponse response = this.singleChat(JSONObject.toJSONString(data));
return JSONObject.parseObject(response.getMessage(), ExtractorResult.class);
} catch (Exception e) {
log.error("[ActionExtractor] 提取信息出错", e);
}
}
return new ExtractorResult();
} }
@Override @Override

View File

@@ -2,8 +2,10 @@ package work.slhaf.partner.module.modules.action.planner.extractor.entity;
import lombok.Data; import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data @Data
public class ExtractorResult { public class ExtractorResult {
private boolean action; private List<String> tendencies = new ArrayList<>();
private String tendency;
} }

View File

@@ -1,4 +0,0 @@
package work.slhaf.partner.module.modules.action.scheduler;
public class ActionDispatcher {
}

View File

@@ -1,7 +0,0 @@
package work.slhaf.partner.module.modules.action.scheduler;
/**
* 负责综合前置模块
*/
public class TaskScheduler {
}

View File

@@ -22,12 +22,8 @@ import work.slhaf.partner.module.modules.memory.selector.extractor.entity.Extrac
import work.slhaf.partner.module.modules.memory.selector.extractor.entity.ExtractorResult; import work.slhaf.partner.module.modules.memory.selector.extractor.entity.ExtractorResult;
import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext; import work.slhaf.partner.runtime.interaction.data.context.PartnerRunningFlowContext;
import java.io.IOException;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@@ -46,7 +42,7 @@ public class MemorySelector extends PreRunningModule {
private MemorySelectExtractor memorySelectExtractor; private MemorySelectExtractor memorySelectExtractor;
@Override @Override
public void doExecute(PartnerRunningFlowContext runningFlowContext) throws IOException, ClassNotFoundException { public void doExecute(PartnerRunningFlowContext runningFlowContext) {
String userId = runningFlowContext.getUserId(); String userId = runningFlowContext.getUserId();
//获取主题路径 //获取主题路径
ExtractorResult extractorResult = memorySelectExtractor.execute(runningFlowContext); ExtractorResult extractorResult = memorySelectExtractor.execute(runningFlowContext);
@@ -58,7 +54,7 @@ public class MemorySelector extends PreRunningModule {
setModuleContextRecall(runningFlowContext); setModuleContextRecall(runningFlowContext);
} }
private List<EvaluatedSlice> selectAndEvaluateMemory(PartnerRunningFlowContext runningFlowContext, ExtractorResult extractorResult) throws IOException, ClassNotFoundException { private List<EvaluatedSlice> selectAndEvaluateMemory(PartnerRunningFlowContext runningFlowContext, ExtractorResult extractorResult) {
log.debug("[MemorySelector] 触发记忆回溯..."); log.debug("[MemorySelector] 触发记忆回溯...");
//查找切片 //查找切片
String userId = runningFlowContext.getUserId(); String userId = runningFlowContext.getUserId();
@@ -86,7 +82,7 @@ public class MemorySelector extends PreRunningModule {
} }
private void setMemoryResultList(List<MemoryResult> memoryResultList, List<ExtractorMatchData> matches, String userId) throws IOException, ClassNotFoundException { private void setMemoryResultList(List<MemoryResult> memoryResultList, List<ExtractorMatchData> matches, String userId) {
for (ExtractorMatchData match : matches) { for (ExtractorMatchData match : matches) {
try { try {
MemoryResult memoryResult = switch (match.getType()) { MemoryResult memoryResult = switch (match.getType()) {
@@ -133,8 +129,10 @@ public class MemorySelector extends PreRunningModule {
return "[记忆模块]"; return "[记忆模块]";
} }
protected HashMap<String, String> getPromptDataMap(String userId) { @Override
protected Map<String, String> getPromptDataMap(PartnerRunningFlowContext context) {
HashMap<String, String> map = new HashMap<>(); HashMap<String, String> map = new HashMap<>();
String userId = context.getUserId();
String dialogMapStr = memoryCapability.getDialogMapStr(); String dialogMapStr = memoryCapability.getDialogMapStr();
if (!dialogMapStr.isEmpty()) { if (!dialogMapStr.isEmpty()) {
map.put("[记忆缓存] <你最近两日和所有聊天者的对话记忆印象>", dialogMapStr); map.put("[记忆缓存] <你最近两日和所有聊天者的对话记忆印象>", dialogMapStr);
@@ -147,7 +145,7 @@ public class MemorySelector extends PreRunningModule {
String sliceStr = memoryCapability.getActivatedSlicesStr(userId); String sliceStr = memoryCapability.getActivatedSlicesStr(userId);
if (sliceStr != null && !sliceStr.isEmpty()) { if (sliceStr != null && !sliceStr.isEmpty()) {
map.put("[记忆切片] <你与最新一条消息的发送者的相关回忆, 不会与[记忆缓存]重复, 如果有重复你也可以指出来()>", sliceStr); map.put("[记忆切片] <你与最新一条消息的发送者的相关回忆, 不会与[记忆缓存]重复, 如果有重复你也可以指出来>", sliceStr);
} }
return map; return map;
} }

View File

@@ -119,6 +119,11 @@ public class MemoryUpdater extends PostRunningModule {
}); });
} }
@Override
protected boolean relyOnMessage() {
return true;
}
private void updateMemory() { private void updateMemory() {
log.debug("[MemoryUpdater] 记忆更新流程开始..."); log.debug("[MemoryUpdater] 记忆更新流程开始...");
tempMessage = new ArrayList<>(cognationCapability.getChatMessages()); tempMessage = new ArrayList<>(cognationCapability.getChatMessages());

Some files were not shown because too many files have changed in this diff Show More