`python-rq` is one of the key components when designing systems based on a task-scheduler architecture, and it provides a good playground to start the design. The design involves many challenges; merging multiple jobs is one of them. Here we will see how to handle it in a `python-rq` environment.
While designing a system workflow using task queues, we might end up in situations where one job waits for multiple other jobs to complete. In `celery` we have chords and groups to handle this. How can we do this in `python-rq`?
The `Master Worker` creates a job for the `Aggregator`, which indicates that the output from `Worker 1` and `Worker 2` should be available before this job can be processed. The `Master Worker` also creates a token which will be used across workers. A sample JSON structure will look as below.
```json
{
  "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
  "worker1": {
    "processed": false
  },
  "worker2": {
    "processed": false
  },
  "other_params": {}
}
```
In the same way, the `Master Worker` creates jobs for `Worker 1` and `Worker 2`. A sample JSON body will look like below.
```json
{
  "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
  "aggregator_queue": "aggregator_1_queue",
  "other_params": {}
}
```
Care abouts
We can have multiple aggregator workers. To ensure atomicity, all the jobs for a given token should go to the same aggregator. For this, we can pass the aggregator queue name to the workers as part of the job data, so their results are directed back to the same aggregator, as in the sketch below.
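One simple way to achieve this routing is to hash the token onto a fixed pool of aggregator queues, so every job carrying the same token deterministically lands on the same aggregator. A minimal sketch, assuming a hypothetical static list of queue names:

```python
import hashlib

# Hypothetical fixed pool of aggregator queues, one per aggregator worker.
AGGREGATOR_QUEUES = ["aggregator_1_queue", "aggregator_2_queue"]

def pick_aggregator_queue(token):
    # The same token always hashes to the same queue, so all jobs
    # belonging to one workflow reach a single aggregator worker.
    digest = hashlib.sha1(token.encode()).hexdigest()
    return AGGREGATOR_QUEUES[int(digest, 16) % len(AGGREGATOR_QUEUES)]
```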
The Aggregator
For a given job/token, the aggregator receives three inputs: the key info from the `Master Worker` and the outputs from `Worker 1` and `Worker 2`. While receiving the first input, the aggregator will create a context for the job. A sample context will look like:
```json
{
  "token": "5a3b64a3-63a9-449c-9385-b306d114c164",
  "received": {
    "key-info": false,
    "worker1": true,
    "worker2": false
  },
  "other_params": {}
}
```
After processing the input, the context will be stored in `redis`:

```python
redis_connection.set(token, json.dumps(job_context))
```
When the next inputs are received, the context will be restored from `redis` and further processing will be done. The `received` section in the context will be updated to reflect the status of each input.

```python
job_context = json.loads(redis_connection.get(token))
```
Once all the inputs are received, the next course of action will be taken. And finally, the context should be deleted from `redis`:

```python
redis_connection.delete(token)
```
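Putting the pieces together, an aggregator handler might look like the sketch below. The `source` argument (naming which upstream sent the input) and the `process_results` helper are assumptions for illustration; the redis calls mirror the snippets above.

```python
import json

from redis import Redis

redis_connection = Redis()

def aggregate(message, source):
    """Handle one input for a token; act once all inputs have arrived.

    `source` is one of "key-info", "worker1" or "worker2" -- an assumed
    convention for identifying which upstream sent the message.
    """
    token = message["token"]

    raw = redis_connection.get(token)
    if raw is None:
        # First input for this token: create a fresh context.
        job_context = {
            "token": token,
            "received": {"key-info": False, "worker1": False, "worker2": False},
            "other_params": message.get("other_params", {}),
        }
    else:
        # Later inputs: restore the stored context.
        job_context = json.loads(raw)

    # Mark this input as received.
    job_context["received"][source] = True

    if all(job_context["received"].values()):
        # All three inputs arrived: take the next course of action
        # and clean up the stored context.
        process_results(job_context)
        redis_connection.delete(token)
    else:
        # Still waiting on other inputs: persist the updated context.
        redis_connection.set(token, json.dumps(job_context))

def process_results(job_context):
    # Hypothetical placeholder for whatever the merged job actually does.
    pass
```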
Further…
The design involves other challenges, like
- workflows with many-1-many connections
- task scheduling using `python-rq` for containerised tasks

We will try to address these in the coming days.