Handling multiple job dependencies in RQ

Lijo Jose
Dec 7, 2020

python-rq is one of the key components for building systems based on a task-scheduler architecture. It provides a good playground to start the design with, and the design involves plenty of challenges; merging multiple jobs is one of them. Here we will see how to handle this in a python-rq environment.

While designing a system workflow using task queues, we can end up in situations where one job has to wait for multiple other jobs to complete. In Celery we have chords and groups to handle this. How can we do it in python-rq?

The Master Worker creates a job for the Aggregator, indicating that the output from Worker 1 and Worker 2 must be available before this job can be processed. A sample JSON structure looks like the one below. The Master Worker also creates a token, which is used across all the workers.

{
    "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
    "worker1": {
        "processed": false
    },
    "worker2": {
        "processed": false
    },
    "other_params": {}
}
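As a minimal sketch of the Master Worker side, assuming an aggregator task living at tasks.aggregate and a queue named aggregator_1_queue (both names are illustrative, not from the original design):

import uuid

from redis import Redis
from rq import Queue

redis_connection = Redis()

# Token generated by the Master Worker and shared across all the jobs.
token = str(uuid.uuid4())

aggregator_payload = {
    "token": token,
    "worker1": {"processed": False},
    "worker2": {"processed": False},
    "other_params": {},
}

# Enqueue the aggregator job on its dedicated queue.
aggregator_queue = Queue("aggregator_1_queue", connection=redis_connection)
aggregator_queue.enqueue("tasks.aggregate", aggregator_payload)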

In the same way, the Master Worker creates jobs for Worker 1 and Worker 2. A sample JSON body looks like the one below.

{
    "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
    "aggregator_queue": "aggregator_1_queue",
    "other_params": {}
}
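Continuing the same sketch, the Master Worker enqueues Worker 1 and Worker 2 with the same token and the name of the aggregator's queue (the worker queue and function names are again assumptions):

worker_payload = {
    "token": token,
    "aggregator_queue": "aggregator_1_queue",
    "other_params": {},
}

# Each worker is expected to enqueue its result
# onto worker_payload["aggregator_queue"] when it finishes.
Queue("worker1_queue", connection=redis_connection).enqueue("tasks.process_worker1", worker_payload)
Queue("worker2_queue", connection=redis_connection).enqueue("tasks.process_worker2", worker_payload)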

Things to care about

We can have multiple workers acting as aggregators. To ensure atomicity, all the jobs for a given token should go to the same aggregator. For this, we can pass the aggregator queue name to the workers as part of the job data, so that every result for the token is directed to the same aggregator; one way to choose that queue is sketched below.
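One possible way to pick the queue, a common approach rather than something from the original design, is to hash the token so the choice is deterministic:

import hashlib

NUM_AGGREGATOR_QUEUES = 4  # hypothetical size of the aggregator pool

def aggregator_queue_for(token):
    # hashlib is used instead of the built-in hash(), which is not
    # stable across Python processes.
    digest = int(hashlib.sha1(token.encode()).hexdigest(), 16)
    return "aggregator_{}_queue".format(digest % NUM_AGGREGATOR_QUEUES)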

The Aggregator

For a given job/token, the aggregator receives three inputs: the job details from the Master Worker (key-info in the sample below) and the outputs of Worker 1 and Worker 2. On receiving the first input, the aggregator creates a context for the job. A sample context looks like this:

{
    "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
    "received": {
        "key-info": false,
        "worker1": true,
        "worker2": false
    },
    "other_params": {}
}
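A small helper, sketched here under the assumption that the context is keyed by the token in Redis, creates the context on the first input and loads it afterwards:

import json

def load_or_create_context(redis_connection, token):
    # Restore the context if an earlier input already created it.
    raw = redis_connection.get(token)
    if raw is not None:
        return json.loads(raw)
    # First input for this token: create a fresh context.
    return {
        "token": token,
        "received": {"key-info": False, "worker1": False, "worker2": False},
        "other_params": {},
    }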

After processing the input, the context is stored in Redis:

redis_connection.mset({token: json.dumps(job_context)})

When the next input is received, the context is restored from Redis and further processing is done. The received section in the context is updated to reflect the status of the input:

json.loads(redis_connection.get(token))

Once all the inputs have been received, the next course of action is taken.

And finally, the context should be deleted from Redis:

redis_connection.delete(token)
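Putting the steps together, the aggregator's job function could take roughly this shape, reusing load_or_create_context from the earlier sketch; the input's source field and the downstream process_results step are assumptions for illustration:

def aggregate(input_message):
    token = input_message["token"]
    context = load_or_create_context(redis_connection, token)

    # Mark this input ("key-info", "worker1" or "worker2") as received.
    context["received"][input_message["source"]] = True

    if all(context["received"].values()):
        # All inputs are in: take the next course of action, then clean up.
        process_results(context)  # hypothetical downstream step
        redis_connection.delete(token)
    else:
        # Still waiting for inputs: persist the updated context.
        redis_connection.mset({token: json.dumps(context)})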

Further…

The design involves other challenges, such as:

  • workflows with many-to-one-to-many connections
  • task scheduling using python-rq for containerised tasks.

We will try to address these in the coming days.
