Files
Partner/doc/architecture/runtime-flow.md

99 lines
3.5 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Runtime 输入与模块调度
本文说明外部输入如何经由 Gateway 进入 `AgentRuntime`,以及 `AgentRuntime` 如何按 `source` 聚合输入、去重、debounce 并调度 Running module。
## Gateway 与输入提交
Gateway 负责把外部输入转换成 `RunningFlowContext`,再提交给 `AgentRuntime``AgentRuntime` 不直接感知 WebSocket 等具体输入协议。
```mermaid
flowchart TD
A["外部输入\nWebSocket / 其他 Gateway"] --> B["AgentGateway.receive(inputData)"]
B --> C["parseRunningFlowContext(inputData)"]
C --> D["AgentRuntime.submit(context)"]
D --> E["按 context.source 聚合输入"]
E --> E1["latestContextsBySource[source]\n旧 context 与新 context mergedWith"]
E --> E2["sourceVersions[source] + 1"]
E --> E3["sourceQueue 加入 source\n队列内 source 不重复"]
E3 --> F{"当前是否正在执行同一 source?"}
F -->|是| G["currentExecutingContext.status.interrupted = true"]
F -->|否| H["发送 wakeSignal"]
G --> H
H --> I["AgentRuntime 后台协程消费 wakeSignal"]
I --> J["drainQueue()"]
J --> K["executeSource(source)"]
```
## 模块运行时
`AgentRuntime` 采用按 `source` 聚合的异步执行模型。同一个 `source` 的连续输入会合并为最新的 `RunningFlowContext`;如果新输入到达时该 `source` 正在执行,当前上下文会被标记为 interrupted当前轮结束后 runtime 会从 Running module 链头重新执行最新合并后的上下文。
```mermaid
flowchart TD
A["AgentRuntime 后台协程"] --> B["收到 wakeSignal"]
B --> C["drainQueue()"]
C --> D["取 sourceQueue.firstOrNull()"]
D --> E{"是否存在 source?"}
E -->|否| Z["返回,等待下一次 wakeSignal"]
E -->|是| F["executeSource(source)"]
F --> G["awaitDebouncedExecution(source)"]
G --> G1{"debounceWindow > 0?"}
G1 -->|是| G2["等待 debounceWindow"]
G2 --> G3{"sourceVersions 是否变化?"}
G3 -->|变化| G2
G3 -->|稳定| H["构造 SourceExecution(context, version)"]
G1 -->|否| H
H --> I["标记 currentExecutingSource/context"]
I --> J["executeTurn(context)"]
J --> K{"runningModules 是否为空?"}
K -->|是| L["refreshRunningModules()"]
K -->|否| M["按 order 遍历 Running modules"]
L --> M
M --> N["过滤 maskedModules"]
N --> O["按 order 分组并排序"]
O --> P["同一 order 内 coroutineScope + async 并发执行"]
P --> Q["module.execute(runningFlowContext)"]
Q --> R{"context.status.interrupted?"}
R -->|是| S["本轮中断"]
R -->|否| T["本轮完成"]
S --> U["检查 latest version"]
T --> U
U --> V{"执行期间是否有新输入或版本变化?"}
V -->|是| W["保留 source\n从链头重新执行最新 merged context"]
W --> G
V -->|否| X["清理 latestContextsBySource/sourceQueue/sourceVersions"]
X --> Z
```
Running module 的执行顺序由 `module.order()` 决定。order 较小的组先执行,同一 order 内的模块并发执行。
```mermaid
flowchart LR
A["Running modules from AgentContext.modules"] --> B["过滤 maskedModules"]
B --> C["按 module.order() 分组"]
C --> D["order 小的组先执行"]
D --> E1["order = 10"]
D --> E2["order = 20"]
D --> E3["order = 30"]
E1 --> F1["同组模块并发执行"]
F1 --> E2
E2 --> F2["同组模块并发执行"]
F2 --> E3
E3 --> F3["同组模块并发执行"]
F3 --> G["所有 order 执行完成"]
```