0%

使用Monkey Patch让Celery Beat支持“待命”模式

我一直用Celery Beat来做定时任务,Beat这东西简单易用,但一直有个问题就是同时启动多个Beat实例时任务就会重复发起。我们之前用uwsgi的legion模式来控制同一时间只能有一个beat实例运行,这个其实依赖于一个靠谱的网络连接。最近遇到一个问题:两个跑beat的节点的网络连接可能不稳定,那uwsgi legion可能就不太稳妥了。对此只好看看如何手动来设置,让celery beat能进入待命的模式:虽然服务在跑,但是不生成定时任务。

阅读了下celery beat的源码,使用celery命令行工具启动beat worker的时候,调用的是celery.beat下的Service类中的start方法,应该只需要修改下这里,让beat空跑起来就可以了。

简单写了下Monkey Patch相关的代码,贴出来如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
import time
from celery.beat import info, debug, humanize_seconds, signals, platforms

STANDBY_CHECK_INTERVAL = 5

def is_standby():
return True if os.path.isfile('/tmp/standby.flag') else False

def patched_start(self, embedded_process=False):
""" monkey patched start method, will skip tasks when standby flag is setted """
info('beat: Starting...')
debug('beat: Ticking with max interval->%s',
humanize_seconds(self.scheduler.max_interval))

signals.beat_init.send(sender=self)
if embedded_process:
signals.beat_embedded_init.send(sender=self)
platforms.set_process_title('celery beat')

try:
while not self._is_shutdown.is_set():

# patch starts from here
if is_standby():
debug(f'beat: in standby mode, all tasks will be skipped, '
f'will check in [{STANDBY_CHECK_INTERVAL}] seconds')
time.sleep(STANDBY_CHECK_INTERVAL)
continue
# patch ends

interval = self.scheduler.tick()
if interval and interval > 0.0:
debug('beat: Waking up %s.',
humanize_seconds(interval, prefix='in '))
time.sleep(interval)
if self.scheduler.should_sync():
self.scheduler._do_sync()
except (KeyboardInterrupt, SystemExit):
self._is_shutdown.set()
finally:
self.sync()

简单说明下:

  • 这个示例中检查了/tmp目录中是否有standby.flag文件,如果有的话就算是进入了待命的状态;
  • 在while循环中先检查了这个文件是否存在,如果存在,则sleep 5秒钟,然后再次检查。

在使用的时候只需要在代码入口处应用这个patch就可以了,例如:

1
2
3
from celery.beat import Service

Service.start = patched_start

测试运行下,当celery beat启动后,只需要touch /tmp/standby.flag,创建标志文件,可以看到celery beat进入待命模式,不再生成定时任务。删除这个文件后,等待几秒钟,celery beat将会继续开始生成定时任务。

如果我的文字帮到了您,那么可不可以请我喝罐可乐?