I recently wanted to add an Agent component to one of my systems for tasks such as configuration updates and monitoring data collection.
Because the system is deployed with multiple instances, those agents may need to run the same task at the same time. Since we were already using Celery, its broadcast feature turned out to be a good fit: the same task can be delivered to every worker.
To test the idea, I first wrote a simple task:
```python
# filename: agent.py
```
This task reads the worker hostname from self.request.hostname, then creates a file under /tmp with that hostname as the filename and the execution time as the content.
In other words, if the broadcast works correctly, multiple files should appear in /tmp at the same time, one for each worker bound to the queue.
Then I added the following Celery app configuration:
```python
from kombu.common import Broadcast
```
This defines a queue named broadcast_tasks, and the agent.test task is routed through the broadcast_tasks exchange.
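Spelled out, the configuration would look something like the following, which follows the pattern in the Celery routing documentation; the app name and broker URL are assumptions, not the original settings.

```python
# Celery app configuration -- a sketch following the broadcast routing pattern
from celery import Celery
from kombu.common import Broadcast

app = Celery("boss2_manager", broker="amqp://guest@localhost//")  # hypothetical

# Declare the broadcast queue and route agent.test through its fanout exchange.
app.conf.task_queues = (Broadcast("broadcast_tasks"),)
app.conf.task_routes = {
    "agent.test": {
        "queue": "broadcast_tasks",
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
    }
}
```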
On the worker side, I started two workers bound to broadcast_tasks, one named agent1 and the other agent2:
```shell
celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent1.pid -Q broadcast_tasks -n agent1@%h
celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent2.pid -Q broadcast_tasks -n agent2@%h
```
If RabbitMQ is used as the broker, you can also see this in its management UI: a Celery broadcast queue is effectively a fanout exchange plus one queue per worker bound to it.
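In kombu terms, a `Broadcast` queue expands to roughly the following primitives. This is illustrative only: kombu actually generates a unique `bcast.<uuid>` queue name for each worker, so the fixed name here is a stand-in.

```python
# Roughly what Broadcast("broadcast_tasks") amounts to, per worker (a sketch)
from kombu import Exchange, Queue

# One shared fanout exchange...
fanout = Exchange("broadcast_tasks", type="fanout")

# ...plus one auto-deleting queue per worker bound to it
# (kombu names the real one "bcast.<uuid>"; this name is illustrative).
worker_queue = Queue("bcast.worker-1", exchange=fanout, auto_delete=True)
```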
Then I simply triggered the task with delay():
```python
from agent import test

test.delay()
```
Checking /tmp afterward, I found two files:
```
agent1@knktc-rmbp.local
agent2@knktc-rmbp.local
```
The timestamps inside them were almost the same.
That was enough to confirm the approach, so I ended up using this method for the Agent feature.
Reference
- Celery broadcast documentation: https://docs.celeryq.dev/en/stable/userguide/routing.html#broadcast