0%

使用uwsgi实现异步任务

在开发closestack这个虚拟机管理项目的时候,本着精简的原则,我没有使用celery等异步的方案,部署倒是简单了,但是可以预见,如果同时使用系统的人员增加的话,可能出现等待时间超长的问题。因此需要使用过一个简单的方法来实现异步,以解决并发的问题。

为啥不用Celery?

因为我感觉用了celery之后要考虑的问题就更多了,索性直接部署openstack得了。

问题还要解决

在不搭建rabbitmq,redis和celery的情况下,实际上可以采用uwsgi内置的队列管理器spooler来实现队列和异步的任务。由于uwsgi在我的大部分项目中都有使用(经典模式nginx+uwsgi+django),在仔细阅读uwsgi的文档后发现,实际上uwsgi还提供一些额外的功能,例如共享队列、队列管理器和cron计划任务管理。

本文就介绍了如何使用uwsgi的spooler来实现异步任务的功能,完整的文档请参考:http://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/Spooler.html

尝试

本文介绍的操作步骤均在python3.6的环境中完成,请保证自己使用python3环境!

搭建一个简单的uwsgi+Django服务

为了完成实验,需要先搭建一个uwsg+django的服务,对于我们的后端开发人员来说应该已经很熟悉了:

1.创建并激活venv

1
2
python3 -m venv demo_venv
source demo_venv/bin/active

以下的步骤操作请保持在venv环境中进行,否则可能出现系统异常。

2.安装django,uwsgi等组件

1
2
pip install django
pip install uwsgi

3.创建demo django项目

1
django-admin startproject demo

这一步将完成django项目的创建,默认情况下会生成一个demo的目录以及一个名为demo的app。我们直接使用这个app即可。

以下为demo项目的文件和目录结构:

1
2
3
4
5
6
7
demo/
├── demo
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py

4.在demo app的目录下创建uwsgi的配置文件(命名为uwsgi.ini),配置文件内容如下:

1
2
3
4
5
6
7
[uwsgi]
http = 127.0.0.1:8080
chdir = /Users/knktc/tmp/demo
wsgi-file = demo/wsgi.py
home = /Users/knktc/tmp/demo_venv
processes = 4
threads = 2

注意,这个配置中没有使用socket而是使用了http,这样不需要nginx我们也可以直接用浏览器来访问服务器进行调试了。

5.尝试启动一把:

1
uwsgi uwsgi.ini

可以在浏览器中访问http://127.0.0.1:8080 来看到服务启动成功了:

添加异步任务

创建目录

根据uwsgi的文档,spooler功能实际上是通过目录+文件的形式来存储了任务,并使用spooler进程去检查目录中文件的变化,从而实现了任务队列的效果。

要启用任务队列,需要在uwsgi的配置文件中启用spooler功能,我们在配置文件中加入spooler目录,然后重启uwsgi,即可启用spooler功能:

1
2
# 该配置在/Users/knktc/tmp/demo/demo目录下创建出了一个tasks目录,用于存放spooler的任务文件
spooler = %(chdir)/demo/tasks

重启uwsgi后,可以看到在/Users/knktc/tmp/demo/demo下已生成了一个tasks目录。在uwsgi的启动日志中还可以看到如下的信息:

1
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7363

写入任务

我们创建一个名为write_task的接口,访问该接口就向任务队列中写入数据:

新建一个views.py文件来编写接口:

1
2
3
4
5
6
7
import uwsgi
from django.http import HttpResponse


def write_task(request):
uwsgi.spool({b'body': b'hello world'})
return HttpResponse('done!')

您的IDE可能无法识别uwsgi这个包,因为这个包只有在uwsgi启动时才会出现。

注意,在python3中,uwsgi.spool后的参数名和值都必须是bytes类型,否则会产生如下的错误:

spooler callable dictionary must contains only bytes

urls.py文件中的配置为:

1
2
3
4
5
6
from django.urls import path
from demo.views import write_task

urlpatterns = [
path('write_task/', write_task),
]

重启服务,用浏览器访问下http://127.0.0.1:8080/write_task/

可以看到,浏览器中显示了**done!**,表明请求成功,此时在tasks目录中可以看到生成了一个文件,文件名可能是:

1
uwsgi_spoolfile_on_knktc-rbmp_7463_1_2094189768_1532338701_697166

这就是我们生成的一个任务。

运行任务

要读取队列中的消息并运行任务,我们还需要编写一个worker程序。我们可以创建一个名为worker.py的文件,然后写入如下的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os
import time
import uwsgi
import datetime

LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log.txt')

def worker(arguments):
time.sleep(5)
current_datetime = str(datetime.datetime.now())
data = str(arguments.get('body'))
with open(LOG_FILE, 'a') as f:
f.write('{}:{}\n'.format(current_datetime, data))
return uwsgi.SPOOL_OK

# register the worker
uwsgi.spooler = worker

这段代码中,worker先等待了5秒后才开始执行,将当前时间和body的消息写到了与worker.py同目录下的log.txt文件中。函数结尾的return uwsgi.SPOOL_OK 表明任务成功,任务文件将会被删除。

官方文档中指出了对任务执行的几种状态:

-2 (SPOOL_OK) —— 任务已完成,将会移除spool文件

-1 (SPOOL_RETRY) —— 暂时错误,在下一个spooler迭代将会重试该任务。

0 (SPOOL_IGNORE) —— 忽略此任务,如果实例中加载了多个语言,那么它们所有都会竞争管理该任务。这个返回值允许你跳过特定的语言的任务。

