cap: Documentation
Browse Sign In

Runtime Hosting

Cartridge runtime, host runtime, relay topology, and capability routing

Architecture

graph TD
    Engine["Engine / DAG Executor"]
    Switch["RelaySwitch<br/>Routes REQ by cap URN<br/>Routes continuations by RID<br/>Tracks peer invocations"]

    Engine --> Switch

    subgraph Host0 ["Host 0"]
        RM0["RelayMaster"] --> RS0["RelaySlave"]
        RS0 --> PH0["CartridgeHostRuntime"]
        PH0 --> P0A["Cartridge A"]
        PH0 --> P0B["Cartridge B"]
    end

    subgraph Host1 ["Host 1"]
        RM1["RelayMaster"] --> RS1["RelaySlave"]
        RS1 --> PH1["CartridgeHostRuntime"]
        PH1 --> P1A["Cartridge C"]
    end

    subgraph Host2 ["Host 2"]
        RM2["RelayMaster"] --> RS2["RelaySlave"]
        RS2 --> PH2["CartridgeHostRuntime"]
        PH2 --> P2A["Cartridge D"]
        PH2 --> P2B["Cartridge E"]
    end

    Switch --> RM0
    Switch --> RM1
    Switch --> RM2
  • CartridgeRuntime runs inside each cartridge process — handles handler registration, handshake, and frame I/O.
  • CartridgeHostRuntime manages one or more cartridge processes — spawning, routing, health monitoring.
  • RelaySwitch routes requests across multiple hosts based on cap URN matching and specificity ranking.
  • Relay pairs (RelaySlave/RelayMaster) bridge the Unix socket between host and switch, intercepting control frames.

Request Flow

A single capability invocation flows through the full stack:

sequenceDiagram
    participant E as Engine
    participant SW as RelaySwitch
    participant RS as RelaySlave
    participant PH as CartridgeHost
    participant P as Cartridge

    E->>SW: REQ(cap)
    Note over SW: assign XID, match cap → master
    SW->>RS: REQ(cap, xid)
    RS->>PH: REQ(cap, xid)
    Note over PH: match cap → cartridge
    PH->>P: REQ(cap, xid)

    Note over P: handler executes

    P-->>PH: STREAM_START
    P-->>PH: CHUNK(data)
    P-->>PH: CHUNK(data)
    P-->>PH: STREAM_END
    P-->>PH: END
    PH-->>RS: (forwarded)
    RS-->>SW: (forwarded)
    SW-->>E: (forwarded)

Peer Invocation Flow

When Cartridge A calls a capability provided by Cartridge B:

sequenceDiagram
    participant PA as Cartridge A
    participant PHA as CartridgeHost A
    participant RSA as RelaySlave A
    participant SW as RelaySwitch
    participant PB as Cartridge B

    PA->>PHA: REQ(cap) — no XID
    Note over PHA: record RID in outgoing_rids
    PHA->>RSA: REQ(cap)
    RSA->>SW: REQ(cap)
    Note over SW: assign XID, record peer
    SW->>PB: REQ(cap, xid)

    PB-->>SW: response frames
    SW-->>RSA: response (by XID)
    RSA-->>PHA: response
    Note over PHA: route by RID to Cartridge A
    PHA-->>PA: PeerResponse

CartridgeRuntime

CartridgeRuntime is the cartridge-side entry point. A cartridge creates a runtime with its manifest, registers handlers for each capability, and calls run(). The runtime handles everything else — handshake, frame parsing, request routing, response framing, heartbeat replies.

let manifest = build_manifest();
let mut runtime = CartridgeRuntime::with_manifest(manifest);

runtime.register_op("cap:in=...;out=...;op=generate", || {
    Box::new(MyGenerateOp::default())
});

runtime.run().unwrap();

Mode Detection

flowchart TD
    A["runtime.run()"] --> B{CLI arguments?}
    B -->|No args| C["Cartridge CBOR Mode<br/>Binary frames on stdin/stdout"]
    B -->|Has args| D{First arg?}
    D -->|"manifest"| E["Print JSON manifest"]
    D -->|"--help"| F["Print subcommands"]
    D -->|slug match| G["CLI Mode<br/>Parse args, run handler,<br/>output NDJSON"]
    D -->|no match| H["UnknownSubcommand error"]

A single binary serves as both a Bifaci cartridge (when launched by the host with no args) and a standalone CLI tool (when run directly by a user).

Handler Registration

