A worker that pulls from one Kafka topic, runs a multi-step task, and pushes to another has two failure shapes: head-of-line blocking, where a slow message stalls faster ones behind it, and redone work, where a worker that crashes mid-task repeats steps it had already completed. This example uses the Resonate Rust SDK to turn the per-message handler into a durable workflow keyed by the record id, so the consumer dispatches without blocking and a crashed workflow resumes from its last checkpoint instead of starting over. The example reads record ids off a records_to_be_deleted Redpanda topic, runs a multi-step batch-deletion loop per id, and publishes a completion to records_that_were_deleted.
The shape of the solution
/// The durable workflow.
///
/// Loops until `delete_batch` returns false, then publishes a completion
/// message. Each `ctx.run(...)` call is a checkpoint — if the worker
/// crashes mid-flight, Resonate replays from the last checkpoint and the
/// workflow resumes on whichever worker picks it up next.
#[resonate::function]
pub async fn workflow(ctx: &Context, record_id: String, offset: String) -> Result<String> {
    println!("processing record {record_id} in position {offset}");
    while ctx.run(delete_batch, record_id.clone()).await? {
        println!("record {record_id} still has rows to delete");
        ctx.sleep(Duration::from_secs(5)).await?;
    }
    println!("all rows deleted for record {record_id} in position {offset}");
    ctx.run(enqueue_completion, (record_id.clone(), offset.clone()))
        .await?;
    Ok(format!("workflow {record_id} completed"))
}
// from example-kafka-worker-rs/src/lib.rs:67
The consumer side does no orchestration of its own. It pulls each Kafka message, parses the record id, and hands it to resonate.run(...).spawn(), the Rust SDK's non-blocking dispatch. The consumer gets a handle back, drops it, and moves on to the next message:
// Non-blocking dispatch — the Rust SDK equivalent
// of TS `beginRun`. We get a handle back and drop
// it; the workflow runs in the background, on this
// worker or any other in the `workers` group.
match resonate
    .run(&record_id, workflow, (record_id.clone(), offset.clone()))
    .target("poll://any@workers")
    .spawn()
    .await
{
    Ok(_handle) => {
        println!(
            "kicked off workflow for record {record_id} (partition={partition}, offset={offset})"
        );
    }
    Err(e) => eprintln!("failed to begin workflow for {record_id}: {e}"),
}
// from example-kafka-worker-rs/src/bin/consumer.rs:91
The same process also registers the workflow and both leaf functions, so it can dispatch workflows as well as execute them. It joins the workers group, which is the group the dispatch target poll://any@workers addresses:
let resonate = Resonate::new(ResonateConfig {
    url: Some("http://localhost:8001".into()),
    group: Some("workers".into()),
    ..Default::default()
});
// ...
resonate.register(workflow).unwrap();
resonate.register(delete_batch).unwrap();
resonate.register(enqueue_completion).unwrap();
// from example-kafka-worker-rs/src/bin/consumer.rs:23
The durable primitives in play
- #[resonate::function]: attribute proc-macro that turns a function into a durable function, which the SDK can register (resonate.register(...)), call under a durable promise, and whose ctx.run calls it can replay from history. Applied at src/lib.rs:22, src/lib.rs:42, and src/lib.rs:73.
- resonate.run(id, fn, args).target("poll://any@workers").spawn(): non-blocking dispatch from the consumer loop. id is the durable promise id; if the same id is dispatched again, the SDK returns the existing promise instead of starting a new run. src/bin/consumer.rs:95.
- ctx.run(delete_batch, record_id.clone()): durable call to a registered function. The result is checkpointed; on replay the function is not re-executed, and the checkpointed result is returned instead. src/lib.rs:76.
- ctx.run(enqueue_completion, (record_id.clone(), offset.clone())): second durable call; ordering is enforced by the sequential await in the workflow body. src/lib.rs:81.
- ctx.sleep(Duration::from_secs(5)): durable sleep. It survives crashes; the SDK records the wake time, so a replay on a different worker still wakes at the original deadline. src/lib.rs:78.
- Error::Application { message } returned from delete_batch: an application error, which the SDK retries automatically per the function's retry policy. src/lib.rs:29.
- Durable promise id = the Kafka record id: set via the first argument to resonate.run(...). The README states: "the record ID is used as the Durable Promise ID" (README.md:104) and "By doing this, we ensure that no duplicate operations will ever happen for a given ID" (README.md:106).
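The checkpoint-and-replay contract behind ctx.run can be modeled in plain Rust. This is a simplified, hypothetical sketch, not the Resonate SDK's Context or its storage: the Journal type, the 35-row record, and the 10-row batch size are assumptions for illustration, and in this model recorded results are matched to calls purely by call position. The worker "crashes" after two real batches, and the replay reads those two results back instead of deleting again.

```rust
// Hypothetical, std-only model of checkpoint-and-replay. This is NOT the
// Resonate SDK's Context; it only illustrates the idea behind ctx.run.

/// Recorded step results in call order (the "history"), plus the position
/// of the next run call in the current execution.
struct Journal {
    results: Vec<bool>,
    cursor: usize,
}

impl Journal {
    fn new() -> Self {
        Journal { results: Vec::new(), cursor: 0 }
    }

    /// If this call position already has a recorded result, return it
    /// without executing `step` (replay). Otherwise execute `step` and
    /// record its result before returning it (checkpoint).
    fn run<F: FnOnce() -> bool>(&mut self, step: F) -> bool {
        let out = if self.cursor < self.results.len() {
            self.results[self.cursor] // replay: step is not executed
        } else {
            let r = step();
            self.results.push(r); // checkpoint before returning
            r
        };
        self.cursor += 1;
        out
    }
}

/// Hypothetical leaf: deletes up to 10 rows; returns true while rows remain.
fn delete_batch(rows: &mut u32) -> bool {
    *rows = rows.saturating_sub(10);
    *rows > 0
}

/// The deletion loop from the example, without the sleep or completion step.
/// `crash_after` aborts once that many batches have really executed,
/// standing in for the worker process dying mid-flight.
fn workflow(
    journal: &mut Journal,
    rows: &mut u32,
    executions: &mut u32,
    crash_after: Option<u32>,
) -> Result<(), &'static str> {
    journal.cursor = 0; // every (re)execution starts from the top of the body
    loop {
        if crash_after == Some(*executions) {
            return Err("worker crashed");
        }
        let more = journal.run(|| {
            *executions += 1; // counts real executions, not replays
            delete_batch(rows)
        });
        if !more {
            return Ok(());
        }
    }
}

fn main() {
    let mut journal = Journal::new();
    let mut rows = 35; // hypothetical row count for one record id
    let mut executions = 0;

    // First attempt: the worker "dies" after two batches really ran.
    let first = workflow(&mut journal, &mut rows, &mut executions, Some(2));
    println!("first attempt: {first:?}, rows left: {rows}");

    // Replay: the two recorded batches come from the journal; only the
    // remaining batches execute.
    let replay = workflow(&mut journal, &mut rows, &mut executions, None);
    println!("replay: {replay:?}, rows left: {rows}, batches executed: {executions}");
}
```

In this model the deleted rows and the journal both outlive the crash, standing in for external state and the promise history outliving a worker process. The real SDK also checkpoints ctx.sleep, which the sketch leaves out.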
What the SDK handles vs. what you write
What you write: the workflow function as straight-line async Rust — a while loop calling ctx.run(delete_batch, ...) until it returns false, a ctx.sleep between iterations, and a final ctx.run(enqueue_completion, ...). The two leaf functions (delete_batch, enqueue_completion) are plain async fns annotated with #[resonate::function]. The Kafka consumer loop is also yours: read a message, parse the id, call resonate.run(...).spawn(), drop the handle.
What the SDK handles: persisting a durable promise per record id at dispatch (src/bin/consumer.rs:95); checkpointing every ctx.run result before returning it to the workflow body; retrying delete_batch automatically when it returns Error::Application (src/lib.rs:29); recording sleep deadlines so ctx.sleep survives process death; deduplicating dispatch by promise id, so a re-delivered Kafka message resolves to the existing promise instead of starting a second run; and routing the workflow to any worker subscribed at poll://any@workers, so a second consumer picks up where a killed one left off. That last behavior is what the README's "Recovery" section describes (README.md:56).
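Deduplication by promise id amounts to a create-if-absent on the id: the number of runs depends on the number of distinct ids, not on the number of deliveries. The sketch below models only that rule with an in-memory map. PromiseStore, PromiseState, dispatch, and resolve are hypothetical names; the SDK persists promises through the Resonate server configured at http://localhost:8001, not in a local HashMap.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical promise states (not the Resonate server's model).
#[derive(Debug, Clone, PartialEq)]
enum PromiseState {
    Pending,
    Resolved(String),
}

/// Hypothetical promise store keyed by promise id, here the Kafka record id.
#[derive(Default)]
struct PromiseStore {
    promises: HashMap<String, PromiseState>,
    runs_started: u32,
}

impl PromiseStore {
    /// Create-if-absent: only the first dispatch of an id starts a run;
    /// every later dispatch of that id gets the existing promise back.
    fn dispatch(&mut self, id: &str) -> PromiseState {
        if let Some(existing) = self.promises.get(id) {
            return existing.clone();
        }
        self.runs_started += 1; // stands in for starting the workflow
        self.promises.insert(id.to_string(), PromiseState::Pending);
        PromiseState::Pending
    }

    /// Records the workflow's result once it finishes.
    fn resolve(&mut self, id: &str, value: &str) {
        self.promises
            .insert(id.to_string(), PromiseState::Resolved(value.to_string()));
    }
}

fn main() {
    let mut store = PromiseStore::default();

    // "rec-42" is delivered twice (at-least-once delivery); "rec-7" once.
    for id in ["rec-42", "rec-7", "rec-42"] {
        println!("dispatch {id}: {:?}", store.dispatch(id));
    }

    // A duplicate that arrives after completion sees the stored result.
    store.resolve("rec-42", "workflow rec-42 completed");
    println!("dispatch rec-42: {:?}", store.dispatch("rec-42"));
    println!("runs started: {}", store.runs_started);
}
```

Keying on the record id rather than on the Kafka offset is what makes this work: a re-delivered message may carry a different offset, but it carries the same record id.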
Failure modes covered
- Worker crashes between delete_batch iterations. The result of each completed ctx.run(delete_batch, ...) (src/lib.rs:76) is checkpointed before the loop iterates. On restart, or on a different worker in the workers group, the workflow replays from history and re-enters the loop at the first iteration that has no checkpoint. The README's "Recovery" section spells this out: kill a consumer mid-run, bring it back, and watch progress resume (README.md:56).
- Worker crashes between the final delete_batch and enqueue_completion. The final delete_batch result (false) is checkpointed, so replay reads it back, exits the loop without deleting again, and starts at ctx.run(enqueue_completion, ...) (src/lib.rs:81). The completion message is published exactly once on the happy path. If the worker dies after the Kafka producer.send(...).await (src/lib.rs:54) returns but before ctx.run(enqueue_completion, ...) checkpoints its success, replay re-executes enqueue_completion and the completion message is sent a second time: the checkpoint lands on ctx.run success, not on the Kafka ack.
- Transient error inside delete_batch. The function returns Err(Error::Application { ... }) on a 25% roll (src/lib.rs:26). The SDK retries delete_batch automatically per the registered function's retry policy; the workflow's ctx.run(delete_batch, ...).await does not resolve until the leaf either succeeds or exhausts its retries, at which point the error propagates through the ?.
- The same Kafka message is re-delivered. The consumer uses the record id as the durable promise id (src/bin/consumer.rs:96). A second resonate.run(...).spawn() with the same id resolves to the existing promise rather than starting a new run, so duplicate Kafka delivery does not produce duplicate deletions.
- A slow record blocks a fast one behind it. The consumer awaits only the dispatch (.spawn().await), never workflow completion. The handle returned by .spawn() is dropped immediately (src/bin/consumer.rs:101), so the next iteration of the consumer loop pulls the next Kafka message while the previous workflow keeps running in the background.
- Two consumers are running and one is killed mid-workflow. Both register against the workers group (src/bin/consumer.rs:25). Workflows are dispatched to poll://any@workers (src/bin/consumer.rs:97), so the survivor picks up the killed worker's in-flight workflow and resumes it from its last checkpoint. This is the second scenario in the README's "Recovery" section (README.md:58).
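The completion-message case turns on where the checkpoint lands relative to the side effect. The std-only sketch below models just that ordering; CompletionStep and the "rec-42 deleted" message format are hypothetical, and a real run goes through ctx.run and the Kafka producer. It shows the happy path publishing once, and a crash between the acknowledged send and the checkpoint publishing twice.

```rust
/// Hypothetical model of the window around `enqueue_completion`. Both fields
/// stand for durable state that survives a worker crash.
#[derive(Default)]
struct CompletionStep {
    checkpointed: bool,     // has the step's success been recorded?
    published: Vec<String>, // messages already on the completion topic
}

impl CompletionStep {
    /// Runs the step unless it is already checkpointed. The publish (the
    /// side effect) happens inside the step; the checkpoint is written only
    /// after the step returns. `crash_in_window` simulates the worker dying
    /// after the send was acknowledged but before the checkpoint landed.
    fn run(&mut self, record_id: &str, crash_in_window: bool) -> Result<(), &'static str> {
        if self.checkpointed {
            return Ok(()); // replay: result read back, step not re-executed
        }
        self.published.push(format!("{record_id} deleted")); // send acked
        if crash_in_window {
            return Err("worker crashed before checkpoint");
        }
        self.checkpointed = true; // checkpoint written after the send
        Ok(())
    }
}

fn main() {
    // Happy path: one attempt, one message.
    let mut happy = CompletionStep::default();
    happy.run("rec-42", false).unwrap();
    println!("happy path: {:?}", happy.published);

    // Crash in the window: the replay re-executes the step and publishes again.
    let mut crashed = CompletionStep::default();
    let _ = crashed.run("rec-42", true);
    crashed.run("rec-42", false).unwrap();
    // Any later replay finds the checkpoint and publishes nothing new.
    crashed.run("rec-42", false).unwrap();
    println!("crash in window: {:?}", crashed.published);
}
```

Because the duplicate carries the same record id, consumers of records_that_were_deleted are presumably best written to tolerate seeing a completion for the same id more than once.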
When to reach for this pattern
- If a Kafka or other queue consumer dispatches multi-step tasks and you need replay across worker restarts without re-running completed steps.
- If a slow message must not block faster messages behind it on the same consumer.
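- If the same message id may arrive twice (at-least-once delivery, manual replays, retries from upstream) and you need one workflow run per id, so the downstream deletions are not duplicated. The completion publish itself can still repeat if a worker dies between the Kafka send and its checkpoint.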
- If workers can scale horizontally and a killed worker's in-flight task should resume on a surviving worker.
- If an individual step has transient failure modes that warrant automatic retry without re-running prior steps.
- If the per-message task can take an unknown amount of time and you want the orchestrating code to read as a plain loop, not a state machine.
Sources
- Example repo: https://github.com/resonatehq-examples/example-kafka-worker-rs
- Resonate Rust SDK: https://github.com/resonatehq/resonate-sdk-rs
- Workflow + leaf functions: src/lib.rs (https://github.com/resonatehq-examples/example-kafka-worker-rs/blob/main/src/lib.rs)
- Kafka consumer + non-blocking dispatch: src/bin/consumer.rs (https://github.com/resonatehq-examples/example-kafka-worker-rs/blob/main/src/bin/consumer.rs)
- Kafka producer: src/bin/producer.rs (https://github.com/resonatehq-examples/example-kafka-worker-rs/blob/main/src/bin/producer.rs)
- Redpanda compose file: docker-compose.yml (https://github.com/resonatehq-examples/example-kafka-worker-rs/blob/main/docker-compose.yml)
- Resonate docs, Kafka worker pattern: https://docs.resonatehq.io/get-started/examples/kafka-worker
- Resonate docs, Rust SDK guide: https://docs.resonatehq.io/develop/rust
- Sibling examples: https://github.com/resonatehq-examples/example-kafka-worker-ts, https://github.com/resonatehq-examples/example-kafka-worker-py
