knktc's Notes

python, cloud, linux...

Add a Standby Mode to Celery Beat with a Monkey Patch

I have used Celery Beat for scheduled tasks for a long time. It is simple and useful, but it has one persistent problem: if multiple Beat instances run at the same time, tasks get scheduled more than once.

We previously relied on uWSGI legion mode to make sure only one Beat instance was active at a time, but that depends on having a reliable network connection. Recently I ran into a case where the network between two Beat nodes could be unstable, so uWSGI legion no longer felt safe enough. That led me to look for a way to make Celery Beat enter a “standby” mode: the service stays up, but it stops generating scheduled tasks.

After reading the Celery Beat source code, I found that the celery CLI eventually calls the start method on the Service class from celery.beat. So the easiest approach seemed to be patching that method.

Here is a simple monkey patch:

import os
import time
from celery.beat import info, debug, humanize_seconds, signals, platforms

# Seconds to wait between checks of the standby flag
STANDBY_CHECK_INTERVAL = 5


def is_standby():
    """Return True while the standby flag file exists."""
    return os.path.isfile('/tmp/standby.flag')


def patched_start(self, embedded_process=False):
    """ monkey patched start method, will skip tasks when standby flag is set """
    info('beat: Starting...')
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))

    signals.beat_init.send(sender=self)
    if embedded_process:
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')

    try:
        while not self._is_shutdown.is_set():

            # Standby: skip scheduler.tick() so no tasks are generated
            if is_standby():
                debug('beat: in standby mode, all tasks will be skipped, '
                      'will check in [%s] seconds', STANDBY_CHECK_INTERVAL)
                time.sleep(STANDBY_CHECK_INTERVAL)
                continue

            interval = self.scheduler.tick()
            if interval and interval > 0.0:
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                time.sleep(interval)
                if self.scheduler.should_sync():
                    self.scheduler._do_sync()
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()

A quick explanation:

  • This example checks whether /tmp/standby.flag exists.
  • If the file exists, Beat is considered to be in standby mode.
  • Inside the main loop, Beat checks for that file before scheduling tasks.
  • If the file is present, it sleeps for 5 seconds and checks again.
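
The check-then-skip loop described above can be tried without Celery. The following is only a minimal sketch using the standard library: run_loop, tick, max_iterations, and the temporary flag path are hypothetical names for illustration and are not part of Celery. It also waits on a threading.Event instead of calling time.sleep, which is one possible variation (not what the patch above does) so that a shutdown request can interrupt the standby pause.

```python
import os
import tempfile
import threading


def is_standby(flag_path):
    """Standby is active whenever the flag file exists."""
    return os.path.isfile(flag_path)


def run_loop(flag_path, shutdown, tick, max_iterations, check_interval=0.01):
    """Call tick() once per iteration unless the flag file is present.

    `shutdown` is a threading.Event. Waiting on it (rather than sleeping)
    lets a shutdown request end the standby pause early.
    """
    for _ in range(max_iterations):
        if shutdown.is_set():
            break
        if is_standby(flag_path):
            shutdown.wait(check_interval)  # paused: skip scheduling this round
            continue
        tick()


# Demo with a throwaway flag file instead of /tmp/standby.flag.
with tempfile.TemporaryDirectory() as tmp:
    flag = os.path.join(tmp, "standby.flag")
    ticks = []

    # No flag file: every iteration calls tick().
    run_loop(flag, threading.Event(), lambda: ticks.append("tick"), 3)
    active_ticks = len(ticks)

    # Create the flag file (the equivalent of `touch`): nothing is scheduled.
    open(flag, "w").close()
    run_loop(flag, threading.Event(), lambda: ticks.append("tick"), 3)
    standby_ticks = len(ticks) - active_ticks
```

In this sketch the loop is bounded by max_iterations only so the demo terminates; the real Beat loop runs until shutdown.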

To use it, apply the patch at your startup entry point, before Beat starts:

from celery.beat import Service

Service.start = patched_start
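
To illustrate why a single class-level assignment is enough, here is a small sketch with a stand-in class. The Service class below is a hypothetical placeholder, not celery.beat.Service, and the return strings exist only for the demonstration. It shows that the patch affects instances created before and after the assignment, and that keeping a reference to the original method makes the patch reversible.

```python
class Service:
    """Hypothetical stand-in for celery.beat.Service, for illustration only."""

    def start(self):
        return "original start"


def patched_start(self):
    """Replacement method; receives the instance as `self` like any method."""
    return "patched start"


existing = Service()                # created before the patch is applied

original_start = Service.start      # keep a reference so the patch can be undone
Service.start = patched_start       # class-level assignment, as in the post

result_existing = existing.start()  # methods are looked up on the class at call time
result_new = Service().start()

Service.start = original_start      # revert the patch
result_reverted = Service().start()
```

Because the lookup happens at call time, the assignment only needs to run in the same process before start is invoked.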

After that, once Celery Beat has started, you can run touch /tmp/standby.flag to put it into standby mode. It will stop generating scheduled tasks. Remove the file, wait up to 5 seconds (one STANDBY_CHECK_INTERVAL), and Beat will resume normal scheduling.
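
If you would rather toggle the flag from a script than from the shell, something like the following might work. It is only a sketch: enter_standby and leave_standby are hypothetical helper names, and the demo uses a temporary directory rather than /tmp/standby.flag. Note that unlink(missing_ok=True) requires Python 3.8 or later.

```python
import tempfile
from pathlib import Path


def enter_standby(flag: Path) -> None:
    """Create the flag file; safe to call when it already exists."""
    flag.touch(exist_ok=True)


def leave_standby(flag: Path) -> None:
    """Remove the flag file; safe to call when it is already gone."""
    flag.unlink(missing_ok=True)  # missing_ok needs Python 3.8+


# Demo against a throwaway path instead of /tmp/standby.flag.
with tempfile.TemporaryDirectory() as tmp:
    flag = Path(tmp) / "standby.flag"
    states = [flag.is_file()]

    enter_standby(flag)
    states.append(flag.is_file())

    enter_standby(flag)             # repeating the call changes nothing
    states.append(flag.is_file())

    leave_standby(flag)
    states.append(flag.is_file())

    leave_standby(flag)             # removing a missing flag is not an error
    states.append(flag.is_file())
```

Both helpers are idempotent, so a failover script can call them repeatedly without checking the current state first.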
