A worker sitting between two Kafka topics has two failure shapes: head-of-line blocking, when a slow message stalls faster ones behind it, and re-execution of already-completed steps when the worker crashes mid-task. Resonate's approach is to turn the per-message handler into a durable workflow keyed by the message id, so the consumer hands each message off without blocking and a crashed workflow resumes from its last checkpoint. The example app reads record ids off a records_to_be_deleted Redpanda topic, runs a multi-step batch-deletion loop per id, and publishes a completion message to records_that_were_deleted.
The shape of the solution
```python
@resonate.register
def workflow(ctx, record_id, offset):
    print(f"processing record {record_id} in position {offset}")
    while (yield ctx.run(delete_batch, record_id)):
        print(f"record {record_id} still has rows to delete")
        yield ctx.sleep(5)
    print(f"all rows deleted for record {record_id} in position {offset}")
    yield ctx.run(enqueue, record_id, offset)
# from example-kafka-worker-py/record_deletor/delete.py:22-29
```

delete_batch returns True while there are still rows to delete and False when the simulated work is finished, so the while loop terminates on False and the workflow falls through to enqueue. Each yield ctx.run(...) and yield ctx.sleep(...) is a durable checkpoint: the SDK records the invocation and the result before the generator is allowed to advance.
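To make the checkpoint-and-replay behavior concrete, here is a minimal toy model in plain Python. It is not the Resonate SDK and makes no claim about how the SDK is implemented: the drive function, the in-memory journal list, and the zero-argument step functions are hypothetical stand-ins, and durable sleep is left out. It only shows the idea that a replayed generator is fed stored results for steps that already completed, so those steps do not execute a second time.

```python
# Toy model of checkpoint-and-replay. NOT the Resonate SDK: `drive`, `journal`,
# and the step functions are hypothetical names used only for illustration.

def drive(workflow_fn, journal, crash_after=None):
    """Run a generator workflow, recording each step result in `journal`.

    Steps whose results are already in the journal are not executed again;
    the stored result is sent straight back into the generator.
    `crash_after` simulates a process crash after that many new executions.
    """
    gen = workflow_fn()
    executed = 0      # steps actually executed during this run
    position = 0      # index of the next yielded step
    to_send = None
    try:
        while True:
            step = gen.send(to_send)            # a zero-argument callable
            if position < len(journal):
                to_send = journal[position]     # replay: reuse stored result
            else:
                if crash_after is not None and executed == crash_after:
                    raise RuntimeError("simulated crash")
                to_send = step()                # first execution of this step
                journal.append(to_send)         # checkpoint before advancing
                executed += 1
            position += 1
    except StopIteration:
        return executed


calls = {"delete_batch": 0, "enqueue": 0}
remaining = [3]   # batches left for the simulated record

def delete_batch():
    calls["delete_batch"] += 1
    remaining[0] -= 1
    return remaining[0] > 0     # True while rows remain

def enqueue():
    calls["enqueue"] += 1
    return "published"

def workflow():
    while (yield delete_batch):
        pass
    yield enqueue

journal = []                    # stands in for durable storage
try:
    drive(workflow, journal, crash_after=2)   # crash after two batches
except RuntimeError:
    pass
drive(workflow, journal)        # "restart": replay, then resume

print(journal)   # [True, True, False, 'published']
print(calls)     # {'delete_batch': 3, 'enqueue': 1}
```

Across the crash and the restart, delete_batch runs three times in total rather than five, because the two journaled results are replayed instead of recomputed.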
The consumer side does no orchestration of its own — it parses each Kafka message and hands the record id to workflow.begin_run, which is non-blocking:
```python
record_id = json.loads(msg.value().decode("utf-8"))
if isinstance(record_id, str):  # in case json.dumps(msg_id) was just a string
    record_id = [record_id]
_ = workflow.begin_run(record_id[0], record_id[0], msg.offset())
# from example-kafka-worker-py/record_deletor/delete.py:43-46
```

The first argument to begin_run is the durable promise id; the example reuses the record id for that, so duplicate Kafka deliveries collapse onto the same durable execution.
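The deduplication that comes from keying on the record id can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the SDK: ToyStore and its begin_run method are hypothetical, and a plain dict stands in for the durable promise store on the Resonate Server.

```python
# Toy model of id-keyed deduplication. NOT the Resonate SDK: `ToyStore` and
# its `begin_run` are hypothetical stand-ins for a durable promise store.

class ToyStore:
    def __init__(self):
        self.promises = {}   # promise id -> handle (here: a plain dict)
        self.started = 0     # how many executions were actually started

    def begin_run(self, promise_id, *args):
        """Return the existing handle for `promise_id`, or create one."""
        handle = self.promises.get(promise_id)
        if handle is None:
            handle = {"id": promise_id, "args": args}
            self.promises[promise_id] = handle
            self.started += 1
        return handle


store = ToyStore()
# Simulated at-least-once delivery: record "r-1" arrives twice.
deliveries = [("r-1", 10), ("r-2", 11), ("r-1", 10)]
handles = [store.begin_run(rid, rid, offset) for rid, offset in deliveries]

print(store.started)              # 2
print(handles[0] is handles[2])   # True
```

Three deliveries start only two executions; the redelivered "r-1" gets back the handle that already exists.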
The durable primitives in play
- Resonate.remote(): constructs a Resonate client backed by a RemoteStore and a long-poll message source pointing at a Resonate Server on localhost. The worker registers under group "default" and is eligible to recover any workflow created against the same server. (record_deletor/delete.py:6; SDK resonate/resonate.py:194-256 at tag v0.6.7.)
- @resonate.register: registers the generator under its __name__ so it can be invoked by string and recovered by any worker in the same group. (record_deletor/delete.py:22; SDK resonate/resonate.py:317-396 at v0.6.7.)
- workflow.begin_run(id, *args): creates a durable promise keyed by id and returns a Handle immediately so the consumer loop can advance to the next Kafka message. If a promise with that id already exists, the call reattaches instead of starting a second execution. (record_deletor/delete.py:46; SDK resonate/resonate.py:1393-1426 at v0.6.7.)
- ctx.run(delete_batch, record_id): runs the step as a child durable promise; on application error the SDK retries it with the default exponential-backoff policy without re-running earlier steps. (record_deletor/delete.py:25; SDK resonate/options.py:23, resonate/retry_policies/exponential.py:9-14 at v0.6.7.)
- ctx.sleep(5): durable sleep between batch attempts; survives worker restarts. The workflow does not wake early when a process comes back up. (record_deletor/delete.py:27; SDK resonate/resonate.py:1137-1160 at v0.6.7.)
- ctx.run(enqueue, record_id, offset): second checkpoint after the loop exits, so the completion publish is not re-issued on replay if it already succeeded against the durable promise. (record_deletor/delete.py:29.)
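One way to picture a sleep that survives restarts without waking early is to persist an absolute wake-up time instead of a relative duration. The sketch below assumes exactly that and nothing more; durable_sleep_remaining, the timers dict, and the key format are hypothetical, the clock is passed in so the example is deterministic, and none of this is taken from the SDK's implementation.

```python
# Toy model of a durable sleep. NOT the Resonate SDK: `durable_sleep_remaining`
# and the `timers` dict are hypothetical; the clock is passed in explicitly.

def durable_sleep_remaining(timers, key, seconds, now):
    """Return how long the caller still has to wait for sleep `key`.

    The absolute deadline is written once, on the first call. Later calls
    (for example after a restart) reuse the stored deadline, so the timer
    is neither reset nor cut short.
    """
    deadline = timers.setdefault(key, now + seconds)
    return max(0.0, deadline - now)


timers = {}   # stands in for durable storage
first = durable_sleep_remaining(timers, "r-1/sleep-0", 5, now=100.0)
# The process "crashes" at t=102 and restarts; the same call is replayed.
after_restart = durable_sleep_remaining(timers, "r-1/sleep-0", 5, now=102.0)
# The process restarts again long after the deadline has passed.
late = durable_sleep_remaining(timers, "r-1/sleep-0", 5, now=200.0)

print(first, after_restart, late)   # 5.0 3.0 0.0
```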
What the SDK handles vs. what you write
You write: a generator function that yields ctx.run(...) or ctx.sleep(...) at every step you want checkpointed, a confluent-kafka consumer that calls workflow.begin_run(record_id, ...) per message, and the imperative work each step actually performs (delete_batch, enqueue). You pick the durable promise id — here, the record id from the message payload — and you decide what counts as a step.
The SDK handles: persisting a durable promise per workflow keyed by id, persisting a child durable promise per ctx.run step, retrying failed steps with exponential backoff (default for plain functions; generators default to no retry and rely on replay), skipping already-completed steps on replay, scheduling durable sleep across restarts, deduplicating begin_run calls with the same id, and routing recovered work to any worker in the same group connected to the same Resonate Server.
Default retry policy is set in the SDK: Exponential() for plain functions, Never() for generator functions (resonate/options.py:23 at v0.6.7). That is why delete_batch (plain) automatically retries on its simulated 25% exception, but the workflow generator itself does not retry — it is replayed deterministically from its checkpoint trail.
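As a rough illustration of what a capped exponential policy does to a failing step, the sketch below computes a delay schedule and retries a flaky function. The parameter values delay=1, factor=2, max_delay=30 are the defaults quoted in this article; the formula min(delay * factor ** (attempt - 1), max_delay), the absence of jitter, and the retry helper itself are assumptions for illustration, not the SDK's code.

```python
# Sketch of capped exponential backoff. The parameter names mirror the
# defaults quoted in this article (delay=1, factor=2, max_delay=30), but the
# formula and the `retry` helper are assumptions, not the SDK's implementation.

def backoff_delay(attempt, delay=1, factor=2, max_delay=30):
    """Delay in seconds to wait after failed attempt number `attempt` (1-based)."""
    return min(delay * factor ** (attempt - 1), max_delay)


def retry(fn, max_attempts, sleep):
    """Call `fn` until it succeeds or `max_attempts` calls have failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(backoff_delay(attempt))


print([backoff_delay(n) for n in range(1, 8)])   # [1, 2, 4, 8, 16, 30, 30]

# A step that fails twice, then succeeds; sleeps are recorded, not performed.
outcomes = iter([ValueError("boom"), ValueError("boom"), True])

def flaky_step():
    result = next(outcomes)
    if isinstance(result, Exception):
        raise result
    return result

slept = []
result = retry(flaky_step, max_attempts=5, sleep=slept.append)
print(result)   # True
print(slept)    # [1, 2]
```

Only the failing step is retried here, which mirrors the article's point that earlier steps are not re-run when one step raises.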
Failure modes covered
- Worker crashes between batches. delete_batch has completed N times against the same record id, then the process dies. On restart, Resonate replays the workflow; the completed ctx.run invocations short-circuit to their stored results and the loop resumes at the next batch attempt. This is the explicit recovery scenario documented in README §Recovery (lines 55-61) and is the mechanism behind ctx.run (record_deletor/delete.py:25).
- A batch step throws a transient error. delete_batch has a simulated 25% chance of raising (record_deletor/delete.py:10-12). Because it is a plain function, the SDK's default Exponential policy retries that ctx.run invocation with capped backoff (delay=1, factor=2, max_delay=30). The prior loop iterations and any earlier steps are not re-run; only the failed step retries.
- The same record id is delivered twice. Kafka's at-least-once semantics permit duplicates, and the consumer has enable.auto.commit=True (record_deletor/config.py:16) with no coordination between workflow start and offset commit. A worker crash between begin_run and the next auto-commit leaves the offset uncommitted, so on restart the broker redelivers the same record id to whichever consumer in the group picks up the partition. Because begin_run is keyed on record_id (record_deletor/delete.py:46), the redelivered invocation attaches to the existing durable promise rather than starting a parallel workflow. README:122-127 calls this out: the record id as promise id guarantees no duplicate operations.
- Head-of-line blocking on long-running messages. A single record may take many batch iterations (the loop continues while delete_batch returns True). Because the consumer calls begin_run and does not block on completion (record_deletor/delete.py:46), the next Kafka message is polled and dispatched immediately. Workflow execution proceeds concurrently on the SDK's worker pool, not in the consumer thread.
- Worker pool failover. Multiple record-deletor processes can run against the same Resonate Server. If the worker currently executing a workflow dies, another worker in the same group picks up the recovered invocation. README:60-61 demonstrates this by killing the leader and observing the other instance resume.
- Completion-publish step crash. If the worker dies after the loop exits but before enqueue finishes, replay re-enters at the second ctx.run (record_deletor/delete.py:29). The publish only re-runs if its result was not yet checkpointed. Resonate guarantees the checkpoint, but the underlying Kafka producer is not transactional, so a crash strictly between producer.flush() and the checkpoint write is the one window where a duplicate downstream publish is possible.
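The head-of-line case in the list above can be reproduced in miniature with the standard library. In this sketch a thread pool stands in for the SDK's workers and handle stands in for a per-message workflow; both are hypothetical, and an Event is used in place of real work so the ordering is deterministic. The point is only that a dispatch loop which submits and moves on lets fast messages finish while a slow one is still running.

```python
# Toy model of non-blocking dispatch. NOT the Resonate SDK: a thread pool
# stands in for the SDK's workers, and `handle` for a per-message workflow.
import threading
from concurrent.futures import ThreadPoolExecutor

release_slow = threading.Event()   # lets the example control the slow message
completed = []                     # completion order, appended by workers
lock = threading.Lock()

def handle(message):
    if message == "slow":
        release_slow.wait(timeout=5)   # simulate a long-running record
    with lock:
        completed.append(message)

messages = ["slow", "fast-1", "fast-2"]   # "slow" is at the head of the line

with ThreadPoolExecutor(max_workers=3) as pool:
    # The dispatch loop only submits; it never waits on a single message.
    futures = {m: pool.submit(handle, m) for m in messages}
    futures["fast-1"].result(timeout=5)
    futures["fast-2"].result(timeout=5)
    fast_done_first = not futures["slow"].done()
    release_slow.set()                    # now let the slow record finish

print(fast_done_first)   # True
print(completed[-1])     # slow
```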
When to reach for this pattern
- If you have a queue or topic where a single message kicks off a multi-step task and you cannot afford to redo completed steps after a worker restart.
- If individual tasks vary widely in duration and you do not want a slow message to stall faster ones behind it on the same partition.
- If at-least-once delivery means the same task id may arrive twice and you want the second invocation to dedupe rather than run again.
- If the per-message task contains a polling or retry loop with sleeps that must survive process restarts.
- If you want to scale the worker pool horizontally and have recovered tasks resume on whichever process is healthy, without writing your own coordination.
- If the final step publishes a downstream message and you need at-most-one-effect across retries (with the caveat that the broker producer itself must cooperate for strict exactly-once).
Sources
- Example repo: https://github.com/resonatehq-examples/example-kafka-worker-py
- Resonate Python SDK: https://github.com/resonatehq/resonate-sdk-py
- Pinned SDK version: resonate-sdk>=0.6.7 (pyproject.toml:12)
- Workflow generator: record_deletor/delete.py:22-29
- Consumer loop and begin_run dispatch: record_deletor/delete.py:31-52
- Topic creation: setup/setup_topics.py:8-11
- Redpanda broker config: redpanda/docker-compose.yml:3-15
- Resonate.remote classmethod at v0.6.7: resonate/resonate.py:194-256 (SDK repo, tag v0.6.7)
- Function.begin_run semantics at v0.6.7: resonate/resonate.py:1393-1426 (SDK repo, tag v0.6.7)
- ctx.run (alias of ctx.lfc) at v0.6.7: resonate/resonate.py:863-886 (SDK repo, tag v0.6.7)
- ctx.sleep at v0.6.7: resonate/resonate.py:1137-1160 (SDK repo, tag v0.6.7)
- Default retry policy at v0.6.7: resonate/options.py:23 and resonate/retry_policies/exponential.py:9-14 (SDK repo, tag v0.6.7)
- Companion TypeScript port: https://github.com/resonatehq-examples/example-kafka-worker-ts
- Resonate docs: https://docs.resonatehq.io
