0%

使用Celery来广播任务

最近琢磨给自己的系统加上个Agent来实现配置变更和监控信息采集等任务,因为这个系统是多实例部署的,所以这些Agent可能需要同时运行相同的任务。因为我们正好在用celery,celery支持广播任务,所有的worker都能收到相同的任务,正好就能用这个特性来实现Agent的功能了。

为了测试下广播任务的效果,先写个示例任务:

1
2
3
4
5
6
7
8
9
# filename: agent.py
import datetime
from celery import shared_task

@shared_task(bind=True)
def test(self):
name = self.request.hostname
with open(f'/tmp/{name}', 'w') as f:
f.write(str(datetime.datetime.now()))

稍微解释下,test这个任务会获取执行该任务的worker的hostname,然后以该hostname为文件名在/tmp目录下创建出来文件,文件中包含有执行任务的时间。

也就是说,如果测试ok的话,在这个test任务执行的时候,在/tmp目录下应该会同时生成多个文件:有几个worker绑定到这个队列就会有几个文件。

然后在celery的app配置代码中加入以下的代码:

1
2
3
4
5
6
7
8
9
from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'agent.test': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}

这里定义了一个名为broadcast_tasks的队列,然后agent.test的任务都会通过broadcast_tasks这个exchange来发出。

在celery worker方面,试着启动这么两个worker绑定broadcast_tasks队列,这两个worker一个叫agent1,一个叫agent2:

1
2
celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent1.pid -Q broadcast_tasks -n agent1@%h
celery -A boss2_manager worker --pidfile=/var/run/celeryworker_agent2.pid -Q broadcast_tasks -n agent2@%h

celery worker启动后,如果使用了RabbitMQ作为broker的话,可以在管理界面中看到,广播类型的队列对于RabbitMQ来说就是一个fanout类型的exchange外加多个队列:启动一个worker就创建一个队列绑定到这个fanout exchange上。

试着直接用delay方法调用下这个test任务:

1
2
3
from agent import test

test.delay()

然后去/tmp目录下看看,发现已经出现了两个文件,命名分别为:

1
2
agent1@knktc-rmbp.local
agent2@knktc-rmbp.local

打开文件看看内容,两个文件中记录的时间相差不大。

OK,就用这个方法来做Agent吧。

参考

如果我的文字帮到了您,那么可不可以请我喝罐可乐?