Resonate HQ · 5 min read

Coordinating a Databricks job from a backend in Python with Resonate

How a single generator orchestrator triggers a long-running Databricks job and suspends on a durable promise that the notebook resolves out-of-band.


A backend service has to trigger a Databricks job that may run for minutes to hours, then continue with downstream work once the job finishes. The Resonate shape of the solution is a single generator orchestrator that does three things: it creates a durable promise with ctx.promise(), kicks off the Databricks job with the promise id as a notebook parameter, and yields on the promise. The notebook, running in the Databricks cluster outside the worker process, later calls back into the FastAPI app to resolve the promise by id. The example wires the FastAPI handlers, the orchestrator, and the Databricks JobsApi.run_now call together in roughly forty lines.

The shape of the solution

app = FastAPI()
resonate = Resonate().remote()
resonate.start()
 
 
# handlers
@app.get("/run")
async def get(id: str, url: str):
    h = data_pipeline.begin_run(id, url)
    if h.done():
        return "I am done"
    return "working on it"
 
 
@app.post("/resolve")
async def resolve(id: str, value: str):
    resonate.promises.resolve(id, ikey=id, data=json.dumps(value))
 
 
@resonate.register
def data_pipeline(ctx: Context, url: str) -> Generator[Yieldable, Any, None]:
    p = yield ctx.promise()
    yield ctx.run(run_job, promise_id=p.id, job=int(os.environ["JOB_ID"]), url=url)
    v = yield p
    # yield ctx.run(send_email)
    print(f"databricks execution has finished with value {v}")
    return
 
 
def run_job(ctx: Context, promise_id: str, job: int, url: str) -> None:
    client = ApiClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )
    jobs = JobsApi(client)
    jobs.run_now(
        job_id=job,
        notebook_params={"promise_id": promise_id, "url": url},
        jar_params=None,
        python_params=None,
        spark_submit_params=None,
    )
# from example-databricks-in-the-loop-py/app.py:13-54

data_pipeline is a generator. The three yield points are the durable shape of the workflow: create a promise, run the trigger step, wait on the promise. The Databricks notebook itself is not yielded — it runs in the Databricks cluster and is treated as an out-of-process actor that will eventually call POST /resolve?id=<promise_id>&value=<value> to wake the workflow.
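The notebook side of that callback is not part of the excerpt. A minimal sketch of what it might look like, using only the standard library, follows. The function names, the base URL, and the timeout are hypothetical; the only things taken from the post are the POST /resolve route and its id and value query parameters.

```python
import urllib.parse
import urllib.request


def build_resolve_request(base_url: str, promise_id: str, value: str) -> urllib.request.Request:
    """Build the POST /resolve request the notebook sends when its work is done."""
    # The /resolve handler reads id and value from the query string.
    query = urllib.parse.urlencode({"id": promise_id, "value": value})
    return urllib.request.Request(f"{base_url.rstrip('/')}/resolve?{query}", method="POST")


def resolve_promise(base_url: str, promise_id: str, value: str, timeout: float = 10.0) -> int:
    """Send the callback and return the HTTP status code."""
    request = build_resolve_request(base_url, promise_id, value)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Inside the notebook, the promise id arrives as a notebook parameter, e.g.:
#   promise_id = dbutils.widgets.get("promise_id")
#   resolve_promise("https://backend.example.com", promise_id, "done")  # hypothetical host
```

Because the resolve is idempotent on the promise id, the notebook can retry this call on a network error without risking a second resumption of the workflow.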

The durable primitives in play

  • Resonate().remote() — constructs a client that uses the Resonate server as both promise store and message source. Defaults to localhost and to group="default" (configured at construction, see resonate/resonate.py:199). On the 0.6.x line this construction surface is the most likely thing to drift on a newer SDK release; the rest of the primitives below have been stable. app.py:14. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:195-254.
  • resonate.start() — starts the bridge threads so this process begins claiming work for the group that was configured at construction (default: default). app.py:15. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:266-274.
  • @resonate.register — registers data_pipeline as a Resonate function under its own name and version 1, making it invocable by id. app.py:32. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:317-399.
  • data_pipeline.begin_run(id, url) — starts the orchestrator asynchronously, keyed on the caller-supplied id. Returns a Handle with done() and result(). Duplicate calls with the same id reattach rather than start a parallel run. app.py:21. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:1393-1426 and resonate/models/handle.py:11-32.
  • ctx.promise() — creates a durable promise inside the workflow and returns an RFI handle whose id can be passed to outside systems. The promise lives in the Resonate server until resolved. The SDK docstring describes this primitive specifically as the human-in-the-loop (HITL) mechanism: "execution needs to pause until a human provides input or takes an action. The created promise can later be fulfilled using Resonate.promise.resolve()." Here the "human" is the Databricks notebook. app.py:34. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:1161-1224.
  • ctx.run(run_job, ...) — schedules run_job as a local durable step. Alias for ctx.lfc(). The step is checkpointed at invocation and at result, so once the result is durably recorded, replay will not call JobsApi.run_now again. A crash after run_now has been dispatched but before the result is recorded can still re-invoke run_job, so the step is at-least-once until that checkpoint lands. app.py:35. SDK: resonate-sdk-py @ v0.6.7 resonate/resonate.py:863-886.
  • yield p — suspends the generator on the durable promise. The worker is free to release the task; the server holds the suspension until the promise is resolved. app.py:36.
  • resonate.promises.resolve(id, ikey=id, data=...) — low-level promise resolve. Issues PATCH /promises/{id} with {"state": "RESOLVED", "value": {...}} against the Resonate server. app.py:29. SDK: resonate-sdk-py @ v0.6.7 resonate/stores/remote.py:213-239.
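To make the interplay of these primitives concrete, here is a toy, in-memory model of replay. It is not the Resonate SDK and does not reflect how the SDK or server is implemented. The drive function, the tuple commands, and the journal list are all invented for illustration. The sketch only shows why a generator that is re-run from the top does not repeat a step whose result has already been recorded, and why it can suspend on a promise and finish later.

```python
from typing import Any, Callable, Dict, Generator, List, Tuple

PENDING = object()  # sentinel for an unresolved promise in the toy store


def drive(
    workflow: Callable[[], Generator[tuple, Any, Any]],
    journal: List[Any],
    promises: Dict[str, Any],
) -> Tuple[str, Any]:
    """Run the workflow from the top, answering journaled yields from the journal."""
    gen = workflow()
    step, to_send = 0, None
    try:
        while True:
            command = gen.send(to_send)
            if step < len(journal):
                to_send = journal[step]  # replay: reuse the recorded result
            elif command[0] == "promise":
                to_send = f"promise-{len(promises)}"
                promises[to_send] = PENDING
                journal.append(to_send)
            elif command[0] == "run":
                _, fn, kwargs = command
                to_send = fn(**kwargs)  # side effect happens only on first execution
                journal.append(to_send)
            elif command[0] == "await":
                value = promises[command[1]]
                if value is PENDING:
                    return ("suspended", command[1])
                to_send = value
                journal.append(to_send)
            else:
                raise ValueError(f"unknown command: {command!r}")
            step += 1
    except StopIteration as stop:
        return ("done", stop.value)


triggered: List[Tuple[str, str]] = []


def trigger_job(promise_id: str, url: str) -> None:
    """Stand-in for run_job; records each call instead of calling Databricks."""
    triggered.append((promise_id, url))


def data_pipeline() -> Generator[tuple, Any, Any]:
    pid = yield ("promise",)
    yield ("run", trigger_job, {"promise_id": pid, "url": "https://example.com/data.csv"})
    value = yield ("await", pid)
    return value


journal: List[Any] = []
promises: Dict[str, Any] = {}
first = drive(data_pipeline, journal, promises)   # suspends on the promise
promises[first[1]] = "ok"                         # stands in for POST /resolve
second = drive(data_pipeline, journal, promises)  # replays, then completes
```

In this toy, the second call to drive re-enters data_pipeline from the first line, but trigger_job is not called again because its result is already in the journal.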

What the SDK handles vs. what you write

| SDK handles | You write |
| --- | --- |
| Creating the durable promise in the server when ctx.promise() is yielded | The single p = yield ctx.promise() line |
| Persisting the suspension when yield p runs, so the worker doesn't have to hold the generator in memory | The v = yield p line |
| Routing the resumption back to a worker in the default group once /resolve patches the promise | The Resonate().remote() construction and resonate.start() |
| Deduplicating begin_run(id, ...) so a retried HTTP call with the same id reattaches to the existing run | The choice to key begin_run on the caller-supplied id |
| Checkpointing ctx.run(run_job, ...) at invocation and at result, so once the result is durably recorded, replay does not re-call run_job (and therefore not JobsApi.run_now). A crash in the window between dispatch and result is not closed by this checkpoint; see the failure-modes section | The single ctx.run(run_job, ...) line |
| Storing the resolved value as the durable promise's payload so yield p returns it | The data=json.dumps(value) argument on promises.resolve |
| Treating promises.resolve(id, ikey=id, ...) as idempotent on (id, ikey) so a duplicate callback from Databricks is a no-op | Choosing ikey=id on the resolve call |

The orchestrator is straight-line code: create a promise, fire the job, await the result. The fact that the Databricks job runs in a separate cluster, on its own schedule, with no in-process connection back to the FastAPI worker, does not change the shape of the code. That coordination lives in the durable promise plus the resolve endpoint.

Failure modes covered

  • The worker crashes between ctx.promise() and ctx.run(run_job, ...). The promise was already created server-side. On resume, the generator replays from the top, and the promise id is recovered from durable state rather than created anew, so run_job hands Databricks the same id and the resolve endpoint is still talking about the same promise.
  • The worker crashes after JobsApi.run_now has been dispatched but before ctx.run records its result. The next replay re-invokes run_job, which calls JobsApi.run_now a second time. The example does not set idempotency_token on the Databricks call, so this is the one failure mode the application would still need to harden on the Databricks side. The durable promise id is unchanged across the replay.
  • The worker crashes while suspended on yield p. The task is freed; the server holds the suspension. When any worker in the default group is alive and the promise is resolved, the server routes the resumption to it; the generator is reconstructed from durable state.
  • The Databricks notebook calls /resolve twice for the same job. Both calls hit resonate.promises.resolve(id, ikey=id, data=...) (app.py:29). The Resonate server treats resolves as idempotent on (id, ikey); the second call does not re-resume the workflow.
  • The caller retries GET /run?id=foo&... after the pipeline has already completed. data_pipeline.begin_run(id=...) reattaches to the existing run; h.done() is true; the handler returns "I am done" (app.py:21-24). No second Databricks job is triggered.
  • The FastAPI process restarts between /run and the Databricks callback. The /resolve handler holds no workflow state — only a Resonate client and the promise id from the query string. A fresh process resolves the promise just as well.
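The one gap in the list above, a second JobsApi.run_now call after a crash between dispatch and checkpoint, could be narrowed on the Databricks side with an idempotency token. The sketch below is an assumption about how that might look, not something the example does. It assumes the installed databricks-cli release's JobsApi.run_now accepts an idempotency_token keyword, and it reuses the promise id as the token because that id is stable across replays. The function name run_job_once is hypothetical, and the jobs object is passed in rather than constructed so the function can be exercised without credentials.

```python
from typing import Any, Dict, Optional


def run_job_once(jobs: Any, promise_id: str, job: int, url: str) -> Optional[Dict[str, Any]]:
    """Trigger the job, reusing the durable promise id as the idempotency token."""
    return jobs.run_now(
        job_id=job,
        jar_params=None,
        notebook_params={"promise_id": promise_id, "url": url},
        python_params=None,
        spark_submit_params=None,
        # Assumption: this keyword is supported by the installed databricks-cli.
        # With a token set, a repeated run-now request with the same token is
        # expected to return the existing run instead of launching a new one.
        idempotency_token=promise_id,
    )
```

Databricks documents a length limit on the token (64 characters at the time of writing), so a long promise id may need to be hashed first. Inside the orchestrator this would be wired in the same way as run_job, by constructing JobsApi in the step and passing it through.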

When to reach for this pattern

  • If a workflow has to trigger a long-running external system (Databricks job, ML training run, batch ETL, third-party render) and continue with downstream work when that system finishes, on a timescale of minutes to hours.
  • If the external system's only natural callback channel is an HTTP webhook back to your service, not an in-process future.
  • If you want the orchestration to read top-to-bottom in one function instead of being split across "trigger" and "on-completion" handlers wired through a queue.
  • If duplicate start requests on the same external id must deduplicate to a single run of the external job, and a retried start after completion must return the stored outcome rather than re-trigger the job.
  • If the suspended wait must survive worker restarts and load-balance across a worker group, without a separate scheduler or job table.

Sources