Integration Recipes
End-to-end patterns for requests, DAG execution, and the orchestrator pipeline
End-to-End Runtime Request
A single capability invocation follows this sequence:
sequenceDiagram
participant C as Client
participant PH as CartridgeHostRuntime
participant P as Cartridge
C->>PH: REQ (cap URN)
PH->>P: Forward REQ
C->>PH: STREAM_START (arg stream)
PH->>P: Forward
C->>PH: CHUNK (data)
PH->>P: Forward
C->>PH: STREAM_END
PH->>P: Forward
C->>PH: END
PH->>P: Forward
P->>PH: STREAM_START (result stream)
PH->>C: Forward
P->>PH: CHUNK (result data)
PH->>C: Forward
P->>PH: STREAM_END
PH->>C: Forward
P->>PH: END
PH->>C: Forward
- Handshake — before the first request, host and cartridge exchange HELLO frames and negotiate limits (max_frame, max_chunk). The diagram above does not show this step.
- REQ — client sends a REQ with the cap URN to invoke.
- Input streams — arguments arrive as STREAM_START / CHUNK / STREAM_END sequences. Each argument is a separate stream with its own media URN.
- Output streams — cartridge returns result data as streams, followed by END.
- Failure — invalid frames or routing mismatches produce ERR, terminating the request flow. ERR and END are mutually exclusive — a request ends with exactly one of them.
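The ordering rules above can be written down as a small checker. This is a minimal sketch under stated assumptions, not the runtime's actual validation: the `Frame` enum and `validate_flow` are hypothetical names, payloads are omitted, and the sketch assumes that END may not arrive while a stream is still open.

```rust
/// Frame kinds named in the sequence above (payloads omitted; illustrative type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Frame {
    Req,
    StreamStart,
    Chunk,
    StreamEnd,
    End,
    Err,
}

/// Checks one direction of a request flow (hypothetical helper).
/// `expect_req` is true for the client side, which must open with REQ.
fn validate_flow(frames: &[Frame], expect_req: bool) -> Result<(), String> {
    let mut in_stream = false;
    let mut terminated = false;
    for (i, frame) in frames.iter().enumerate() {
        // Nothing may follow the terminating frame, so END and ERR exclude each other.
        if terminated {
            return Err(format!("frame {} arrives after END or ERR", i));
        }
        match frame {
            Frame::Req if expect_req && i == 0 => {}
            _ if expect_req && i == 0 => return Err("flow must open with REQ".to_string()),
            Frame::StreamStart if !in_stream => in_stream = true,
            Frame::Chunk if in_stream => {}
            Frame::StreamEnd if in_stream => in_stream = false,
            // Assumption: END is only valid once every stream is closed.
            Frame::End if !in_stream => terminated = true,
            Frame::Err => terminated = true,
            other => return Err(format!("{:?} not allowed at frame {}", other, i)),
        }
    }
    if terminated {
        Ok(())
    } else {
        Err("flow has no END or ERR".to_string())
    }
}
```

Calling `validate_flow` with `expect_req = false` checks the cartridge's response direction, which carries streams and a terminator but no REQ.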
Orchestrator Pipeline
The orchestrator bridges textual DAG descriptions and the execution engine. It takes machine notation (or a MachinePlan) and produces a ResolvedGraph — a validated DAG ready for execution.
flowchart LR
MN["Machine Notation<br/>or MachinePlan"] --> PARSE["Parse"]
PARSE --> RESOLVE["Resolve<br/>cap URNs"]
RESOLVE --> VALIDATE["Validate<br/>media compat"]
VALIDATE --> CYCLE["Check<br/>for cycles"]
CYCLE --> RG["ResolvedGraph"]
Machine Notation
Machine notation is a compact text format for describing transformation pipelines. Step declarations bind an alias to a cap URN:
[extract cap:in="media:pdf";out="media:record";op=extract;target=metadata]
Wiring declares data flow between steps:
[media:pdf -> extract -> media:record]
A linear chain:
[media:pdf -> extract -> media:text -> summarize -> media:text]
graph LR
A["media:pdf"] -->|extract| B["media:text"]
B -->|summarize| C["media:text"]
Fan-in (multiple sources into one step):
[media:image -> describe -> media:text]
[media:text -> describe]
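To make the wiring shape concrete, here is a rough sketch that splits a wiring line into hops. It is a simplified reading of the examples above and not the real parser: `Hop` and `parse_wiring` are hypothetical names, and the sketch assumes tokens strictly alternate between media URNs and step aliases.

```rust
/// One hop in a wiring line: source media URN, step alias, optional target.
#[derive(Debug, PartialEq)]
struct Hop {
    source: String,
    alias: String,
    target: Option<String>,
}

/// Splits a wiring line such as `[a -> step -> b]` into hops.
/// Hypothetical helper: it handles only the shapes shown in the examples.
fn parse_wiring(line: &str) -> Result<Vec<Hop>, String> {
    let inner = line
        .trim()
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'))
        .ok_or_else(|| format!("not a bracketed declaration: {}", line))?;
    let tokens: Vec<&str> = inner.split("->").map(str::trim).collect();
    if tokens.len() < 2 || tokens.iter().any(|t| t.is_empty()) {
        return Err(format!("malformed wiring: {}", line));
    }
    let mut hops = Vec::new();
    let mut i = 0;
    // Assumption: tokens alternate as media URN, alias, media URN, alias, ...
    while i + 1 < tokens.len() {
        hops.push(Hop {
            source: tokens[i].to_string(),
            alias: tokens[i + 1].to_string(),
            // A fan-in line like `[media:text -> describe]` has no explicit target.
            target: tokens.get(i + 2).map(|t| t.to_string()),
        });
        i += 2;
    }
    Ok(hops)
}
```

Under these assumptions the linear chain yields two hops (`extract`, then `summarize`), and the second fan-in line yields a single hop whose target is left open.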
Parsing Pipeline
parse_machine_to_cap_dag() converts machine notation into a ResolvedGraph:
- Parse notation into a Machine graph (nodes as media URN endpoints, edges as cap invocations).
- Resolve cap URNs — look up each cap URN in the CapRegistry for the full Cap definition.
- Validate media compatibility — check that each node’s media URN is consistent across all connecting edges.
- Check for cycles — verify the graph is a DAG.
- Return the ResolvedGraph.
DAG Execution
execute_dag() takes a ResolvedGraph and runs it:
flowchart TD
RG["ResolvedGraph"] --> DISC["1. Discover cartridges"]
DISC --> INFRA["2. Create infrastructure<br/>(HostRuntime, Relay, Switch)"]
INFRA --> REG["3. Register all caps"]
REG --> GRP["4. Group edges by<br/>(target, cap_urn)"]
GRP --> TOPO["5. Topological sort"]
TOPO --> PROG["6. Pre-compute<br/>progress boundaries"]
PROG --> EXEC["7. Execute each group<br/>via execute_fanin()"]
EXEC --> RES["8. Return results"]
- Discover cartridges — collect all unique cap URNs from edges, find cartridge binaries that provide them.
- Create infrastructure — set up CartridgeHostRuntime, RelaySlave/RelayMaster pair, and RelaySwitch.
- Register all caps — register ALL manifest caps from each cartridge (not just DAG-referenced ones, because cartridges may make peer invocations to other caps).
- Group edges — group by (target_node, cap_urn) into EdgeGroups. Each group is one cap invocation, potentially with multiple input streams (fan-in).
- Topological sort — order groups so each executes after its inputs are available. Uses Kahn’s algorithm.
- Pre-compute progress — calculate progress boundaries for each group.
- Execute — run each group via execute_fanin().
- Return — HashMap<node_id, NodeData> with output data from each node.
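The grouping and ordering steps can be sketched with Kahn's algorithm over edge groups. This is an illustration only: `Edge`, `GroupKey`, and `order_groups` are hypothetical stand-ins for the real types, the cap URNs used with it are placeholders, and the quadratic scan favors readability over speed.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Simplified edge: data flows from `source` to `target` through `cap_urn`.
struct Edge {
    source: &'static str,
    target: &'static str,
    cap_urn: &'static str,
}

/// Key of an edge group: (target node, cap URN).
type GroupKey = (&'static str, &'static str);

/// Groups edges by (target, cap_urn), then orders the groups with Kahn's
/// algorithm. Returns None when the groups cannot be fully ordered (a cycle).
fn order_groups(edges: &[Edge]) -> Option<Vec<GroupKey>> {
    // Grouping step: collect the source nodes feeding each group (fan-in).
    let mut groups: BTreeMap<GroupKey, BTreeSet<&'static str>> = BTreeMap::new();
    for e in edges {
        groups.entry((e.target, e.cap_urn)).or_default().insert(e.source);
    }
    // A group depends on every group that produces one of its source nodes.
    let mut deps: BTreeMap<GroupKey, BTreeSet<GroupKey>> = BTreeMap::new();
    for (key, sources) in &groups {
        let producers: BTreeSet<GroupKey> = groups
            .keys()
            .filter(|producer| sources.contains(producer.0))
            .copied()
            .collect();
        deps.insert(*key, producers);
    }
    // Kahn: repeatedly emit groups whose dependencies have all been emitted.
    let mut ready: VecDeque<GroupKey> = deps
        .iter()
        .filter(|(_, pending)| pending.is_empty())
        .map(|(key, _)| *key)
        .collect();
    let mut order = Vec::new();
    while let Some(done) = ready.pop_front() {
        order.push(done);
        for (key, pending) in deps.iter_mut() {
            if pending.remove(&done) && pending.is_empty() {
                ready.push_back(*key);
            }
        }
    }
    if order.len() == groups.len() {
        Some(order)
    } else {
        None
    }
}
```

Two edges that share a target and a cap URN collapse into one group, so a fan-in step appears once in the returned order.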
execute_fanin
The core function for a single cap invocation:
sequenceDiagram
participant EX as Executor
participant SW as RelaySwitch
participant P as Cartridge
EX->>SW: register_external_request(rid, cap_urn)
Note over SW: Returns (XID, response_rx)
loop For each source in EdgeGroup
EX->>SW: STREAM_START (stream_id, media_urn)
EX->>SW: CHUNK(s)
EX->>SW: STREAM_END
end
EX->>SW: END
Note over SW: Routes to cartridge via cap matching
SW->>P: (forwarded)
loop select! loop
alt Response frame
P-->>EX: CHUNK → accumulate data
P-->>EX: LOG → forward progress
P-->>EX: END → done
P-->>EX: ERR → fail
else Pump timeout (200ms)
Note over EX: Route peer frames through switch
else Activity timeout (120s)
Note over EX: ExecutionError::ActivityTimeout
end
end
The response collection loop uses tokio::select! to concurrently:
- Receive response frames on the response channel.
- Pump frames through the RelaySwitch (for peer invocations — without this, a cartridge doing a peer call would deadlock).
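The shape of that loop can be approximated without an async runtime. The sketch below swaps `tokio::select!` for a blocking `recv_timeout` on a standard-library channel, so it is an analogy rather than the real implementation. `Response`, `collect_response`, and the `pump` callback are hypothetical, and the activity timeout is left out for brevity.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Response frames the executor cares about (illustrative type).
enum Response {
    Chunk(Vec<u8>),
    Log(String),
    End,
    Err(String),
}

/// Collects result bytes until END or ERR. Whenever no frame arrives within
/// the pump interval, `pump` runs so that peer frames keep moving.
fn collect_response<F: FnMut()>(rx: &Receiver<Response>, mut pump: F) -> Result<Vec<u8>, String> {
    let pump_interval = Duration::from_millis(200);
    let mut data = Vec::new();
    loop {
        match rx.recv_timeout(pump_interval) {
            Ok(Response::Chunk(bytes)) => data.extend_from_slice(&bytes),
            // Progress forwarding is omitted in this sketch.
            Ok(Response::Log(_message)) => {}
            Ok(Response::End) => return Ok(data),
            Ok(Response::Err(message)) => return Err(message),
            // Stand-in for the "pump timeout" branch of the select loop.
            Err(RecvTimeoutError::Timeout) => pump(),
            Err(RecvTimeoutError::Disconnected) => {
                return Err("response channel closed".to_string())
            }
        }
    }
}
```

The important property is that the pump branch keeps running while the response channel is quiet. Without it, a cartridge waiting on a peer call would never see its reply.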
Activity Timeout
Default: 120 seconds. Any received frame resets the timer. Caps can override via activity_timeout_secs in their metadata.
While idle, the executor logs a warning every 30 seconds.
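The reset-on-any-frame behaviour can be modelled with a small timer. This is a sketch only: `ActivityTimer` is a hypothetical type, and the current time is passed in explicitly so the logic can be checked without sleeping.

```rust
use std::time::{Duration, Instant};

/// Tracks the time since the last received frame (illustrative type).
struct ActivityTimer {
    timeout: Duration,
    last_frame: Instant,
}

impl ActivityTimer {
    /// `override_secs` stands in for a cap's `activity_timeout_secs`;
    /// the default of 120 seconds applies when it is absent.
    fn new(override_secs: Option<u64>, now: Instant) -> Self {
        let timeout = Duration::from_secs(override_secs.unwrap_or(120));
        ActivityTimer { timeout, last_frame: now }
    }

    /// Any received frame resets the timer.
    fn on_frame(&mut self, now: Instant) {
        self.last_frame = now;
    }

    /// True once no frame has arrived for longer than the timeout.
    fn expired(&self, now: Instant) -> bool {
        now.duration_since(self.last_frame) > self.timeout
    }
}
```

An executor loop would call `on_frame` for every received frame and check `expired` on each idle tick.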
Progress Subdivision
Each edge group gets an equal share of the overall [0.0, 1.0] progress range. With n groups, group i (counting from 0) maps its progress to [i/n, (i+1)/n).
Progress nesting across three layers (DAG → handler → peer call):
graph TD
subgraph "DAG (2 groups)"
G0["Group 0<br/>[0.0, 0.5]"]
G1["Group 1<br/>[0.5, 1.0]"]
end
subgraph "Group 0 Handler"
DL["Download<br/>[0.0, 0.25] of handler"]
INF["Inference<br/>[0.25, 1.0] of handler"]
end
subgraph "Effective Ranges"
DLE["Download effective<br/>[0.0, 0.125] of task"]
INFE["Inference effective<br/>[0.125, 0.5] of task"]
end
G0 --> DL
G0 --> INF
DL --> DLE
INF --> INFE
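The effective ranges in the diagram follow from applying the same linear mapping once per layer. The sketch below assumes `map_progress` is a plain affine map with the argument order (value, base, weight). The real body is not shown in this document, and `group_range` is a hypothetical helper.

```rust
/// Maps a local progress value in [0, 1] into a parent sub-range that
/// starts at `base` and spans `weight` of the parent's range.
/// Assumption: the real map_progress behaves like this affine map.
fn map_progress(local: f64, base: f64, weight: f64) -> f64 {
    base + local.clamp(0.0, 1.0) * weight
}

/// Range assigned to edge group `i` of `n`, returned as (base, weight).
fn group_range(i: usize, n: usize) -> (f64, f64) {
    let weight = 1.0 / n as f64;
    (i as f64 * weight, weight)
}

/// Reproduces the "Download effective" upper bound from the diagram:
/// download finishes at 0.25 of the handler, inside group 0 of 2.
fn download_end_in_task() -> f64 {
    let (base, weight) = group_range(0, 2);
    let handler_progress = map_progress(1.0, 0.0, 0.25);
    map_progress(handler_progress, base, weight) // 0.125
}
```

Nesting the mapping a third time handles peer calls the same way: the peer's [0, 1] range is mapped into the calling phase, then into the handler, then into the group.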
Progress flows from cartridge to UI:
sequenceDiagram
participant P as Cartridge Handler
participant CI as cap_interpreter
participant DB as SQLite
participant UI as UI
P->>CI: LOG frame<br/>progress(0.5, "msg")
Note over CI: CapProgressFn called<br/>map_progress(0.5,<br/>step_base, step_weight)
CI->>DB: UPDATE progress
UI->>DB: poll
DB-->>UI: progress value
Planner
The planner builds execution plans by finding paths through a capability graph. Media types are nodes; capabilities are edges:
graph LR
PDF["media:pdf"] -->|extract_metadata| REC["media:record"]
PDF -->|generate_thumbnail| IMG["media:image;png"]
PDF -->|extract_text| TXT["media:text"]
TXT -->|generate_embeddings| EMB["media:embedding"]
TXT -->|summarize| TXT2["media:text"]
IMG -->|describe_image| TXT
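Path finding over such a graph can be illustrated with a breadth-first search. The document does not say which search strategy the planner uses, so this is one plausible sketch. `CapEdge` and `find_path` are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

/// A capability edge: (source media, capability name, target media).
type CapEdge = (&'static str, &'static str, &'static str);

/// Breadth-first search for the shortest chain of capabilities that
/// turns `from` into `to`. Returns the capability names along the path.
fn find_path(
    edges: &[CapEdge],
    from: &'static str,
    to: &'static str,
) -> Option<Vec<&'static str>> {
    // For each reached media type, remember the (previous media, capability).
    let mut came_from: HashMap<&'static str, (&'static str, &'static str)> = HashMap::new();
    let mut queue = VecDeque::new();
    queue.push_back(from);
    while let Some(node) = queue.pop_front() {
        if node == to {
            // Walk back to the start, collecting capability names.
            let mut path = Vec::new();
            let mut cur = node;
            while let Some(&(prev, cap)) = came_from.get(cur) {
                path.push(cap);
                cur = prev;
            }
            path.reverse();
            return Some(path);
        }
        for &(src, cap, dst) in edges {
            // Visit each media type once; never re-enter the start node.
            if src == node && dst != from && !came_from.contains_key(dst) {
                came_from.insert(dst, (src, cap));
                queue.push_back(dst);
            }
        }
    }
    None
}
```

On the graph above, searching from media:pdf to media:embedding yields extract_text followed by generate_embeddings.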
The MachinePlanBuilder converts a path (Strand) plus argument specs into a MachinePlan:
flowchart LR
S["Strand<br/>(path A→B→C)"] --> PB["MachinePlanBuilder"]
ARGS["Argument specs"] --> PB
PB --> IS["InputSlot nodes"]
PB --> CAP["Cap nodes"]
PB --> OUT["Output nodes"]
IS --> MP["MachinePlan"]
CAP --> MP
OUT --> MP
Slot Value Resolution
Slot values are resolved in three-level priority order, highest first:
flowchart TD
Q{"slot_values<br/>[step_N:slot_name]?"} -->|Found| V1["Use explicit<br/>per-step value"]
Q -->|Not found| Q2{"cap_settings<br/>[cap_urn][slot_name]?"}
Q2 -->|Found| V2["Use per-cap<br/>setting"]
Q2 -->|Not found| Q3{"default_value<br/>in cap definition?"}
Q3 -->|Found| V3["Use default"]
Q3 -->|Not found| V4["No value<br/>(argument omitted)"]
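The lookup order in the flowchart amounts to a chain of fallbacks. In this sketch the map shapes, the `step_N:slot_name` key format, and the `resolve_slot` signature are assumptions drawn from the diagram labels, not the planner's actual types.

```rust
use std::collections::HashMap;

/// Resolves a slot value using the three-level priority from the flowchart.
/// Returns None when no level supplies a value (the argument is omitted).
fn resolve_slot<'a>(
    step: usize,
    cap_urn: &str,
    slot_name: &str,
    slot_values: &'a HashMap<String, String>,
    cap_settings: &'a HashMap<String, HashMap<String, String>>,
    default_value: Option<&'a str>,
) -> Option<&'a str> {
    // Level 1: explicit per-step value, keyed as "step_N:slot_name".
    let step_key = format!("step_{}:{}", step, slot_name);
    slot_values
        .get(&step_key)
        .map(String::as_str)
        // Level 2: per-cap setting, keyed by cap URN and then slot name.
        .or_else(|| {
            cap_settings
                .get(cap_urn)
                .and_then(|settings| settings.get(slot_name))
                .map(String::as_str)
        })
        // Level 3: default from the cap definition, if any.
        .or(default_value)
}
```

Because each level is only consulted when the previous one misses, a per-step value always shadows a per-cap setting, which in turn shadows the default.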
Cartridge Patterns
Model Cartridges
Model cartridges (GGUF, Candle, MLX) follow a three-phase handler pattern:
graph LR
DL["1. Download<br/>(peer call to<br/>modelcartridge)"] --> LD["2. Load<br/>(blocking FFI via<br/>run_with_keepalive)"]
LD --> INF["3. Inference<br/>(streaming output<br/>with progress)"]
DL -.- P1["[0.00 – 0.25]"]
LD -.- P2["[0.25 – 0.35]"]
INF -.- P3["[0.35 – 0.95]"]
Content Cartridges
Content cartridges (PDF, text) produce derived artifacts:
graph LR
PDF["media:pdf"] -->|generate_thumbnail| IMG["media:image;png"]
PDF -->|extract_metadata| REC["media:record"]
PDF -->|extract_outline| JSON["media:json"]
PDF -->|disbind| LIST["media:pdf;list"]
TXT["media:text"] -->|generate_thumbnail| IMG2["media:image;png"]
TXT -->|extract_metadata| REC2["media:record"]
MD["media:text;markdown"] -->|extract_outline| JSON2["media:json"]
Task Integration
Tasks track DAG execution progress through a state machine:
stateDiagram-v2
[*] --> Created : TriggerFuse event
Created --> Running : Execute DAG
Running --> Completed : Success (progress = 1.0)
Running --> Failed : ExecutionError
Running --> Cancelled : control_flags.aborted
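The state diagram translates directly into a transition function. The enum and function names below are hypothetical. Only the states and the transition labels come from the diagram.

```rust
/// Task states from the diagram above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskState {
    Created,
    Running,
    Completed,
    Failed,
    Cancelled,
}

/// Events that drive the transitions (illustrative names).
#[derive(Debug, Clone, Copy)]
enum TaskEvent {
    ExecuteDag,
    Success,
    ExecutionError,
    Aborted,
}

/// Returns the next state, or None when the diagram has no such transition.
fn transition(state: TaskState, event: TaskEvent) -> Option<TaskState> {
    match (state, event) {
        (TaskState::Created, TaskEvent::ExecuteDag) => Some(TaskState::Running),
        (TaskState::Running, TaskEvent::Success) => Some(TaskState::Completed),
        (TaskState::Running, TaskEvent::ExecutionError) => Some(TaskState::Failed),
        (TaskState::Running, TaskEvent::Aborted) => Some(TaskState::Cancelled),
        // Completed, Failed, and Cancelled have no outgoing transitions.
        _ => None,
    }
}
```

Returning `None` for every unlisted pair makes the three end states terminal, which matches the diagram.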
Graph Execution with Macino
Macino (capdag/macino/) is the integration test harness for graph execution. It validates end-to-end DAG execution by building real execution graphs and running them through the full stack (CartridgeRuntime → CartridgeHostRuntime → RelaySwitch).
Test patterns exercised by Macino:
- Linear chains (single-step and multi-step)
- Fan-in nodes (multiple inputs to one cap)
- Parallel independent branches
- Cycle detection (rejected before execution)
- Missing capability error propagation
- Progress tracking through multi-step pipelines
Error Handling
Error types form a layered hierarchy:
graph BT
OP["OpError<br/>(handler layer)"] --> RT["RuntimeError<br/>(cartridge layer)"]
RT --> ERR["ERR frame<br/>(wire)"]
ERR --> AH["AsyncHostError<br/>(host layer)"]
AH --> RSE["RelaySwitchError<br/>(relay layer)"]
RSE --> EX["ExecutionError<br/>(execution layer)"]
EX --> TASK["Task failure<br/>(application layer)"]
Error propagation from cartridge to task:
graph TD
subgraph "Cartridge Process"
OP2["Op handler"] -->|"Err(OpError)"| DR["dispatch_op()"]
DR -->|"ERR frame"| STDOUT["stdout"]
end
STDOUT --> PHR["CartridgeHostRuntime"]
PHR --> RS["RelaySlave → RelayMaster"]
RS --> SW["RelaySwitch"]
SW --> EF["execute_fanin"]
EF -->|"ExecutionError::<br/>CartridgeExecutionFailed"| CI["cap_interpreter"]
CI -->|"task state → Failed"| DB["SQLite"]
| Error | Cause |
|---|---|
| CartridgeNotFound | No cartridge provides the requested cap |
| ActivityTimeout | No frames received for > timeout seconds |
| CartridgeExecutionFailed | Cartridge returned ERR frame |
| NoIncomingData | Missing source node data (dependency not met) |
| NotADag | Cycle detected during validation |
All errors are terminal for the DAG execution. There is no retry logic — the caller decides whether to retry.
Missing Capability
If no cap accepts the request URN, the RelaySwitch returns RelaySwitchError::NoHandler. This propagates as CartridgeNotFound in the executor. The error message includes the cap URN that could not be routed.
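One way to picture this layer-to-layer mapping is a `From` conversion that carries the cap URN along. The variant names come from this section, but the payload fields and the message text are assumptions made for illustration.

```rust
use std::fmt;

/// Relay-layer error (only the variant named above is modelled).
#[derive(Debug)]
enum RelaySwitchError {
    NoHandler { cap_urn: String },
}

/// Execution-layer errors (a subset; payload shapes are assumed).
#[derive(Debug, PartialEq)]
enum ExecutionError {
    CartridgeNotFound { cap_urn: String },
    ActivityTimeout { secs: u64 },
}

impl From<RelaySwitchError> for ExecutionError {
    fn from(err: RelaySwitchError) -> Self {
        match err {
            // Keep the URN so the final message names the unroutable cap.
            RelaySwitchError::NoHandler { cap_urn } => {
                ExecutionError::CartridgeNotFound { cap_urn }
            }
        }
    }
}

impl fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ExecutionError::CartridgeNotFound { cap_urn } => {
                write!(f, "no cartridge provides cap {}", cap_urn)
            }
            ExecutionError::ActivityTimeout { secs } => {
                write!(f, "no frames received for more than {} seconds", secs)
            }
        }
    }
}
```

With a conversion like this, executor code can use `?` on relay calls and still surface the cap URN in the task failure.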