Two methods register handlers:

  • register_op(cap_urn, factory) — registers a factory closure that creates a fresh Op instance per request. Useful when handlers hold per-request state.
  • register_op_type::<T>(cap_urn) — registers a type implementing Op + Default. Instances are created via T::default().

When a REQ arrives, the runtime finds the handler by matching the request’s cap URN against registered cap URNs using is_dispatchable (see Dispatch). When multiple handlers match, the runtime ranks by specificity.

Cartridge Mode Main Loop

In cartridge CBOR mode, run() executes:

  1. Handshake — exchange HELLO frames, negotiate limits.
  2. Spawn writer task — a tokio task drains an mpsc channel and writes frames to stdout. This is the only thing that touches stdout.
  3. Read loop — read frames from stdin:
    • REQ: Look up handler, spawn handler task with InputPackage and OutputStream.
    • Heartbeat: Respond inline with same ID.
    • STREAM_START / CHUNK / STREAM_END / END: Route to the handler task owning the request ID.
    • stdin EOF: Break and shut down.
graph LR
    HA["Handler A"] --> CH["mpsc channel"]
    HB["Handler B"] --> CH
    HC["Handler C"] --> CH
    CH --> WT["Writer Task"] --> OUT["stdout"]

All handlers send frames through the channel. The writer serializes each frame to CBOR, prepends the 4-byte length, and writes to stdout.

Tokio Threading Model

graph LR
    subgraph Tokio Workers
        RT["Reader Task<br/>(stdin → dispatch)"]
        WT["Writer Task<br/>(channel → stdout)"]
        H1["Handler Task 1"]
        H2["Handler Task 2"]
    end

    subgraph Blocking Pool
        BL["spawn_blocking<br/>(FFI model load)"]
    end

    RT -->|route frames| H1
    RT -->|route frames| H2
    H1 -->|send frames| CH["mpsc channel"]
    H2 -->|send frames| CH
    BL -.->|ProgressSender| CH
    CH --> WT

If a handler blocks a tokio worker (e.g., synchronous FFI model load), the writer task cannot flush frames and the engine triggers its 120-second activity timeout:

sequenceDiagram
    participant H as Handler
    participant CH as mpsc Channel
    participant WT as Writer Task
    participant E as Engine

    H->>CH: progress(0.25, "Loading model...")
    Note over H: llama_model_load() — blocks tokio worker
    Note over WT: Needs tokio worker — none available
    Note over CH: Frames stuck in channel
    Note over E: 30s... 60s... 90s...
    E->>E: 120s — Activity Timeout!

run_with_keepalive() solves this by moving blocking work to tokio::task::spawn_blocking (a dedicated thread pool):

flowchart TD
    Q{"Blocking work needs<br/>granular progress?"}
    Q -->|"No — just keep alive"| RWK["run_with_keepalive()<br/>Fixed value every 30s"]
    Q -->|"Yes — per-token, per-step"| PS["ProgressSender<br/>Emit from inside closure"]

    RWK --> SB1["spawn_blocking(closure)"]
    PS --> SB2["spawn_blocking(move || {<br/>  ps.progress(...)  <br/>})"]

Op Trait

Handlers implement the Op<()> trait:

#[async_trait]
pub trait Op<T>: Send + Sync {
    async fn perform(&self, dry: &mut DryContext, wet: &mut WetContext) -> OpResult<T>;
    fn metadata(&self) -> OpMetadata;
}
flowchart LR
    REQ["REQ frame"] --> FH["find_handler<br/>(is_dispatchable)"]
    FH --> F["OpFactory"]
    F --> OP["Op instance"]
    OP --> P["perform()"]
    P -->|Ok| CL["close() → STREAM_END + END"]
    P -->|Err| ER["ERR frame"]

The runtime calls dispatch_op(), which creates a Request, inserts it into a WetContext, calls op.perform(), and on success closes the output stream (STREAM_END + END). On error, it sends an ERR frame.

The WetContext holds an Arc<Request> under the key "request". The Request struct bundles:

  • take_input()InputPackage: All input argument streams. Can only be called once.
  • output()&OutputStream: For emitting response data, progress, and log messages.
  • peer()&dyn PeerInvoker: For calling other cartridges’ capabilities.

Handler Data Access Pattern

flowchart LR
    WET["WetContext"] -->|get_arc| REQ["Request"]
    REQ -->|take_input| IP["InputPackage"]
    IP -->|collect_streams| ST["Vec&lt;(media_urn, bytes)&gt;"]
    ST -->|require_stream| DATA["Argument bytes"]
    ST -->|find_stream_str| OPT["Optional text"]

    REQ -->|output| OS["OutputStream"]
    OS -->|emit_cbor| OUT["Response"]
    OS -->|progress| LOG["LOG frame"]