要让这个woker生效,还需要在uwsgi.ini中加一句:

1
import = demo/worker.py

重启uwsgi,此时可以看到tasks中的文件消失了,同时生成了log.txt,内容为:

1
2018-07-23 09:59:09.019494:b'hello world'

尝试连续多次访问http://127.0.0.1:8080/write_task/ ,可以发现每次请求都很快地完成了,而不是需要等待5秒钟,异步任务的效果已经实现了。

检查log.txt的内容,可以看到确实是按照间隔5秒的时间间距来逐个完成了任务:

1
2
3
4
5
6
2018-07-23 10:05:50.076336:b'hello world'
2018-07-23 10:05:55.079512:b'hello world'
2018-07-23 10:06:00.081813:b'hello world'
2018-07-23 10:06:05.083541:b'hello world'
2018-07-23 10:06:10.086147:b'hello world'
2018-07-23 10:06:15.093093:b'hello world'

进阶

多个spooler

可直接在uwsgi配置中写入spooler process的配置,增加worker的进程,以实现并行地对任务进行处理:

1
spooler-processes = 4

以上的配置将spooler的进程设置为4,在重新启动uwsgi时可以看到四个spooler的进程:

1
2
3
4
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7923
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7924
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7925
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7926

此时再执行几次write_task接口,则可以看到几个任务几乎同时完成了,以此可以实现一些提高并发的效果:

1
2
3
4
2018-07-23 10:27:39.366717:b'hello world'
2018-07-23 10:27:39.981558:b'hello world'
2018-07-23 10:27:40.620102:b'hello world'
2018-07-23 10:27:41.216263:b'hello world'

任务分发

由于spooler的任务队列实在有点简单,发送到队列中的任务只有同一个worker函数在监听,在closestack这个项目中,可能需要一部分worker执行创建虚拟机的工作,另一部分worker执行清理虚拟机环境的工作,要实现对多种类型任务的处理,只能自己通过一些手段来实现了。

在uwsgi的配置文件中可以设置多个spooler,相当于产生了多个队列,下面的配置会产生两个目录tasks_a和tasks_b:

1
2
spooler = %(chdir)/demo/tasks_a
spooler = %(chdir)/demo/tasks_b

重启uwsgi,可以在日志中看到,针对每个spooler都启动了4个进程:

1
2
3
4
5
6
7
8
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_a with pid 12065
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_a with pid 12066
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_a with pid 12067
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_a with pid 12068
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_b with pid 12069
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_b with pid 12070
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_b with pid 12071
spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_b with pid 12072

为了对两个可以分别写入任务,我们增加了两个新的接口:

urls.py中的配置为:

1
2
3
4
5
6
7
from django.urls import path
from demo.views import write_task_a, write_task_b

urlpatterns = [
path('write_task_a/', write_task_a),
path('write_task_b/', write_task_b),
]

相应地,views.py里面的接口代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def write_task_a(request):
task_config = {
'type': 'a',
'message': 'this is task A'
}
uwsgi.spool({b'body': json.dumps(task_config).encode('utf8'), b'spooler': b'/Users/knktc/tmp/demo/demo/tasks_a'})
return HttpResponse('done!')


def write_task_b(request):
task_config = {
'type': 'b',
'message': 'this is task B'
}
uwsgi.spool({b'body': json.dumps(task_config).encode('utf8'), b'spooler': b'/Users/knktc/tmp/demo/demo/tasks_b'})
return HttpResponse('done!')

可以看到,我们在向队列中写入任务时,需要指定一个spooler的参数,参数值为spooler目录的完整路径(指定相对路径还不行)。这样,针对两个接口的请求,将会分别向不同的spooler目录下写入任务。

为了能够分别地处理两个队列中的任务,我们需要把woker也给改造下,改造成为一个dispacher,对不同的任务分别指定程序来进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os
import json
import uwsgi
import datetime

LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log.txt')

def worker_dispatcher(arguments):
data = arguments.get('body').decode('utf8')
config = json.loads(data)
worker_type = config.get('type')
message = config.get('message')
if worker_type == 'a':
worker_a(message)
elif worker_type == 'b':
worker_b(message)

return uwsgi.SPOOL_OK


def worker_a(message):
current_datetime = str(datetime.datetime.now())
with open(LOG_FILE, 'a') as f:
f.write('{}:{}\n'.format(current_datetime, message))
return True


def worker_b(message):
current_datetime = str(datetime.datetime.now())
with open(LOG_FILE, 'a') as f:
f.write('{}:{}\n'.format(current_datetime, message))
return True


uwsgi.spooler = worker_dispatcher

在上面的代码中,worker_dispatcher函数接收到消息,然后解析消息内容,根据消息内容将任务分别分发给两个不同的函数来进行处理,从而实现任务分发的效果。

重启uwsgi,随后交替访问http://127.0.0.1:8080/write_task_a/http://127.0.0.1:8080/write_task_b/ 两个接口,可以看到在tasks_a和tasks_b目录下分别生成了不同的任务,同时在结果的log.txt文件中,可以看到以下的结果:

1
2
3
4
2018-07-24 05:27:10.877155:this is task A
2018-07-24 05:27:14.472384:this is task B
2018-07-24 05:27:20.142162:this is task A
2018-07-24 05:27:24.819657:this is task B

OK, 任务分发的效果达到了。

参考

如果我的文字帮到了您,那么可不可以请我喝罐可乐?