A backend service has to trigger a Databricks job that may run for minutes to hours, then continue with downstream work once the job finishes. The Resonate shape of the solution is a single generator orchestrator that creates a durable promise with ctx.promise(), kicks off the Databricks job with the promise id as a notebook parameter, and yields on the promise until the notebook (running in the Databricks cluster, outside the worker process) calls back into the FastAPI app to resolve the promise by id. The example wires the FastAPI handlers, the orchestrator, and the Databricks JobsApi.run_now call together in roughly forty lines.
The shape of the solution
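```python
import json
import os
from typing import Any, Generator

from databricks_cli.jobs.api import JobsApi
from databricks_cli.sdk.api_client import ApiClient
from fastapi import FastAPI
from resonate import Context, Resonate
from resonate.coroutine import Yieldable  # import path assumed; check it against your SDK version

# from example-databricks-in-the-loop-py/app.py:13-54
app = FastAPI()
resonate = Resonate().remote()  # Resonate server as promise store and message source
resonate.start()  # begin claiming work for the "default" group


# handlers
@app.get("/run")
async def get(id: str, url: str):
    # Start (or reattach to) the orchestrator keyed on the caller-supplied id.
    h = data_pipeline.begin_run(id, url)
    if h.done():
        return "I am done"
    return "working on it"


@app.post("/resolve")
async def resolve(id: str, value: str):
    # Called by the Databricks notebook; wakes the workflow suspended on `yield p`.
    resonate.promises.resolve(id, ikey=id, data=json.dumps(value))


@resonate.register
def data_pipeline(ctx: Context, url: str) -> Generator[Yieldable, Any, None]:
    p = yield ctx.promise()  # durable promise; p.id is handed to Databricks
    yield ctx.run(run_job, promise_id=p.id, job=int(os.environ["JOB_ID"]), url=url)
    v = yield p  # suspend until POST /resolve settles the promise
    # yield ctx.run(send_email)
    print(f"databricks execution has finished with value {v}")
    return


def run_job(ctx: Context, promise_id: str, job: int, url: str) -> None:
    client = ApiClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )
    jobs = JobsApi(client)
    # Trigger the notebook job, passing the promise id as a notebook parameter.
    jobs.run_now(
        job_id=job,
        notebook_params={"promise_id": promise_id, "url": url},
        jar_params=None,
        python_params=None,
        spark_submit_params=None,
    )
```

`data_pipeline` is a generator. Its three `yield` points are the durable shape of the workflow: create a promise, run the trigger step, and wait on the promise. The Databricks notebook itself is not yielded. It runs in the Databricks cluster and is treated as an out-of-process actor that will eventually call `POST /resolve?id=<promise_id>&value=<value>` to wake the workflow.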
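The notebook side of that callback is not shown in the excerpt. Below is a minimal sketch of what its last cell could do, using only the standard library. The `/resolve` handler declares `id` and `value` as plain `str` parameters, so FastAPI reads both from the query string and the POST body can stay empty. `build_resolve_url`, `notify_resolve` and `APP_BASE_URL` are hypothetical names. The notebook would read the promise id with `dbutils.widgets.get("promise_id")`, since `run_job` passes it in `notebook_params`.

```python
import urllib.parse
import urllib.request


def build_resolve_url(base_url: str, promise_id: str, value: str) -> str:
    """Build the POST /resolve URL; FastAPI reads id and value from the query string."""
    query = urllib.parse.urlencode({"id": promise_id, "value": value})
    return f"{base_url.rstrip('/')}/resolve?{query}"


def notify_resolve(base_url: str, promise_id: str, value: str, timeout: float = 10.0) -> int:
    """POST to the app's /resolve endpoint and return the HTTP status code."""
    req = urllib.request.Request(
        build_resolve_url(base_url, promise_id, value),
        data=b"",  # empty body: both parameters travel in the query string
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


# In the notebook's final cell (APP_BASE_URL is a hypothetical setting):
# promise_id = dbutils.widgets.get("promise_id")
# notify_resolve(APP_BASE_URL, promise_id, "done")
```

The app resolves with `ikey=id`, so retrying `notify_resolve` after a network error is safe to do from the notebook.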
The durable primitives in play
- `Resonate().remote()`: constructs a client that uses the Resonate server as both promise store and message source. It defaults to localhost and to `group="default"` (configured at construction; see `resonate/resonate.py:199`). On the 0.6.x line, this construction surface is the most likely thing to drift in a newer SDK release; the rest of the primitives below have been stable. `app.py:14`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:195-254`.
- `resonate.start()`: starts the bridge threads so this process begins claiming work for the group configured at construction (default: `default`). `app.py:15`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:266-274`.
- `@resonate.register`: registers `data_pipeline` as a Resonate function under its own name and version 1, making it invocable by id. `app.py:32`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:317-399`.
- `data_pipeline.begin_run(id, url)`: starts the orchestrator asynchronously, keyed on the caller-supplied `id`. It returns a `Handle` with `done()` and `result()`. Duplicate calls with the same `id` reattach rather than start a parallel run. `app.py:21`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:1393-1426` and `resonate/models/handle.py:11-32`.
- `ctx.promise()`: creates a durable promise inside the workflow. Yielding the returned `RFI` gives back the promise, whose `id` can be passed to outside systems. The promise lives in the Resonate server until resolved. The SDK docstring describes this primitive specifically as the HITL mechanism: "execution needs to pause until a human provides input or takes an action. The created promise can later be fulfilled using `Resonate.promise.resolve()`." `app.py:34`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:1161-1224`.
- `ctx.run(run_job, ...)`: schedules `run_job` as a local durable step. It is an alias for `ctx.lfc()`. The step is checkpointed at invocation and at result. Once the result is recorded, a worker crash will not re-trigger the Databricks job on replay. A crash after `JobsApi.run_now` returns but before the result is recorded still can (see Failure modes covered). `app.py:35`. SDK: resonate-sdk-py @ v0.6.7 `resonate/resonate.py:863-886`.
- `yield p`: suspends the generator on the durable promise. The worker is free to release the task; the server holds the suspension until the promise is resolved. `app.py:36`.
- `resonate.promises.resolve(id, ikey=id, data=...)`: low-level promise resolve. It issues `PATCH /promises/{id}` with `{"state": "RESOLVED", "value": {...}}` against the Resonate server. `app.py:29`. SDK: resonate-sdk-py @ v0.6.7 `resonate/stores/remote.py:213-239`.
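The last item above says `promises.resolve` issues a `PATCH` against the server. To make that wire shape concrete, the sketch below builds (but does not send) such a request with only the standard library. It is a hedged reconstruction, not the SDK's code. The `idempotency-key` header name, the `headers`/`data` fields inside `value` and the base64 encoding of `data` are assumptions about the v0.6.7 remote store. `build_resolve_request` and the server URL are hypothetical.

```python
import base64
import json
import urllib.request


def build_resolve_request(
    server_url: str, promise_id: str, ikey: str, data: str
) -> urllib.request.Request:
    """Build, but do not send, a PATCH that resolves a durable promise.

    Assumed wire format: an "idempotency-key" header and a base64-encoded
    "data" field inside "value". Verify against resonate/stores/remote.py.
    """
    body = {
        "state": "RESOLVED",
        "value": {
            "headers": {},
            "data": base64.b64encode(data.encode("utf-8")).decode("ascii"),
        },
    }
    return urllib.request.Request(
        f"{server_url.rstrip('/')}/promises/{promise_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "idempotency-key": ikey},
        method="PATCH",
    )


# Mirrors the app's call resonate.promises.resolve(id, ikey=id, data=json.dumps(value)).
# "http://localhost:8001" is a placeholder server URL.
req = build_resolve_request("http://localhost:8001", "foo", "foo", json.dumps("done"))
```

Sending it would be `urllib.request.urlopen(req)`. In the app itself, prefer the SDK call, which owns the exact format.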
What the SDK handles vs. what you write
| SDK handles | You write |
|---|---|
| Creating the durable promise in the server when `ctx.promise()` is yielded | The single `p = yield ctx.promise()` line |
| Persisting the suspension when `yield p` runs, so the worker doesn't have to hold the generator in memory | The `v = yield p` line |
| Routing the resumption back to a worker in the `default` group once `/resolve` patches the promise | The `Resonate().remote()` construction and `resonate.start()` |
| Deduplicating `begin_run(id, ...)` so a retried HTTP call with the same `id` reattaches to the existing run | The choice to key `begin_run` on the caller-supplied `id` |
| Checkpointing `ctx.run(run_job, ...)` at invocation and at result, so once the result is durably recorded, replay does not re-call `run_job` (and therefore not `JobsApi.run_now`); a crash between dispatch and the recorded result is not covered by this checkpoint (see Failure modes covered) | The single `ctx.run(run_job, ...)` line |
| Storing the resolved value as the durable promise's payload so `yield p` returns it | The `data=json.dumps(value)` argument on `promises.resolve` |
| Treating `promises.resolve(id, ikey=id, ...)` as idempotent on `(id, ikey)` so a duplicate callback from Databricks is a no-op | Choosing `ikey=id` on the resolve call |
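The last row of the table can be illustrated with a toy in-memory model of resolve deduplication on `(id, ikey)`. It is only an illustration, not the Resonate server's implementation. `ToyPromiseStore` and `AlreadyResolvedError` are hypothetical, and raising on a mismatched key is a choice of this toy.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


class AlreadyResolvedError(Exception):
    """Raised when a different ikey tries to settle an already-settled promise."""


@dataclass
class ToyPromise:
    id: str
    state: str = "PENDING"
    ikey: Optional[str] = None
    value: Optional[str] = None


class ToyPromiseStore:
    """In-memory illustration of resolve deduplication; not the Resonate server."""

    def __init__(self) -> None:
        self._promises: Dict[str, ToyPromise] = {}

    def create(self, promise_id: str) -> ToyPromise:
        return self._promises.setdefault(promise_id, ToyPromise(promise_id))

    def resolve(self, promise_id: str, ikey: str, value: str) -> Tuple[ToyPromise, bool]:
        """Return (promise, changed); changed is False for a duplicate resolve."""
        p = self._promises[promise_id]
        if p.state == "PENDING":
            p.state, p.ikey, p.value = "RESOLVED", ikey, value
            return p, True  # first resolve: this is what wakes the workflow
        if p.ikey == ikey:
            return p, False  # same (id, ikey): a retry, so return the stored outcome
        raise AlreadyResolvedError(promise_id)  # toy choice for a conflicting key
```

The app passes `ikey=id`, so every callback for a given promise carries the same key. In this model, a second `/resolve` with a different `value` is therefore treated as a retry, and the first value wins.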
The orchestrator is straight-line code: create a promise, fire the job, await the result. The fact that the Databricks job runs in a separate cluster, on its own schedule, with no in-process connection back to the FastAPI worker, does not change the shape of the code. That coordination lives in the durable promise plus the resolve endpoint.
Failure modes covered
- The worker crashes between `ctx.promise()` and `ctx.run(run_job, ...)`. The promise was already created server-side. On resume, the generator replays up to the checkpoint, which is the `ctx.run` call. The promise id is recovered from durable state, so the resolve endpoint still refers to the same promise.
- The worker crashes after `JobsApi.run_now` has been dispatched but before `ctx.run` records its result. The next replay re-invokes `run_job`, which calls `JobsApi.run_now` a second time. The example does not set `idempotency_token` on the Databricks call, so this is the one failure mode the application would still need to harden on the Databricks side. The durable promise id is unchanged across the replay.
- The worker crashes while suspended on `yield p`. The task is freed, and the server holds the suspension. When any worker in the `default` group is alive and the promise is resolved, the server routes the resumption to it, and the generator is reconstructed from durable state.
- The Databricks notebook calls `/resolve` twice for the same job. Both calls hit `resonate.promises.resolve(id, ikey=id, data=...)` (`app.py:29`). The Resonate server treats resolves as idempotent on `(id, ikey)`, so the second call does not re-resume the workflow.
- The caller retries `GET /run?id=foo&...` after the pipeline has already completed. `data_pipeline.begin_run(id=...)` reattaches to the existing run. `h.done()` is true, and the handler returns `"I am done"` (`app.py:21-24`). No second Databricks job is triggered.
- The FastAPI process restarts between `/run` and the Databricks callback. The `/resolve` handler holds no workflow state, only a Resonate client and the promise id from the query string, so a fresh process resolves the promise just as well.
When to reach for this pattern
- If a workflow has to trigger a long-running external system (Databricks job, ML training run, batch ETL, third-party render) and continue with downstream work when that system finishes, on a timescale of minutes to hours.
- If the external system's only natural callback channel is an HTTP webhook back to your service, not an in-process future.
- If you want the orchestration to read top-to-bottom in one function instead of being split across "trigger" and "on-completion" handlers wired through a queue.
- If duplicate start requests on the same external id must deduplicate to a single run of the external job, and a retried start after completion must return the stored outcome rather than re-trigger the job.
- If the suspended wait must survive worker restarts and load-balance across a worker group, without a separate scheduler or job table.
Sources
- Example repo: https://github.com/resonatehq-examples/example-databricks-in-the-loop-py
- Python SDK repo: https://github.com/resonatehq/resonate-sdk-py
- Resonate documentation: https://docs.resonatehq.io
- Human-in-the-Loop pattern page: https://docs.resonatehq.io/get-started/examples/human-in-the-loop
- Files cited in this post:
  - `app.py:13-54`: FastAPI app, `/run` and `/resolve` handlers, `data_pipeline` orchestrator, `run_job` step
  - `trigger.py:23-31`: trigger script
  - `pyproject.toml:11`: SDK pin
- SDK references (all in `resonate-sdk-py` at tag `v0.6.7`):
  - `Resonate.remote`: `resonate/resonate.py:195-254`
  - `@resonate.register`: `resonate/resonate.py:317-399`
  - `ctx.promise`: `resonate/resonate.py:1161-1224`
  - `ctx.run` / `ctx.lfc`: `resonate/resonate.py:863-886`
  - `Function.begin_run` / `Handle`: `resonate/resonate.py:1393-1426` and `resonate/models/handle.py:11-32`
  - `PromiseStore.resolve`: `resonate/stores/remote.py:213-239`