OutputStream Auto-Framing

The OutputStream automatically injects STREAM_START before the first chunk and STREAM_END on close:

sequenceDiagram
    participant H as Handler
    participant OS as OutputStream
    participant W as Wire

    H->>OS: write(data) or emit_cbor(value)
    Note over OS: ensure_started()
    OS->>W: STREAM_START (auto)
    OS->>W: CHUNK (data)

    H->>OS: write(more)
    OS->>W: CHUNK (more)

    Note over H: handler returns Ok(())
    Note over OS: runtime calls close()
    OS->>W: STREAM_END
    Note over OS: runtime sends
    OS->>W: END

Peer Invocation from Handler

When a handler calls another cartridge’s capability via peer():

sequenceDiagram
    participant PA as Cartridge A
    participant PHA as CartridgeHost A
    participant RSA as RelaySlave A
    participant SW as RelaySwitch
    participant PB as Cartridge B

    Note over PA,PB: Request Phase
    PA->>PHA: REQ(cap) — no XID
    Note over PHA: record RID in outgoing_rids
    PHA->>RSA: REQ(cap)
    RSA->>SW: REQ(cap)
    Note over SW: assign XID, record peer
    SW->>PB: REQ(cap, xid)

    PA->>PHA: STREAM_START
    PA->>PHA: CHUNK(arg)
    PA->>PHA: STREAM_END
    PA->>PHA: END
    Note over PHA,PB: (forwarded through relay)

    Note over PA,PB: Response Phase
    PB-->>SW: LOG(progress)
    SW-->>RSA: LOG(progress)
    RSA-->>PHA: LOG(progress)
    PHA-->>PA: LOG(progress)

    PB-->>SW: CHUNK(result)
    SW-->>RSA: CHUNK
    RSA-->>PHA: CHUNK
    PHA-->>PA: CHUNK

    PB-->>SW: END
    SW-->>RSA: END
    RSA-->>PHA: END
    PHA-->>PA: END

Progress Path

graph TD
    PH["Cartridge handler<br/>output.progress(0.5, ...)"] --> LOG2["LOG frame on stdout"]
    LOG2 --> PHR["CartridgeHostRuntime<br/>(pass-through)"]
    PHR --> RS2["RelaySlave → RelayMaster<br/>(pass-through)"]
    RS2 --> EF["execute_fanin<br/>→ CapProgressFn callback"]
    EF --> CI["cap_interpreter<br/>→ map to step range"]
    CI --> DB["SQLite database"]
    DB --> UI["UI"]

Swift Equivalent

The Swift CartridgeRuntime in capdag-objc follows the same design. Key difference: Swift uses Task + DispatchSemaphore.wait() for handler dispatch — nesting another Task + semaphore inside perform() deadlocks the async executor. This constraint does not exist in Rust’s tokio model.

graph TD
    subgraph Rust
        R_CALL["run_with_keepalive()"] --> R_SB["spawn_blocking<br/>(blocking thread pool)"]
        R_CALL --> R_SEL["tokio::select!<br/>interval(30s)"]
        R_SEL -->|every 30s| R_PROG["Progress frame"]
        R_SB -->|complete| R_DONE["select! exits"]
    end

    subgraph Swift
        S_CALL["runWithKeepalive()"] --> S_TASK["Background Task<br/>loop { sleep(30s) }"]
        S_CALL --> S_OP["Async operation"]
        S_TASK -->|every 30s| S_PROG["Progress frame"]
        S_OP -->|complete| S_CANCEL["Cancel background Task"]
    end

CartridgeHostRuntime

CartridgeHostRuntime sits between a RelaySlave and one or more cartridge processes. It manages their full lifecycle — spawning, handshake, frame routing, health monitoring, and death handling.

Cartridge Registration

Two ways to connect cartridges:

  • register_plugin(path, known_caps) — registers a binary for on-demand spawning. The cartridge is not started until a REQ arrives for one of its capabilities. known_caps enables provisional routing before the HELLO handshake reveals the actual manifest.
  • attach_plugin(read, write) — attaches a pre-connected cartridge. Performs HELLO handshake and identity verification immediately.

Cartridge Spawning

