Using Celery broadcast tasks

I recently wanted to add an Agent component to one of my systems for tasks such as configuration updates and monitoring data collection.

Because the system is deployed with multiple instances, those agents may need to run the same task at the same time. Since we were already using Celery, its broadcast feature turned out to be a good fit: the same task can be delivered to every worker.

To test the idea, I first wrote a simple task:

# filename: agent.py
import datetime
from celery import shared_task

@shared_task(bind=True)
def test(self):
    name = self.request.hostname
    with open(f'/tmp/{name}', 'w') as f:
        f.write(str(datetime.datetime.now()))

This task reads the worker hostname from self.request.hostname, then creates a file under /tmp with that hostname as the filename and the execution time as the content.

In other words, if the broadcast works correctly, multiple files should appear in /tmp at the same time, one for each worker bound to the queue.
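The file-writing logic can also be checked without a broker by pulling it into a plain function. This is only a minimal sketch: `write_marker`, its `directory` parameter, and the `agentN@example` hostnames are hypothetical names for illustration and are not part of the original task.

```python
import datetime
import os
import tempfile


def write_marker(hostname, directory):
    """Write the current time to a file named after the worker hostname."""
    path = os.path.join(directory, hostname)
    with open(path, 'w') as f:
        f.write(str(datetime.datetime.now()))
    return path


# Simulate two workers handling the same broadcast message.
with tempfile.TemporaryDirectory() as tmp:
    for name in ('agent1@example', 'agent2@example'):
        write_marker(name, tmp)
    created = sorted(os.listdir(tmp))

print(created)
```

Each simulated worker leaves exactly one file, which is the behavior the real broadcast is expected to reproduce in /tmp.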

Then I added the following Celery app configuration:

from kombu.common import Broadcast

# `app` is the existing Celery application instance (defined elsewhere in the project)
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'agent.test': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

This declares a broadcast queue named broadcast_tasks and routes the agent.test task to it through the exchange of the same name.

On the worker side, I started two workers bound to broadcast_tasks, one named agent1 and the other agent2:

celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent1.pid -Q broadcast_tasks -n agent1@%h
celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent2.pid -Q broadcast_tasks -n agent2@%h

If RabbitMQ is used as the broker, you can also see this in its management UI: a Celery broadcast queue is effectively a fanout exchange plus one queue per worker bound to it.
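The fanout behavior described here can be pictured with a toy model in plain Python. This is a sketch of the concept only, not how RabbitMQ or Celery implement it; `FanoutExchange`, `bind`, and `publish` are made-up names for illustration.

```python
from collections import deque


class FanoutExchange:
    """Toy model of a fanout exchange: every bound queue gets a copy."""

    def __init__(self):
        self.queues = {}

    def bind(self, worker_name):
        # Each worker gets its own private queue bound to the exchange.
        self.queues[worker_name] = deque()
        return self.queues[worker_name]

    def publish(self, message):
        # A fanout exchange ignores routing keys and copies to all queues.
        for queue in self.queues.values():
            queue.append(message)


exchange = FanoutExchange()
q1 = exchange.bind('agent1')
q2 = exchange.bind('agent2')
exchange.publish('agent.test')
print(list(q1), list(q2))
```

After one `publish`, both queues hold their own copy of the message, which is why every worker runs the task once.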

Then I simply triggered the task with delay():

from agent import test

test.delay()

Checking /tmp afterward, I found two files:

agent1@knktc-rmbp.local
agent2@knktc-rmbp.local

The timestamps inside them were almost the same.
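To put a number on "almost the same", the recorded times can be parsed and compared. A rough sketch, assuming the files contain the `str(datetime.datetime.now())` output written by the task; `timestamp_spread` and the `agent*` glob pattern are hypothetical choices for this example.

```python
import datetime
import pathlib


def timestamp_spread(directory, pattern='agent*'):
    """Return the gap between the earliest and latest recorded times."""
    stamps = [
        datetime.datetime.fromisoformat(p.read_text().strip())
        for p in pathlib.Path(directory).glob(pattern)
        if p.is_file()  # skip any directories that match the pattern
    ]
    if not stamps:
        raise ValueError('no marker files found')
    return max(stamps) - min(stamps)
```

Calling `timestamp_spread('/tmp')` after a broadcast run returns a `datetime.timedelta`; a small value suggests the workers picked up the task at nearly the same moment.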

That was enough to confirm the approach, so I ended up using this method for the Agent feature.
