
Integration Recipes

End-to-end patterns for requests, DAG execution, and the orchestrator pipeline

End-to-End Runtime Request

A single capability invocation follows this sequence:

sequenceDiagram
    participant C as Client
    participant PH as CartridgeHostRuntime
    participant P as Cartridge

    C->>PH: REQ (cap URN)
    PH->>P: Forward REQ

    C->>PH: STREAM_START (arg stream)
    PH->>P: Forward
    C->>PH: CHUNK (data)
    PH->>P: Forward
    C->>PH: STREAM_END
    PH->>P: Forward
    C->>PH: END
    PH->>P: Forward

    P->>PH: STREAM_START (result stream)
    PH->>C: Forward
    P->>PH: CHUNK (result data)
    PH->>C: Forward
    P->>PH: STREAM_END
    PH->>C: Forward
    P->>PH: END
    PH->>C: Forward

  1. Handshake — host and cartridge exchange HELLO frames, negotiate limits (max_frame, max_chunk).
  2. REQ — client sends a REQ with the cap URN to invoke.
  3. Input streams — arguments arrive as STREAM_START / CHUNK / STREAM_END sequences. Each argument is a separate stream with its own media URN.
  4. Output streams — cartridge returns result data as streams, followed by END.
  5. Failure — invalid frames or routing mismatches produce ERR, terminating the request flow. ERR and END are mutually exclusive — a request ends with exactly one of them.
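The terminal-frame invariant in step 5 can be sketched as a small check. The `Frame` enum here is a hypothetical, simplified stand-in for the real wire types, which carry stream ids and the limits negotiated during HELLO:

```rust
// Hypothetical, simplified frame type for illustration only.
#[derive(Debug)]
enum Frame {
    Req,
    StreamStart,
    Chunk,
    StreamEnd,
    End,
    Err,
}

/// A request flow is well-terminated when it ends with exactly one of
/// END or ERR, and no other terminal frame appears anywhere in it.
fn well_terminated(frames: &[Frame]) -> bool {
    let terminals = frames
        .iter()
        .filter(|f| matches!(f, Frame::End | Frame::Err))
        .count();
    terminals == 1 && matches!(frames.last(), Some(Frame::End | Frame::Err))
}

fn main() {
    let ok = [
        Frame::Req,
        Frame::StreamStart,
        Frame::Chunk,
        Frame::StreamEnd,
        Frame::End,
    ];
    // END and ERR are mutually exclusive; a flow may not carry both.
    let bad = [Frame::Req, Frame::End, Frame::Err];
    assert!(well_terminated(&ok));
    assert!(!well_terminated(&bad));
}
```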

Orchestrator Pipeline

The orchestrator bridges textual DAG descriptions and the execution engine. It takes machine notation (or a MachinePlan) and produces a ResolvedGraph — a validated DAG ready for execution.

flowchart LR
    MN["Machine Notation<br/>or MachinePlan"] --> PARSE["Parse"]
    PARSE --> RESOLVE["Resolve<br/>cap URNs"]
    RESOLVE --> VALIDATE["Validate<br/>media compat"]
    VALIDATE --> CYCLE["Check<br/>for cycles"]
    CYCLE --> RG["ResolvedGraph"]

Machine Notation

Machine notation is a compact text format for describing transformation pipelines. Step declarations bind an alias to a cap URN:

[extract cap:in="media:pdf";out="media:record";op=extract;target=metadata]

Wiring declares data flow between steps:

[media:pdf -> extract -> media:record]

A linear chain:

[media:pdf -> extract -> media:text -> summarize -> media:text]
graph LR
    A["media:pdf"] -->|extract| B["media:text"]
    B -->|summarize| C["media:text"]

Fan-in (multiple sources into one step):

[media:image -> describe -> media:text]
[media:text -> describe]
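A wiring declaration can be split into its segments with a few lines of string handling. This is only a sketch of the wiring half of the grammar; the real parser behind parse_machine_to_cap_dag() also handles step declarations with attributes:

```rust
/// Split one wiring line such as "[media:pdf -> extract -> media:text]"
/// into its segments (alternating media URN endpoints and step aliases).
/// A fan-in continuation like "[media:text -> describe]" has only two.
fn parse_wiring(line: &str) -> Option<Vec<String>> {
    let inner = line.trim().strip_prefix('[')?.strip_suffix(']')?;
    let parts: Vec<String> = inner.split("->").map(|s| s.trim().to_string()).collect();
    // At minimum a wiring line needs a source and a step.
    if parts.len() < 2 { None } else { Some(parts) }
}

fn main() {
    let chain = parse_wiring("[media:pdf -> extract -> media:text]").unwrap();
    assert_eq!(chain, vec!["media:pdf", "extract", "media:text"]);
    // Fan-in: a second wiring line routes another input into the same step.
    let fanin = parse_wiring("[media:text -> describe]").unwrap();
    assert_eq!(fanin, vec!["media:text", "describe"]);
}
```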

Parsing Pipeline

parse_machine_to_cap_dag() converts machine notation into a ResolvedGraph:

  1. Parse notation into a Machine graph (nodes as media URN endpoints, edges as cap invocations).
  2. Resolve cap URNs — look up each cap URN in the CapRegistry for the full Cap definition.
  3. Validate media compatibility — check that each node’s media URN is consistent across all connecting edges.
  4. Check for cycles — verify the graph is a DAG.
  5. Return the ResolvedGraph.
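Step 3 amounts to checking that every edge touching a node agrees on that node's media URN. The `Edge` struct and node map below are hypothetical stand-ins for the Machine graph produced in step 1:

```rust
use std::collections::HashMap;

// Hypothetical edge shape: which node it reads from, and which media
// URN it expects there.
struct Edge<'a> {
    source: &'a str,
    source_media: &'a str,
}

/// Every edge must agree with its node's declared media URN.
fn validate_media(nodes: &HashMap<&str, &str>, edges: &[Edge]) -> Result<(), String> {
    for e in edges {
        match nodes.get(e.source) {
            Some(urn) if *urn == e.source_media => {}
            Some(urn) => {
                return Err(format!(
                    "node {} declares {} but edge expects {}",
                    e.source, urn, e.source_media
                ))
            }
            None => return Err(format!("unknown node {}", e.source)),
        }
    }
    Ok(())
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert("n0", "media:pdf");
    let ok = [Edge { source: "n0", source_media: "media:pdf" }];
    let bad = [Edge { source: "n0", source_media: "media:image" }];
    assert!(validate_media(&nodes, &ok).is_ok());
    assert!(validate_media(&nodes, &bad).is_err());
}
```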

DAG Execution

execute_dag() takes a ResolvedGraph and runs it:

flowchart TD
    RG["ResolvedGraph"] --> DISC["1. Discover cartridges"]
    DISC --> INFRA["2. Create infrastructure<br/>(HostRuntime, Relay, Switch)"]
    INFRA --> REG["3. Register all caps"]
    REG --> GRP["4. Group edges by<br/>(target, cap_urn)"]
    GRP --> TOPO["5. Topological sort"]
    TOPO --> PROG["6. Pre-compute<br/>progress boundaries"]
    PROG --> EXEC["7. Execute each group<br/>via execute_fanin()"]
    EXEC --> RES["8. Return results"]
  1. Discover cartridges — collect all unique cap URNs from edges, find cartridge binaries that provide them.
  2. Create infrastructure — set up CartridgeHostRuntime, RelaySlave/RelayMaster pair, and RelaySwitch.
  3. Register all caps — register ALL manifest caps from each cartridge (not just DAG-referenced ones, because cartridges may make peer invocations to other caps).
  4. Group edges — group by (target_node, cap_urn) into EdgeGroups. Each group is one cap invocation, potentially with multiple input streams (fan-in).
  5. Topological sort — order groups so each executes after its inputs are available. Uses Kahn’s algorithm.
  6. Pre-compute progress — calculate progress boundaries for each group.
  7. Execute — run each group via execute_fanin().
  8. Return — a HashMap<node_id, NodeData> with output data from each node.
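The topological sort in step 5 is standard Kahn's algorithm. A self-contained sketch, using plain node indices and dependency pairs as stand-ins for EdgeGroups:

```rust
use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm: repeatedly emit nodes with no unmet dependencies.
/// Returns None when a cycle remains (the NotADag case).
fn topo_sort(n: usize, deps: &[(usize, usize)]) -> Option<Vec<usize>> {
    // deps: (before, after) pairs — `after` runs once `before` is done.
    let mut indegree = vec![0usize; n];
    let mut out: HashMap<usize, Vec<usize>> = HashMap::new();
    for &(before, after) in deps {
        indegree[after] += 1;
        out.entry(before).or_default().push(after);
    }
    let mut queue: VecDeque<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(i) = queue.pop_front() {
        order.push(i);
        for &j in out.get(&i).into_iter().flatten() {
            indegree[j] -= 1;
            if indegree[j] == 0 {
                queue.push_back(j);
            }
        }
    }
    // If some node never reached indegree 0, a cycle exists.
    (order.len() == n).then_some(order)
}

fn main() {
    // Linear chain 0 -> 1 -> 2 sorts in order; a 2-cycle is rejected.
    assert_eq!(topo_sort(3, &[(0, 1), (1, 2)]), Some(vec![0, 1, 2]));
    assert_eq!(topo_sort(2, &[(0, 1), (1, 0)]), None);
}
```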

execute_fanin

The core function for a single cap invocation:

sequenceDiagram
    participant EX as Executor
    participant SW as RelaySwitch
    participant P as Cartridge

    EX->>SW: register_external_request(rid, cap_urn)
    Note over SW: Returns (XID, response_rx)

    loop For each source in EdgeGroup
        EX->>SW: STREAM_START (stream_id, media_urn)
        EX->>SW: CHUNK(s)
        EX->>SW: STREAM_END
    end
    EX->>SW: END

    Note over SW: Routes to cartridge via cap matching
    SW->>P: (forwarded)

    loop select! loop
        alt Response frame
            P-->>EX: CHUNK → accumulate data
            P-->>EX: LOG → forward progress
            P-->>EX: END → done
            P-->>EX: ERR → fail
        else Pump timeout (200ms)
            Note over EX: Route peer frames through switch
        else Activity timeout (120s)
            Note over EX: ExecutionError::ActivityTimeout
        end
    end

The response collection loop uses tokio::select! to concurrently:

  • Receive response frames on the response channel.
  • Pump frames through the RelaySwitch (for peer invocations — without this, a cartridge doing a peer call would deadlock).
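The accumulation half of that loop can be sketched synchronously. `ResponseFrame` is a hypothetical simplification; the real loop interleaves this logic with switch pumping inside tokio::select!, which a linear sketch cannot show:

```rust
// Hypothetical response-frame type for illustration.
#[derive(Debug, PartialEq)]
enum ResponseFrame {
    Chunk(Vec<u8>),
    Log(String),
    End,
    Err(String),
}

/// Drain response frames, accumulating CHUNK payloads until END or ERR.
fn collect_response(
    frames: impl IntoIterator<Item = ResponseFrame>,
) -> Result<Vec<u8>, String> {
    let mut data = Vec::new();
    for frame in frames {
        match frame {
            ResponseFrame::Chunk(bytes) => data.extend(bytes),
            // LOG frames are forwarded as progress, not accumulated.
            ResponseFrame::Log(msg) => eprintln!("progress: {msg}"),
            ResponseFrame::End => return Ok(data),
            ResponseFrame::Err(e) => return Err(e),
        }
    }
    Err("stream closed without END or ERR".into())
}

fn main() {
    let ok = vec![
        ResponseFrame::Chunk(vec![1, 2]),
        ResponseFrame::Log("halfway".into()),
        ResponseFrame::Chunk(vec![3]),
        ResponseFrame::End,
    ];
    assert_eq!(collect_response(ok), Ok(vec![1, 2, 3]));
    assert_eq!(
        collect_response(vec![ResponseFrame::Err("boom".into())]),
        Err("boom".into())
    );
}
```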

Activity Timeout

Default: 120 seconds. Any received frame resets the timer. Caps can override via activity_timeout_secs in their metadata.

While idle, the executor logs a warning every 30 seconds.
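The override resolution is a simple fallback. A minimal sketch, assuming the metadata value arrives as an optional number of seconds:

```rust
use std::time::Duration;

/// Per-cap activity timeout: the activity_timeout_secs metadata
/// override when present, otherwise the 120-second default.
fn effective_timeout(activity_timeout_secs: Option<u64>) -> Duration {
    Duration::from_secs(activity_timeout_secs.unwrap_or(120))
}

fn main() {
    assert_eq!(effective_timeout(None), Duration::from_secs(120));
    assert_eq!(effective_timeout(Some(600)), Duration::from_secs(600));
}
```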

Progress Subdivision

Each edge group gets an equal share of the overall [0.0, 1.0] progress range. Group i maps its progress to [i/n, (i+1)/n).

Progress nesting across three layers (DAG → handler → peer call):

graph TD
    subgraph "DAG (2 groups)"
        G0["Group 0<br/>[0.0, 0.5]"]
        G1["Group 1<br/>[0.5, 1.0]"]
    end

    subgraph "Group 0 Handler"
        DL["Download<br/>[0.0, 0.25] of handler"]
        INF["Inference<br/>[0.25, 1.0] of handler"]
    end

    subgraph "Effective Ranges"
        DLE["Download effective<br/>[0.0, 0.125] of task"]
        INFE["Inference effective<br/>[0.125, 0.5] of task"]
    end

    G0 --> DL
    G0 --> INF
    DL --> DLE
    INF --> INFE
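Nesting is just composition of one affine mapping: a local progress p in [0.0, 1.0] lands at base + p · weight of the enclosing range, which is what map_progress computes. The sketch below reproduces the effective ranges from the diagram:

```rust
/// Map a local progress value in [0.0, 1.0] into the sub-range
/// [base, base + weight] of the enclosing range.
fn map_progress(p: f64, base: f64, weight: f64) -> f64 {
    base + p * weight
}

fn main() {
    // Group 0 of 2 owns [0.0, 0.5] of the task.
    let (g_base, g_weight) = (0.0, 0.5);
    // Within the handler, the download phase owns [0.0, 0.25].
    let handler_p = map_progress(1.0, 0.0, 0.25); // download just finished
    // Composing both layers: 0.25 of the handler is 0.125 of the task.
    let task_p = map_progress(handler_p, g_base, g_weight);
    assert!((task_p - 0.125).abs() < 1e-12);
}
```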

Progress flows from cartridge to UI:

sequenceDiagram
    participant P as Cartridge Handler
    participant CI as cap_interpreter
    participant DB as SQLite
    participant UI as UI

    P->>CI: LOG frame<br/>progress(0.5, "msg")
    Note over CI: CapProgressFn called<br/>map_progress(0.5,<br/>step_base, step_weight)
    CI->>DB: UPDATE progress
    UI->>DB: poll
    DB-->>UI: progress value

Planner

The planner builds execution plans by finding paths through a capability graph. Media types are nodes; capabilities are edges:

graph LR
    PDF["media:pdf"] -->|extract_metadata| REC["media:record"]
    PDF -->|generate_thumbnail| IMG["media:image;png"]
    PDF -->|extract_text| TXT["media:text"]
    TXT -->|generate_embeddings| EMB["media:embedding"]
    TXT -->|summarize| TXT2["media:text"]
    IMG -->|describe_image| TXT

The MachinePlanBuilder converts a path (Strand) plus argument specs into a MachinePlan:

flowchart LR
    S["Strand<br/>(path A→B→C)"] --> PB["MachinePlanBuilder"]
    ARGS["Argument specs"] --> PB
    PB --> IS["InputSlot nodes"]
    PB --> CAP["Cap nodes"]
    PB --> OUT["Output nodes"]
    IS --> MP["MachinePlan"]
    CAP --> MP
    OUT --> MP

Slot Value Resolution

Three-level priority for resolving slot values:

flowchart TD
    Q{"slot_values<br/>[step_N:slot_name]?"} -->|Found| V1["Use explicit<br/>per-step value"]
    Q -->|Not found| Q2{"cap_settings<br/>[cap_urn][slot_name]?"}
    Q2 -->|Found| V2["Use per-cap<br/>setting"]
    Q2 -->|Not found| Q3{"default_value<br/>in cap definition?"}
    Q3 -->|Found| V3["Use default"]
    Q3 -->|Not found| V4["No value<br/>(argument omitted)"]
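The three-level priority is a straightforward Option fallback chain. A sketch following the flowchart's key shapes ("step_N:slot_name" and per-cap settings); the concrete map types are assumptions, not the real data structures:

```rust
use std::collections::HashMap;

/// Resolve one slot: explicit per-step value, then per-cap setting,
/// then the cap definition's default. None means the argument is omitted.
fn resolve_slot(
    slot_values: &HashMap<String, String>,            // "step_N:slot" -> value
    cap_settings: &HashMap<(String, String), String>, // (cap_urn, slot) -> value
    default_value: Option<&str>,
    step: usize,
    cap_urn: &str,
    slot: &str,
) -> Option<String> {
    slot_values
        .get(&format!("step_{step}:{slot}"))
        .cloned()
        .or_else(|| cap_settings.get(&(cap_urn.to_string(), slot.to_string())).cloned())
        .or_else(|| default_value.map(str::to_string))
}

fn main() {
    let mut slot_values = HashMap::new();
    slot_values.insert("step_0:lang".to_string(), "de".to_string());
    let mut cap_settings = HashMap::new();
    cap_settings.insert(
        ("cap:summarize".to_string(), "lang".to_string()),
        "fr".to_string(),
    );

    // The per-step value wins over the per-cap setting and the default.
    assert_eq!(
        resolve_slot(&slot_values, &cap_settings, Some("en"), 0, "cap:summarize", "lang"),
        Some("de".to_string())
    );
    // Without a per-step value, the per-cap setting applies.
    assert_eq!(
        resolve_slot(&slot_values, &cap_settings, Some("en"), 1, "cap:summarize", "lang"),
        Some("fr".to_string())
    );
}
```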

Cartridge Patterns

Model Cartridges

Model cartridges (GGUF, Candle, MLX) follow a three-phase handler pattern:

graph LR
    DL["1. Download<br/>(peer call to<br/>modelcartridge)"] --> LD["2. Load<br/>(blocking FFI via<br/>run_with_keepalive)"]
    LD --> INF["3. Inference<br/>(streaming output<br/>with progress)"]

    DL -.- P1["[0.00 – 0.25]"]
    LD -.- P2["[0.25 – 0.35]"]
    INF -.- P3["[0.35 – 0.95]"]

Content Cartridges

Content cartridges (PDF, text) produce derived artifacts:

graph LR
    PDF["media:pdf"] -->|generate_thumbnail| IMG["media:image;png"]
    PDF -->|extract_metadata| REC["media:record"]
    PDF -->|extract_outline| JSON["media:json"]
    PDF -->|disbind| LIST["media:pdf;list"]

    TXT["media:text"] -->|generate_thumbnail| IMG2["media:image;png"]
    TXT -->|extract_metadata| REC2["media:record"]
    MD["media:text;markdown"] -->|extract_outline| JSON2["media:json"]

Task Integration

Tasks track DAG execution progress through a state machine:

stateDiagram-v2
    [*] --> Created : TriggerFuse event
    Created --> Running : Execute DAG
    Running --> Completed : Success (progress = 1.0)
    Running --> Failed : ExecutionError
    Running --> Cancelled : control_flags.aborted
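The state machine above can be sketched as a transition table; the event names are illustrative labels for the triggers shown on the diagram edges:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskState { Created, Running, Completed, Failed, Cancelled }

#[derive(Debug, Clone, Copy)]
enum TaskEvent { ExecuteDag, Success, ExecutionError, Aborted }

/// Transitions from the state diagram; anything not listed is invalid.
fn step(state: TaskState, event: TaskEvent) -> Option<TaskState> {
    use {TaskEvent::*, TaskState::*};
    match (state, event) {
        (Created, ExecuteDag) => Some(Running),
        (Running, Success) => Some(Completed),       // progress = 1.0
        (Running, ExecutionError) => Some(Failed),
        (Running, Aborted) => Some(Cancelled),       // control_flags.aborted
        _ => None,
    }
}

fn main() {
    let s = step(TaskState::Created, TaskEvent::ExecuteDag).unwrap();
    assert_eq!(step(s, TaskEvent::Success), Some(TaskState::Completed));
    // Terminal states accept no further events.
    assert_eq!(step(TaskState::Completed, TaskEvent::ExecuteDag), None);
}
```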

Graph Execution with Macino

Macino (capdag/macino/) is the integration test harness for graph execution. It validates end-to-end DAG execution by building real execution graphs and running them through the full stack (CartridgeRuntime → CartridgeHostRuntime → RelaySwitch).

Test patterns exercised by Macino:

  • Linear chains (single-step and multi-step)
  • Fan-in nodes (multiple inputs to one cap)
  • Parallel independent branches
  • Cycle detection (rejected before execution)
  • Missing capability error propagation
  • Progress tracking through multi-step pipelines

Error Handling

Error types form a layered hierarchy:

graph BT
    OP["OpError<br/>(handler layer)"] --> RT["RuntimeError<br/>(cartridge layer)"]
    RT --> ERR["ERR frame<br/>(wire)"]
    ERR --> AH["AsyncHostError<br/>(host layer)"]
    AH --> RSE["RelaySwitchError<br/>(relay layer)"]
    RSE --> EX["ExecutionError<br/>(execution layer)"]
    EX --> TASK["Task failure<br/>(application layer)"]
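Within a single process, this layering is the familiar Rust pattern of From conversions between error types (the ERR frame in the middle is a process boundary, so there the conversion happens via serialization rather than From). A sketch of the shape, with hypothetical minimal error types:

```rust
// Hypothetical, minimal stand-ins for the real error types.
#[derive(Debug)]
struct OpError(String);
#[derive(Debug)]
struct RuntimeError(String);
#[derive(Debug)]
enum ExecutionError {
    CartridgeExecutionFailed(String),
}

// Each layer wraps the one below; this is what lets `?` lift errors
// across layers without explicit match arms.
impl From<OpError> for RuntimeError {
    fn from(e: OpError) -> Self {
        RuntimeError(format!("op failed: {}", e.0))
    }
}
impl From<RuntimeError> for ExecutionError {
    fn from(e: RuntimeError) -> Self {
        ExecutionError::CartridgeExecutionFailed(e.0)
    }
}

fn handler() -> Result<(), OpError> {
    Err(OpError("bad input".into()))
}

fn execute() -> Result<(), ExecutionError> {
    let rt: Result<(), RuntimeError> = handler().map_err(RuntimeError::from);
    rt.map_err(ExecutionError::from)
}

fn main() {
    let err = execute().unwrap_err();
    assert!(matches!(err, ExecutionError::CartridgeExecutionFailed(_)));
}
```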

Error propagation from cartridge to task:

graph TD
    subgraph "Cartridge Process"
        OP2["Op handler"] -->|"Err(OpError)"| DR["dispatch_op()"]
        DR -->|"ERR frame"| STDOUT["stdout"]
    end

    STDOUT --> PHR["CartridgeHostRuntime"]
    PHR --> RS["RelaySlave → RelayMaster"]
    RS --> SW["RelaySwitch"]
    SW --> EF["execute_fanin"]
    EF -->|"ExecutionError::<br/>CartridgeExecutionFailed"| CI["cap_interpreter"]
    CI -->|"task state → Failed"| DB["SQLite"]
Error types and their causes:

  • CartridgeNotFound: no cartridge provides the requested cap
  • ActivityTimeout: no frames received for longer than the timeout
  • CartridgeExecutionFailed: the cartridge returned an ERR frame
  • NoIncomingData: missing source node data (dependency not met)
  • NotADag: cycle detected during validation

All errors are terminal for the DAG execution. There is no retry logic — the caller decides whether to retry.

Missing Capability

If no cap accepts the request URN, the RelaySwitch returns RelaySwitchError::NoHandler. This propagates as CartridgeNotFound in the executor. The error message includes the cap URN that could not be routed.