knktc's Notes

python, cloud, linux...


Use uWSGI Spooler as a Simple Async Queue

While building closestack, a virtual machine management project, I tried to keep deployment simple and avoided using Celery or other heavier async stacks.

That worked for a while, but it was easy to predict that as the number of users grew, waiting times would grow too. So I needed a simpler asynchronous solution to improve concurrency without bringing in RabbitMQ, Redis, and Celery.

uWSGI already provides a built-in queue manager called spooler, which turned out to be a good fit.

Why not Celery?

Because once Celery enters the picture, a lot more infrastructure and operational concerns show up too.

The problem still needs solving

Without adding RabbitMQ, Redis, or Celery, uWSGI's built-in spooler can be used to handle queueing and asynchronous work.

Since most of my projects already use the classic Nginx + uWSGI + Django pattern, I looked more carefully through the uWSGI documentation and found that it also offers features such as shared queues, spoolers, and cron-like task management.

This post introduces the spooler approach. Full documentation is here:

http://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/Spooler.html

Try it out

Everything below was tested in a Python 3.6 environment.

Build a simple uWSGI + Django service

  1. Create and activate a virtual environment:

python3 -m venv demo_venv
source demo_venv/bin/activate

  2. Install Django and uWSGI:

pip install django
pip install uwsgi

  3. Create a demo Django project:

django-admin startproject demo

This creates a project structure like:

demo/
├── demo
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── manage.py

  4. Create a uwsgi.ini file under the project directory:

[uwsgi]
http = 127.0.0.1:8080
chdir = /Users/knktc/tmp/demo
wsgi-file = demo/wsgi.py
home = /Users/knktc/tmp/demo_venv
processes = 4
threads = 2

Here I used http instead of socket, so Nginx is not required during testing.

  5. Start it:

uwsgi uwsgi.ini

Then open http://127.0.0.1:8080 to confirm the service is running.

Add async tasks

Create the spooler directory

According to the documentation, the spooler stores queued tasks as files inside a directory, and the spooler process watches those files to implement a task queue.
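
Because queued tasks are plain files, the queue depth can be estimated by listing the directory. The following is a minimal sketch, assuming that every regular file in the spooler directory is a pending task; the pending_tasks helper is hypothetical and not part of uWSGI.

```python
import os


def pending_tasks(spool_dir):
    """Return the sorted names of regular files found in spool_dir.

    Assumption: every regular file in the directory is a queued task.
    A missing directory is treated as an empty queue.
    """
    if not os.path.isdir(spool_dir):
        return []
    return sorted(
        name for name in os.listdir(spool_dir)
        if os.path.isfile(os.path.join(spool_dir, name))
    )
```

Calling len(pending_tasks(path)) on the configured spooler directory gives a rough idea of how many tasks are still waiting.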

Add this to uwsgi.ini:

spooler = %(chdir)/demo/tasks

After restarting uWSGI, the tasks directory will be created automatically, and the startup log will contain something like:

spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7363

Write tasks

Create a write_task endpoint in demo/views.py that inserts a task into the queue:

import uwsgi
from django.http import HttpResponse


def write_task(request):
    # Queue a task for the spooler and return immediately.
    uwsgi.spool({b'body': b'hello world'})
    return HttpResponse('done!')

In Python 3, both keys and values passed to uwsgi.spool() must be bytes, otherwise uWSGI raises:

spooler callable dictionary must contains only bytes
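
One way to avoid that error is to encode the dictionary before spooling it. This is only a sketch: to_spool_args is a hypothetical helper name, and it assumes UTF-8 is an acceptable encoding for your task data.

```python
def to_spool_args(params):
    """Return a copy of params with str keys and values encoded as UTF-8 bytes.

    Values that are already bytes pass through unchanged; anything else
    raises TypeError so the problem surfaces before the task is queued.
    """
    encoded = {}
    for key, value in params.items():
        if isinstance(key, str):
            key = key.encode('utf8')
        if isinstance(value, str):
            value = value.encode('utf8')
        if not isinstance(key, bytes) or not isinstance(value, bytes):
            raise TypeError('spool arguments must be str or bytes')
        encoded[key] = value
    return encoded
```

Inside a view running under uWSGI, this would be used as uwsgi.spool(to_spool_args({'body': 'hello world'})).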

Add the route in urls.py:

from django.urls import path
from demo.views import write_task

urlpatterns = [
    path('write_task/', write_task),
]

Restart and visit:

http://127.0.0.1:8080/write_task/

The browser should show done!, and you should see a spool file appear under tasks/.

Process tasks

Now create a worker in demo/worker.py:

import os
import time
import uwsgi
import datetime

# The log file lives next to this module.
LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log.txt')


def worker(arguments):
    # Simulate a slow job.
    time.sleep(5)
    current_datetime = str(datetime.datetime.now())
    data = str(arguments.get('body'))
    with open(LOG_FILE, 'a') as f:
        f.write('{}:{}\n'.format(current_datetime, data))
    return uwsgi.SPOOL_OK


# Register the function that the spooler calls for each task.
uwsgi.spooler = worker

This worker waits 5 seconds, then writes the current time and message body into log.txt.

uwsgi.SPOOL_OK means the task completed successfully and the spool file should be removed.
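
Besides SPOOL_OK, the uwsgi module also defines SPOOL_RETRY and SPOOL_IGNORE, which this post does not use. The sketch below shows one possible way to map a failing handler to a retry; make_spooler is a hypothetical helper, and the status codes are passed in as parameters so the code can run outside uWSGI.

```python
def make_spooler(handler, ok, retry):
    """Wrap handler so that an exception asks the spooler to retry the task.

    ok and retry are the status codes to return; under uWSGI these would
    be uwsgi.SPOOL_OK and uwsgi.SPOOL_RETRY.
    """
    def spooler(arguments):
        try:
            handler(arguments)
        except Exception:
            return retry
        return ok
    return spooler
```

Under uWSGI this could be wired up as uwsgi.spooler = make_spooler(handler, uwsgi.SPOOL_OK, uwsgi.SPOOL_RETRY). A task that always fails would then be retried again and again, so a real handler would probably need some retry limit.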

To activate this worker, add the following to uwsgi.ini:

import = demo/worker.py

After restarting uWSGI, you should see that the task files disappear and log.txt is created, for example:

2018-07-23 09:59:09.019494:b'hello world'

If you hit the endpoint several times in a row, the HTTP requests complete quickly instead of waiting 5 seconds, which means the task handling is asynchronous.
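
To check this with numbers rather than by eye, the requests can be timed. This is a rough sketch; time_calls is a hypothetical helper, and the fetch callable is whatever performs one request.

```python
import time


def time_calls(fetch, count):
    """Call fetch() count times and return the elapsed seconds of each call."""
    durations = []
    for _ in range(count):
        started = time.monotonic()
        fetch()
        durations.append(time.monotonic() - started)
    return durations
```

Against the demo server, fetch could be lambda: urllib.request.urlopen('http://127.0.0.1:8080/write_task/').read() (after import urllib.request). If the work really is asynchronous, each duration should be far below the worker's 5-second sleep.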

Advanced usage

Multiple spooler workers

You can increase parallelism by setting:

spooler-processes = 4

After restarting, uWSGI will spawn multiple spooler processes for the same queue, allowing tasks to be processed in parallel.

Task dispatching

The spooler itself is intentionally simple. By default, one worker function receives all queued tasks. If you need multiple task types, you can build your own dispatcher.

You can also define multiple spooler directories:

spooler = %(chdir)/demo/tasks_a
spooler = %(chdir)/demo/tasks_b

Then add two endpoints:

from django.urls import path
from demo.views import write_task_a, write_task_b

urlpatterns = [
    path('write_task_a/', write_task_a),
    path('write_task_b/', write_task_b),
]

And write tasks into the corresponding spooler paths:

import json

import uwsgi
from django.http import HttpResponse


def write_task_a(request):
    task_config = {
        'type': 'a',
        'message': 'this is task A'
    }
    # The b'spooler' key selects which spooler directory receives the task.
    uwsgi.spool({b'body': json.dumps(task_config).encode('utf8'),
                 b'spooler': b'/Users/knktc/tmp/demo/demo/tasks_a'})
    return HttpResponse('done!')


def write_task_b(request):
    task_config = {
        'type': 'b',
        'message': 'this is task B'
    }
    uwsgi.spool({b'body': json.dumps(task_config).encode('utf8'),
                 b'spooler': b'/Users/knktc/tmp/demo/demo/tasks_b'})
    return HttpResponse('done!')
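
The two views differ only in the task type and the target directory, so the dictionary could be built by a shared helper instead of hard-coding absolute paths. This is a sketch under assumptions: build_task, spool_root, and queue_name are hypothetical names, and the resulting path must still match a spooler directory configured in uwsgi.ini.

```python
import json
import os


def build_task(spool_root, queue_name, task_config):
    """Build the bytes-only dict for uwsgi.spool(), aimed at one spooler directory."""
    spool_dir = os.path.join(spool_root, queue_name)
    return {
        b'body': json.dumps(task_config).encode('utf8'),
        b'spooler': spool_dir.encode('utf8'),
    }
```

A view would then call uwsgi.spool(build_task('/Users/knktc/tmp/demo/demo', 'tasks_a', task_config)).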

To process them differently, turn the worker into a dispatcher:

import os
import json
import uwsgi
import datetime

LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log.txt')


def worker_dispatcher(arguments):
    # Decode the JSON task body and route it by its 'type' field.
    data = arguments.get('body').decode('utf8')
    config = json.loads(data)
    worker_type = config.get('type')
    message = config.get('message')
    if worker_type == 'a':
        worker_a(message)
    elif worker_type == 'b':
        worker_b(message)

    return uwsgi.SPOOL_OK


def worker_a(message):
    current_datetime = str(datetime.datetime.now())
    with open(LOG_FILE, 'a') as f:
        f.write('{}:{}\n'.format(current_datetime, message))
    return True


def worker_b(message):
    current_datetime = str(datetime.datetime.now())
    with open(LOG_FILE, 'a') as f:
        f.write('{}:{}\n'.format(current_datetime, message))
    return True


# Every spooler directory delivers its tasks to this single callable.
uwsgi.spooler = worker_dispatcher

Now the dispatcher examines each task body and routes the work to the correct handler.
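
As the number of task types grows, the if/elif chain can be swapped for a lookup table. The following is one possible variation, not the code used in this post; HANDLERS, handles, and dispatch are hypothetical names.

```python
import json

# Maps a task type string to the function that handles it.
HANDLERS = {}


def handles(task_type):
    """Register the decorated function as the handler for task_type."""
    def register(func):
        HANDLERS[task_type] = func
        return func
    return register


def dispatch(body):
    """Decode a JSON task body (bytes) and call the handler for its type.

    Returns True if a handler ran, False if the type is not registered.
    """
    config = json.loads(body.decode('utf8'))
    handler = HANDLERS.get(config.get('type'))
    if handler is None:
        return False
    handler(config.get('message'))
    return True
```

Each worker would be decorated with @handles('a') or @handles('b'), and the spooler callable would call dispatch(arguments.get('body')) before returning uwsgi.SPOOL_OK. The False return makes unknown task types visible instead of silently dropping them.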

After restarting uWSGI and calling write_task_a/ and write_task_b/ alternately, you will see separate tasks appear in the two spooler directories, and the final log.txt will show both task types being processed.
