Files
Partner/doc/architecture/runtime-flow.md

3.5 KiB
Raw Blame History

Runtime 输入与模块调度

本文说明外部输入如何经由 Gateway 进入 AgentRuntime,以及 AgentRuntime 如何按 source 聚合输入、去重、debounce 并调度 Running module。

Gateway 与输入提交

Gateway 负责把外部输入转换成 RunningFlowContext,再提交给 AgentRuntimeAgentRuntime 不直接感知 WebSocket 等具体输入协议。

flowchart TD
    A["外部输入\nWebSocket / 其他 Gateway"] --> B["AgentGateway.receive(inputData)"]
    B --> C["parseRunningFlowContext(inputData)"]
    C --> D["AgentRuntime.submit(context)"]

    D --> E["按 context.source 聚合输入"]
    E --> E1["latestContextsBySource[source]\n旧 context 与新 context mergedWith"]
    E --> E2["sourceVersions[source] + 1"]
    E --> E3["sourceQueue 加入 source\n队列内 source 不重复"]

    E3 --> F{"当前是否正在执行同一 source?"}
    F -->|是| G["currentExecutingContext.status.interrupted = true"]
    F -->|否| H["发送 wakeSignal"]

    G --> H
    H --> I["AgentRuntime 后台协程消费 wakeSignal"]
    I --> J["drainQueue()"]
    J --> K["executeSource(source)"]

模块运行时

AgentRuntime 采用按 source 聚合的异步执行模型。同一个 source 的连续输入会合并为最新的 RunningFlowContext;如果新输入到达时该 source 正在执行,当前上下文会被标记为 interrupted当前轮结束后 runtime 会从 Running module 链头重新执行最新合并后的上下文。

flowchart TD
    A["AgentRuntime 后台协程"] --> B["收到 wakeSignal"]
    B --> C["drainQueue()"]
    C --> D["取 sourceQueue.firstOrNull()"]
    D --> E{"是否存在 source?"}
    E -->|否| Z["返回,等待下一次 wakeSignal"]
    E -->|是| F["executeSource(source)"]

    F --> G["awaitDebouncedExecution(source)"]
    G --> G1{"debounceWindow > 0?"}
    G1 -->|是| G2["等待 debounceWindow"]
    G2 --> G3{"sourceVersions 是否变化?"}
    G3 -->|变化| G2
    G3 -->|稳定| H["构造 SourceExecution(context, version)"]
    G1 -->|否| H

    H --> I["标记 currentExecutingSource/context"]
    I --> J["executeTurn(context)"]

    J --> K{"runningModules 是否为空?"}
    K -->|是| L["refreshRunningModules()"]
    K -->|否| M["按 order 遍历 Running modules"]
    L --> M

    M --> N["过滤 maskedModules"]
    N --> O["按 order 分组并排序"]
    O --> P["同一 order 内 coroutineScope + async 并发执行"]
    P --> Q["module.execute(runningFlowContext)"]
    Q --> R{"context.status.interrupted?"}

    R -->|是| S["本轮中断"]
    R -->|否| T["本轮完成"]

    S --> U["检查 latest version"]
    T --> U

    U --> V{"执行期间是否有新输入或版本变化?"}
    V -->|是| W["保留 source\n从链头重新执行最新 merged context"]
    W --> G

    V -->|否| X["清理 latestContextsBySource/sourceQueue/sourceVersions"]
    X --> Z

Running module 的执行顺序由 module.order() 决定。order 较小的组先执行,同一 order 内的模块并发执行。

flowchart LR
    A["Running modules from AgentContext.modules"] --> B["过滤 maskedModules"]
    B --> C["按 module.order() 分组"]
    C --> D["order 小的组先执行"]

    D --> E1["order = 10"]
    D --> E2["order = 20"]
    D --> E3["order = 30"]

    E1 --> F1["同组模块并发执行"]
    F1 --> E2
    E2 --> F2["同组模块并发执行"]
    F2 --> E3
    E3 --> F3["同组模块并发执行"]

    F3 --> G["所有 order 执行完成"]