sequenceDiagram
    participant R as Relay
    participant PH as CartridgeHostRuntime
    participant P as Cartridge (new)

    R->>PH: REQ(cap)
    Note over PH: Cartridge not running — spawn on demand
    PH->>P: spawn process (no args)

    rect rgb(240, 248, 255)
        Note over PH,P: Handshake
        PH->>P: Hello (limits)
        P->>PH: Hello (limits + manifest)
        PH->>P: REQ(CAP_IDENTITY, nonce)
        P->>PH: nonce echo
    end

    Note over PH: Update cap table from manifest
    PH->>P: Forward original REQ(cap)

If HELLO fails, the cartridge is marked with hello_failed = true and will not be spawned again.

Frame Routing

flowchart TD
    F["Frame from relay"] --> FT{frame_type?}
    FT -->|REQ| CAP["Lookup cap_urn<br/>in cap_table"]
    CAP --> RUN{Cartridge running?}
    RUN -->|No| SP["Spawn on demand"]
    SP --> FWD
    RUN -->|Yes| FWD["Forward to cartridge<br/>Record (XID,RID) in<br/>incoming_rxids"]

    FT -->|"STREAM_START<br/>CHUNK<br/>STREAM_END<br/>END<br/>ERR"| CONT["Lookup (XID,RID)<br/>in incoming_rxids"]
    CONT --> FWD2["Forward to<br/>handling cartridge"]

Engine → Cartridge: REQ frames are routed by cap URN lookup in the cap table. Continuation frames (STREAM_START, CHUNK, etc.) are routed by (XID, RID) lookup.

Cartridge → Engine: HEARTBEAT is handled locally (never forwarded). REQ from a cartridge (peer invoke) is forwarded to the relay with the source cartridge recorded. Everything else is forwarded to the relay.

Self-Loop Routing

When a cartridge peer-invokes a cap on another cartridge on the same host, both incoming_rxids and outgoing_rids contain entries for the same request IDs. The host resolves this by frame type discrimination — request body frames route via incoming, peer response frames route via outgoing.

Heartbeat Health Monitoring

The host probes each running cartridge every 30 seconds. A heartbeat unanswered for 10 seconds marks the cartridge as unhealthy.

Cartridge Death Handling

Three scenarios:

Scenario Behavior
Ordered shutdown Clean up routing entries, no ERR frames
Unexpected death with pending requests Send ERR frame for each pending request ID
Idle death Clean up routing entries; next REQ triggers respawn (cartridge is re-spawned on demand)
stateDiagram-v2
    [*] --> Registered : Discovery
    Registered --> Spawning : First REQ for this cap
    Spawning --> Handshaking : Process started
    Handshaking --> Live : HELLO + identity OK
    Handshaking --> Failed : HELLO failed
    Live --> Live : Handle requests
    Live --> Dead : Process died
    Live --> Killed : Engine shutdown / idle timeout
    Dead --> Spawning : Next REQ triggers respawn
    Killed --> Registered : Available for respawn
    Failed --> [*] : Permanent failure

On unexpected death:

sequenceDiagram
    participant P as Cartridge
    participant PH as CartridgeHostRuntime
    participant E as Engine

    Note over P: Process dies unexpectedly
    P--xPH: stdout EOF
    Note over PH: Detect death via reader task
    PH->>PH: Read stderr (last_death_message)
    loop For each pending request
        PH->>E: ERR frame (death message)
    end
    Note over E: ExecutionError::CartridgeExecutionFailed<br/>or ExecutionError::HostError

Capability Advertisement

When capabilities change, the host rebuilds its aggregate manifest and sends a RelayNotify frame to the relay. This propagates through the switch so the engine’s view stays in sync.

Relay Topology

A RelaySlave/RelayMaster pair forms a transparent frame bridge over a Unix socket:

graph LR
    SW["RelaySwitch"] <-->|Unix socket| RM["RelayMaster"]
    RM <-->|Unix socket| RS["RelaySlave"]
    RS <--> PH["CartridgeHostRuntime"]

    style RM fill:#e8f4fd
    style RS fill:#e8f4fd

All regular frames pass through unchanged. Only two frame types are intercepted:

