最近琢磨给自己的系统加上个Agent来实现配置变更和监控信息采集等任务,因为这个系统是多实例部署的,所以这些Agent可能需要同时运行相同的任务。因为我们正好在用celery,celery支持广播任务,所有的worker都能收到相同的任务,正好就能用这个特性来实现Agent的功能了。
为了测试下广播任务的效果,先写个示例任务:
1 | # filename: agent.py |
稍微解释下,test这个任务会获取执行该任务的worker的hostname,然后以该hostname为文件名在/tmp目录下创建出来文件,文件中包含有执行任务的时间。
也就是说,如果测试ok的话,在这个test任务执行的时候,在/tmp目录下应该会同时生成多个文件:有几个worker绑定到这个队列就会有几个文件。
然后在celery的app配置代码中加入以下的代码:
1 | from kombu.common import Broadcast |
这里定义了一个名为broadcast_tasks的队列,然后agent.test的任务都会通过broadcast_tasks这个exchange来发出。
在celery worker方面,试着启动这么两个worker绑定broadcast_tasks队列,这两个worker一个叫agent1,一个叫agent2:
1 | celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent1.pid -Q broadcast_tasks -n agent1@%h |
celery worker启动后,如果使用了RabbitMQ作为broker的话,可以在管理界面中看到,广播类型的队列对于RabbitMQ来说就是一个fanout类型的exchange外加多个队列:启动一个worker就创建一个队列绑定到这个fanout exchange上。
试着直接用delay方法调用下这个test任务:
1 | from agent import test |
然后去/tmp目录下看看,发现已经出现了两个文件,命名分别为:
1 | agent1@knktc-rmbp.local |
打开文件看看内容,两个文件中记录的时间相差不大。
OK,就用这个方法来做Agent吧。
参考
- Celery Broadcast的官方文档(吐槽下,最近官方把原来的celeryproject.org域名给搞丢了,最后只好搞了个新的~):https://docs.celeryq.dev/en/stable/userguide/routing.html#broadcast