Resonate HQ

Per-message Kafka worker with crash recovery in Rust

How the Rust SDK turns a per-Kafka-message handler into a durable workflow keyed by the message id, with non-blocking dispatch and replay from the last checkpoint.


A worker that pulls from one Kafka topic, runs a multi-step task, and pushes to another has two failure shapes. The first is head-of-line blocking, where a slow message stalls the faster ones behind it. The second is redone work, where a worker that crashes mid-task repeats steps it had already completed. This example uses the Resonate Rust SDK to turn the per-message handler into a durable workflow keyed by the record id. The consumer can then dispatch without blocking, and a crashed workflow resumes from its last checkpoint. The example reads record ids off the records_to_be_deleted Redpanda topic, runs a multi-step batch-deletion loop for each id, and publishes a completion message to records_that_were_deleted.

The shape of the solution

/// The durable workflow.
///
/// Loops until `delete_batch` returns false, then publishes a completion
/// message. Each `ctx.run(...)` call is a checkpoint — if the worker
/// crashes mid-flight, Resonate replays from the last checkpoint and the
/// workflow resumes on whichever worker picks it up next.
#[resonate::function]
pub async fn workflow(ctx: &Context, record_id: String, offset: String) -> Result<String> {
    println!("processing record {record_id} in position {offset}");
    while ctx.run(delete_batch, record_id.clone()).await? {
        println!("record {record_id} still has rows to delete");
        ctx.sleep(Duration::from_secs(5)).await?;
    }
    println!("all rows deleted for record {record_id} in position {offset}");
    ctx.run(enqueue_completion, (record_id.clone(), offset.clone()))
        .await?;
    Ok(format!("workflow {record_id} completed"))
}
// from example-kafka-worker-rs/src/lib.rs:67
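To make checkpoint-and-replay concrete, here is a minimal, std-only model of it. `ToyContext`, this `delete_batch`, and the 10-rows-per-batch table are hypothetical stand-ins, not the Resonate SDK. The journal is an in-memory `Vec` rather than durable storage, and the sleep between iterations is left out.

```rust
use std::cell::Cell;

/// Toy stand-in for a durable context (hypothetical, NOT the Resonate SDK).
/// `journal` holds the results of completed steps; a real system persists it
/// outside the worker so that it survives a crash.
struct ToyContext {
    journal: Vec<bool>,
    cursor: usize,
}

impl ToyContext {
    /// Begin a (re)play over whatever history survived the last run.
    fn replay(journal: Vec<bool>) -> Self {
        ToyContext { journal, cursor: 0 }
    }

    /// Mimics `ctx.run`: if this step already has a checkpoint, return it
    /// without executing; otherwise execute the step and checkpoint it.
    fn run(&mut self, step: impl FnOnce() -> bool) -> bool {
        let result = match self.journal.get(self.cursor).copied() {
            Some(recorded) => recorded,
            None => {
                let fresh = step();
                self.journal.push(fresh);
                fresh
            }
        };
        self.cursor += 1;
        result
    }
}

/// Hypothetical `delete_batch`: removes up to 10 rows from a shared table
/// (which, like a real database, outlives the worker) and reports whether
/// rows remain. `calls` counts real executions.
fn delete_batch(rows: &Cell<u32>, calls: &Cell<u32>) -> bool {
    calls.set(calls.get() + 1);
    rows.set(rows.get().saturating_sub(10));
    rows.get() > 0
}

/// The workflow loop, minus the sleep. `crash_after` simulates the process
/// dying after that many completed iterations. Returns true on completion.
fn workflow(
    ctx: &mut ToyContext,
    rows: &Cell<u32>,
    calls: &Cell<u32>,
    crash_after: Option<u32>,
) -> bool {
    let mut iterations = 0;
    while ctx.run(|| delete_batch(rows, calls)) {
        iterations += 1;
        if crash_after == Some(iterations) {
            return false; // simulated crash: only the journal survives
        }
    }
    true // the real workflow would now call enqueue_completion
}
```

With 35 rows, a crash after two iterations followed by a replay over the surviving journal calls `delete_batch` four times in total. The two checkpointed batches are returned from the journal rather than deleted again.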

The consumer side does no orchestration of its own. It pulls each Kafka message, parses the record id, and hands it to resonate.run(...).spawn() — the Rust SDK's non-blocking dispatch. The consumer gets a handle back, drops it, and moves on to the next message:

// Non-blocking dispatch — the Rust SDK equivalent
// of TS `beginRun`. We get a handle back and drop
// it; the workflow runs in the background, on this
// worker or any other in the `workers` group.
match resonate
    .run(&record_id, workflow, (record_id.clone(), offset.clone()))
    .target("poll://any@workers")
    .spawn()
    .await
{
    Ok(_handle) => {
        println!(
            "kicked off workflow for record {record_id} (partition={partition}, offset={offset})"
        );
    }
    Err(e) => eprintln!("failed to begin workflow for {record_id}: {e}"),
}
// from example-kafka-worker-rs/src/bin/consumer.rs:91
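The head-of-line argument can be sketched with plain threads standing in for `.spawn()`. This toy rests on several assumptions. `consume` and the per-message `Duration` are hypothetical. A detached std thread also dies with its process, whereas in the post's design the durable promise is what lets the workflow resume on another worker.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Toy model of fire-and-forget dispatch (std threads, NOT the Resonate SDK).
/// Each message gets its own task, the JoinHandle is dropped, and the loop
/// moves straight on, so a slow message cannot delay a fast one behind it.
/// The returned receiver yields record ids in completion order.
fn consume(messages: Vec<(String, Duration)>) -> mpsc::Receiver<String> {
    let (done_tx, done_rx) = mpsc::channel();
    for (record_id, work) in messages {
        let done_tx = done_tx.clone();
        // Dropping the handle detaches the thread, like dropping the SDK handle.
        let _ = thread::spawn(move || {
            thread::sleep(work); // stand-in for the multi-step workflow
            let _ = done_tx.send(record_id);
        });
    }
    done_rx // the original sender drops here; the iterator ends when all tasks finish
}
```

Feeding it a slow message followed by a fast one yields the fast id first, because the loop never waits on the slow task.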

The same process also registers the workflow and both leaf functions, so it can both dispatch and execute them. It joins the workers group, which is the group the dispatch above targets with poll://any@workers:

let resonate = Resonate::new(ResonateConfig {
    url: Some("http://localhost:8001".into()),
    group: Some("workers".into()),
    ..Default::default()
});
 
// ...
 
resonate.register(workflow).unwrap();
resonate.register(delete_batch).unwrap();
resonate.register(enqueue_completion).unwrap();
// from example-kafka-worker-rs/src/bin/consumer.rs:23
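Registration matters because a dispatched task is addressed by function name, so only a process that registered that name can execute it. The following is a hypothetical sketch of that lookup, assuming a simple map from name to function pointer. `Registry`, `register`, and `execute` here are illustrative and are not the SDK's types.

```rust
use std::collections::HashMap;

/// Hypothetical handler shape: serialized args in, serialized result out.
type Handler = fn(&str) -> String;

/// Toy function registry (NOT the Resonate API). A worker can run a task
/// only if it registered a handler under the task's function name.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry { handlers: HashMap::new() }
    }

    fn register(&mut self, name: &'static str, handler: Handler) {
        self.handlers.insert(name, handler);
    }

    /// Execute a task addressed by name, as a worker polling its group might.
    /// Returns None when this worker never registered that name.
    fn execute(&self, name: &str, args: &str) -> Option<String> {
        self.handlers.get(name).map(|handler| handler(args))
    }
}

/// Hypothetical leaf used only to exercise the registry.
fn delete_batch(record_id: &str) -> String {
    format!("deleted a batch for {record_id}")
}
```

A worker that registered only `delete_batch` can run that function but not `enqueue_completion`. This is why the consumer registers all three functions before it starts polling.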

The durable primitives in play

  • #[resonate::function] — proc-macro that marks a function as durable, so it can be registered with resonate.register(...), called by the SDK under a durable promise, and have its ctx.run calls replayed from history. Applied at src/lib.rs:22, src/lib.rs:42, and src/lib.rs:73.
  • resonate.run(id, fn, args).target("poll://any@workers").spawn() — non-blocking dispatch from the consumer loop. id is the durable promise id; if the same id is dispatched again, the SDK returns the existing promise instead of starting a new run. src/bin/consumer.rs:95.
  • ctx.run(delete_batch, record_id.clone()) — durable call to a registered function. The result is checkpointed. On replay the function is not re-executed, and the checkpointed result is returned instead. src/lib.rs:76.
  • ctx.run(enqueue_completion, (record_id.clone(), offset.clone())) — second durable call. Because the workflow awaits each call in sequence, it runs only after the deletion loop has finished. src/lib.rs:81.
  • ctx.sleep(Duration::from_secs(5)) — durable sleep. Survives crashes; the SDK records the wake time so a replay on a different worker still wakes at the original deadline. src/lib.rs:78.
  • Error::Application { message } returned from delete_batch — an application error, which the SDK retries automatically per the function's retry policy. src/lib.rs:29.
  • Durable promise id = the Kafka record id — set via the first argument to resonate.run(...). The README states: "the record ID is used as the Durable Promise ID" (README.md:104) and "By doing this, we ensure that no duplicate operations will ever happen for a given ID" (README.md:106).
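The durable-sleep bullet depends on storing an absolute deadline rather than a duration. Here is a minimal sketch, assuming a single-slot `SleepJournal` (hypothetical, not the SDK) and worker clocks that roughly agree.

```rust
use std::time::Duration;

/// Toy model of a durable sleep (hypothetical, NOT the Resonate SDK).
/// The first execution records an absolute wake time. Any replay, on any
/// worker, reuses that deadline instead of starting a fresh timer.
/// Times are milliseconds on a clock shared by all workers.
struct SleepJournal {
    wake_at_ms: Option<u64>,
}

impl SleepJournal {
    /// How long a worker reaching this sleep at `now_ms` still has to wait.
    fn remaining(&mut self, now_ms: u64, duration: Duration) -> Duration {
        // The deadline is recorded only the first time this sleep is reached.
        let wake_at = *self
            .wake_at_ms
            .get_or_insert(now_ms + duration.as_millis() as u64);
        Duration::from_millis(wake_at.saturating_sub(now_ms))
    }
}
```

Suppose a first worker reaches the 5-second sleep at t = 1,000 ms, so it records a wake time of 6,000 ms. A replay at 4,000 ms then waits only the remaining 2 seconds, and a replay after 6,000 ms does not wait at all.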

What the SDK handles vs. what you write

What you write: the workflow function as straight-line async Rust — a while loop calling ctx.run(delete_batch, ...) until it returns false, a ctx.sleep between iterations, and a final ctx.run(enqueue_completion, ...). The two leaf functions (delete_batch, enqueue_completion) are plain async fns annotated with #[resonate::function]. The Kafka consumer loop is also yours: read a message, parse the id, call resonate.run(...).spawn(), drop the handle.

What the SDK handles: it persists a durable promise per record id at dispatch (src/bin/consumer.rs:95) and checkpoints every ctx.run result before returning it to the workflow body. It retries delete_batch automatically when it returns Error::Application (src/lib.rs:29), and it records sleep deadlines so ctx.sleep survives process death. It deduplicates dispatch by promise id, so a re-delivered Kafka message resolves to the existing promise instead of starting a second run. Finally, it routes the workflow to any worker subscribed at poll://any@workers, so a second consumer picks up where a killed one left off. This is the behavior the README's "Recovery" section describes (README.md:56).
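Automatic retry can be modeled as a loop around the leaf call that surfaces only the final outcome to the workflow. This sketch assumes a fixed attempt cap with no backoff. `run_with_retry` and `max_attempts` are hypothetical, and this post does not describe the SDK's actual retry policy or its defaults.

```rust
/// Toy retry wrapper (hypothetical, NOT the Resonate SDK's retry policy).
/// Calls `leaf` with a 1-based attempt number until it succeeds or
/// `max_attempts` is reached. The caller (the workflow) sees only the first
/// success or the last error. At least one attempt is always made.
fn run_with_retry<T, E>(
    max_attempts: u32,
    mut leaf: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match leaf(attempt) {
            Ok(value) => return Ok(value),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => attempt += 1, // a real policy would typically back off here
        }
    }
}
```

A leaf that fails twice with a transient error and then succeeds is called three times. The workflow body sees only the final `Ok`, which matches the "not re-entered until the leaf succeeds or exhausts retries" behavior described below.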

Failure modes covered

  • Worker crashes between delete_batch iterations. The result of each completed ctx.run(delete_batch, ...) (src/lib.rs:76) is checkpointed before the loop iterates. On restart — or on a different worker in the workers group — the workflow replays from history and re-enters the loop on the next un-checkpointed iteration. The README's "Recovery" section spells this out: kill a consumer mid-run, bring it back, watch progress resume (README.md:56).
  • Worker crashes between the final delete_batch and enqueue_completion. The final delete_batch result (false) is already checkpointed, so replay skips the deletion loop and resumes at ctx.run(enqueue_completion, ...) (src/lib.rs:81). On the happy path the completion message is published exactly once. There is one gap. Suppose the worker dies after the Kafka producer.send(...).await inside enqueue_completion (src/lib.rs:54) returns, but before ctx.run(enqueue_completion, ...) checkpoints its result. Replay then re-executes enqueue_completion and sends the completion message a second time. The checkpoint is written when ctx.run succeeds, not when Kafka acknowledges the send, so a window exists between the two.
  • Transient error inside delete_batch. The function returns Err(Error::Application { ... }) on a random 25% of calls (src/lib.rs:26). The SDK retries delete_batch automatically per the registered function's retry policy. The workflow body does not continue until the leaf either succeeds or exhausts its retries.
  • The same Kafka message is re-delivered. The consumer uses the record id as the durable promise id (src/bin/consumer.rs:96). A second resonate.run(...).spawn() with the same id resolves to the existing promise rather than starting a new run, so duplicate Kafka delivery does not produce duplicate deletions.
  • A slow record blocks a fast one behind it. The consumer never awaits workflow completion. .spawn() returns a handle that is immediately dropped (src/bin/consumer.rs:101), so the next iteration of the consumer loop pulls the next Kafka message while the previous workflow keeps running in the background.
  • Two consumers are running and one is killed mid-workflow. Both register against the workers group (src/bin/consumer.rs:25), and workflows are dispatched to poll://any@workers (src/bin/consumer.rs:97). The survivor therefore picks up the killed worker's in-flight workflow and resumes it from its last checkpoint. This is the second scenario in the README's "Recovery" section (README.md:58).
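The re-delivery and failover cases both rest on a create-if-absent operation keyed by promise id. The following is a hypothetical in-memory sketch of it. `PromiseStore` and `PromiseState` are illustrative only. The real promises are presumably held by the Resonate server the SDK connects to, not by a local map.

```rust
use std::collections::HashMap;

/// Toy promise states (hypothetical, NOT the Resonate data model).
#[derive(Debug, Clone, PartialEq)]
enum PromiseState {
    Pending,
    Resolved(String),
}

/// Toy promise store: dispatching an id that already exists returns the
/// existing promise instead of starting a second run.
#[derive(Default)]
struct PromiseStore {
    promises: HashMap<String, PromiseState>,
    runs_started: u32,
}

impl PromiseStore {
    /// Returns the promise's current state and whether this call created it.
    fn dispatch(&mut self, id: &str) -> (PromiseState, bool) {
        if let Some(existing) = self.promises.get(id) {
            return (existing.clone(), false);
        }
        self.promises.insert(id.to_string(), PromiseState::Pending);
        self.runs_started += 1;
        (PromiseState::Pending, true)
    }

    /// Record the workflow's final result under its id.
    fn resolve(&mut self, id: &str, value: String) {
        self.promises.insert(id.to_string(), PromiseState::Resolved(value));
    }
}
```

A re-delivered record arriving while the workflow is still pending gets the pending promise back. One arriving after completion gets the stored result. In both cases, only one run is ever started for that id.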

When to reach for this pattern

  • If a Kafka or other queue consumer dispatches multi-step tasks and you need replay across worker restarts without re-running completed steps.
  • If a slow message must not block faster messages behind it on the same consumer.
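  • If the same message id may arrive twice (at-least-once delivery, manual replays, retries from upstream) and each id must be processed only once. The one exception is the completion publish, which can repeat in the narrow crash window described above, so its downstream consumers should tolerate a duplicate.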
  • If workers can scale horizontally and a killed worker's in-flight task should resume on a surviving worker.
  • If an individual step has transient failure modes that warrant automatic retry without re-running prior steps.
  • If the per-message task can take an unknown amount of time and you want the orchestrating code to read as a plain loop, not a state machine.

Sources