sequenceDiagram
    participant PH as CartridgeHostRuntime
    participant RS as RelaySlave
    participant RM as RelayMaster
    participant SW as RelaySwitch

    Note over PH,SW: RelayNotify (slave → master)
    PH->>RS: Caps changed
    RS->>RM: RelayNotify (manifest, limits)
    Note over RM: Stores manifest — never forwarded
    RM--xSW: (intercepted)

    Note over PH,SW: RelayState (master → slave)
    SW->>RM: RelayState (resource info)
    RM->>RS: RelayState (payload)
    Note over RS: Stores in resource_state — never forwarded
    RS--xPH: (intercepted)

    Note over PH,SW: Regular frames
    PH->>RS: REQ / CHUNK / END / etc.
    RS->>RM: (pass-through)
    RM->>SW: (pass-through)
  • RelayNotify (Slave → Master): Carries capability advertisements. Stored by the master, consumed by the switch for routing.
  • RelayState (Master → Slave): Carries host resource information (model paths, GPU availability, security bookmarks). Stored by the slave, accessible by the cartridge host runtime.

Connection Lifecycle

sequenceDiagram
    participant E as Engine
    participant RM as RelayMaster
    participant RS as RelaySlave
    participant PH as CartridgeHostRuntime

    Note over E,PH: 1. Socket creation
    E->>RM: Create Unix socket pair
    E->>RS: Other end of socket

    Note over E,PH: 2. Initial advertisement
    RS->>RM: RelayNotify (caps, limits)

    Note over E,PH: 3. Identity verification
    RM->>RS: REQ(CAP_IDENTITY)
    RS->>PH: (forward)
    PH-->>RS: nonce echo
    RS-->>RM: (forward)

    Note over E,PH: 4. Spawn reader tasks
    Note over E,PH: 5. Bidirectional flow — relay is live
  1. Socket creation — the engine creates a Unix socket pair.
  2. Initial advertisement — the RelaySlave sends a RelayNotify on startup.
  3. Identity verification — REQ for CAP_IDENTITY flows through the relay chain; nonce echo response flows back.
  4. Reader tasks — both sides spawn reader tasks for ongoing frame forwarding.
  5. Bidirectional flow — the relay is live. Shutdown occurs when either side closes the socket.

RelaySwitch

The RelaySwitch is the central routing component. It connects to one or more relay masters and routes requests based on cap URN matching.

All methods take &self — interior mutability (RwLock, Mutex, AtomicU64) allows multiple concurrent DAG executions to route frames simultaneously.

Cap Routing

flowchart TD
    REQ["REQ(cap_urn)"] --> PARSE["Parse cap URN"]
    PARSE --> MATCH["Match against each<br/>master's cap set<br/>(is_dispatchable)"]
    MATCH --> RANK{Multiple matches?}
    RANK -->|Yes| SPEC["Rank by specificity"]
    RANK -->|No| SKIP
    SPEC --> SKIP{Master healthy?}
    SKIP -->|No| ERR["RelaySwitchError::NoHandler"]
    SKIP -->|Yes| XID["Assign XID<br/>(atomic increment)"]
    XID --> REC["Record (XID, RID)<br/>in routing tables"]
    REC --> FWD["Forward REQ to<br/>selected master"]
  1. Parse the cap field as a Cap URN.
  2. Match against each master’s capability set using is_dispatchable (contravariant input, covariant output).
  3. Rank by specificity if multiple masters match.
  4. Assign XID via atomic counter.
  5. Record routing in request_routing, rid_to_xid, and origin_map.
  6. Forward the REQ to the selected master.

XID Assignment

XIDs (routing IDs, key 13 in frames) are unsigned integers assigned by the switch at routing boundaries. Cartridges never see XIDs directly — they are purely infrastructure.

Request Tracking

Table Key Purpose
request_routing (XID, RID) Route continuation frames to the right master
rid_to_xid RID Look up XID from RID
peer_requests (XID, RID) Track peer invocations for cleanup
origin_map (XID, RID) Route responses back to source (engine or cartridge)

Terminal frames (END, ERR) trigger cleanup of all tables.

Multiple Masters

graph TD
    SW["RelaySwitch"]
    SW --> RM0["Master 0"]
    SW --> RM1["Master 1"]
    SW --> RM2["Master 2"]

    RM0 --> PH0["Host 0"] --> P0["Cartridges..."]
    RM1 --> PH1["Host 1"] --> P1["Cartridges..."]
    RM2 --> PH2["Host 2"] --> P2["Cartridges..."]

Masters are independent. A failure in one does not affect the others — the switch marks the failed master as unhealthy and continues routing to healthy masters. New masters can be added at runtime via add_master().

Capability Aggregation

capabilities() returns the union of all healthy masters’ capabilities. The switch also exposes graph queries for the planner:

  • get_reachable_targets() — which output media URNs are reachable from a given input.
  • find_paths_to_exact_target() — machines that transform input to output.