在开发closestack这个虚拟机管理项目的时候,本着精简的原则,我没有使用celery等异步的方案,部署倒是简单了,但是可以预见,如果同时使用系统的人员增加的话,可能出现等待时间超长的问题。因此需要使用过一个简单的方法来实现异步,以解决并发的问题。
为啥不用Celery?
因为我感觉用了celery之后要考虑的问题就更多了,索性直接部署openstack得了。
问题还要解决
在不搭建rabbitmq,redis和celery的情况下,实际上可以采用uwsgi内置的队列管理器spooler来实现队列和异步的任务。由于uwsgi在我的大部分项目中都有使用(经典模式nginx+uwsgi+django),在仔细阅读uwsgi的文档后发现,实际上uwsgi还提供一些额外的功能,例如共享队列、队列管理器和cron计划任务管理。
本文就介绍了如何使用uwsgi的spooler来实现异步任务的功能,完整的文档请参考:http://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/Spooler.html
尝试
本文介绍的操作步骤均在python3.6的环境中完成,请保证自己使用python3环境!
搭建一个简单的uwsgi+Django服务
为了完成实验,需要先搭建一个uwsg+django的服务,对于我们的后端开发人员来说应该已经很熟悉了:
1.创建并激活venv
1 | python3 -m venv demo_venv |
以下的步骤操作请保持在venv环境中进行,否则可能出现系统异常。
2.安装django,uwsgi等组件
1 | pip install django |
3.创建demo django项目
1 | django-admin startproject demo |
这一步将完成django项目的创建,默认情况下会生成一个demo的目录以及一个名为demo的app。我们直接使用这个app即可。
以下为demo项目的文件和目录结构:
1 | demo/ |
4.在demo app的目录下创建uwsgi的配置文件(命名为uwsgi.ini),配置文件内容如下:
1 | [uwsgi] |
注意,这个配置中没有使用socket而是使用了http,这样不需要nginx我们也可以直接用浏览器来访问服务器进行调试了。
5.尝试启动一把:
1 | uwsgi uwsgi.ini |
可以在浏览器中访问http://127.0.0.1:8080 来看到服务启动成功了:
添加异步任务
创建目录
根据uwsgi的文档,spooler功能实际上是通过目录+文件的形式来存储了任务,并使用spooler进程去检查目录中文件的变化,从而实现了任务队列的效果。
要启用任务队列,需要在uwsgi的配置文件中启用spooler功能,我们在配置文件中加入spooler目录,然后重启uwsgi,即可启用spooler功能:
1 | # 该配置在/Users/knktc/tmp/demo/demo目录下创建出了一个tasks目录,用于存放spooler的任务文件 |
重启uwsgi后,可以看到在/Users/knktc/tmp/demo/demo下已生成了一个tasks目录。在uwsgi的启动日志中还可以看到如下的信息:
1 | spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7363 |
写入任务
我们创建一个名为write_task的接口,访问该接口就向任务队列中写入数据:
新建一个views.py文件来编写接口:
1 | import uwsgi |
您的IDE可能无法识别uwsgi这个包,因为这个包只有在uwsgi启动时才会出现。
注意,在python3中,uwsgi.spool后的参数名和值都必须是bytes类型,否则会产生如下的错误:
spooler callable dictionary must contains only bytes
urls.py文件中的配置为:
1 | from django.urls import path |
重启服务,用浏览器访问下http://127.0.0.1:8080/write_task/
可以看到,浏览器中显示了**done!**,表明请求成功,此时在tasks目录中可以看到生成了一个文件,文件名可能是:
1 | uwsgi_spoolfile_on_knktc-rbmp_7463_1_2094189768_1532338701_697166 |
这就是我们生成的一个任务。
运行任务
要读取队列中的消息并运行任务,我们还需要编写一个worker程序。我们可以创建一个名为worker.py的文件,然后写入如下的代码:
1 | import os |
这段代码中,worker先等待了5秒后才开始执行,将当前时间和body的消息写到了与worker.py同目录下的log.txt文件中。函数结尾的return uwsgi.SPOOL_OK 表明任务成功,任务文件将会被删除。
官方文档中指出了对任务执行的几种状态:
-2 (SPOOL_OK) —— 任务已完成,将会移除spool文件
-1 (SPOOL_RETRY) —— 暂时错误,在下一个spooler迭代将会重试该任务。
0 (SPOOL_IGNORE) —— 忽略此任务,如果实例中加载了多个语言,那么它们所有都会竞争管理该任务。这个返回值允许你跳过特定的语言的任务。
要让这个woker生效,还需要在uwsgi.ini中加一句:
1 | import = demo/worker.py |
重启uwsgi,此时可以看到tasks中的文件消失了,同时生成了log.txt,内容为:
1 | 2018-07-23 09:59:09.019494:b'hello world' |
尝试连续多次访问http://127.0.0.1:8080/write_task/ ,可以发现每次请求都很快地完成了,而不是需要等待5秒钟,异步任务的效果已经实现了。
检查log.txt的内容,可以看到确实是按照间隔5秒的时间间距来逐个完成了任务:
1 | 2018-07-23 10:05:50.076336:b'hello world' |
进阶
多个spooler
可直接在uwsgi配置中写入spooler process的配置,增加worker的进程,以实现并行地对任务进行处理:
1 | spooler-processes = 4 |
以上的配置将spooler的进程设置为4,在重新启动uwsgi时可以看到四个spooler的进程:
1 | spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks with pid 7923 |
此时再执行几次write_task接口,则可以看到几个任务几乎同时完成了,以此可以实现一些提高并发的效果:
1 | 2018-07-23 10:27:39.366717:b'hello world' |
任务分发
由于spooler的任务队列实在有点简单,发送到队列中的任务只有同一个worker函数在监听,在closestack这个项目中,可能需要一部分worker执行创建虚拟机的工作,另一部分worker执行清理虚拟机环境的工作,要实现对多种类型任务的处理,只能自己通过一些手段来实现了。
在uwsgi的配置文件中可以设置多个spooler,相当于产生了多个队列,下面的配置会产生两个目录tasks_a和tasks_b:
1 | spooler = %(chdir)/demo/tasks_a |
重启uwsgi,可以在日志中看到,针对每个spooler都启动了4个进程:
1 | spawned the uWSGI spooler on dir /Users/knktc/tmp/demo/demo/tasks_a with pid 12065 |
为了对两个可以分别写入任务,我们增加了两个新的接口:
urls.py中的配置为:
1 | from django.urls import path |
相应地,views.py里面的接口代码为:
1 | def write_task_a(request): |
可以看到,我们在向队列中写入任务时,需要指定一个spooler的参数,参数值为spooler目录的完整路径(指定相对路径还不行)。这样,针对两个接口的请求,将会分别向不同的spooler目录下写入任务。
为了能够分别地处理两个队列中的任务,我们需要把woker也给改造下,改造成为一个dispacher,对不同的任务分别指定程序来进行处理:
1 | import os |
在上面的代码中,worker_dispatcher函数接收到消息,然后解析消息内容,根据消息内容将任务分别分发给两个不同的函数来进行处理,从而实现任务分发的效果。
重启uwsgi,随后交替访问http://127.0.0.1:8080/write_task_a/和http://127.0.0.1:8080/write_task_b/ 两个接口,可以看到在tasks_a和tasks_b目录下分别生成了不同的任务,同时在结果的log.txt文件中,可以看到以下的结果:
1 | 2018-07-24 05:27:10.877155:this is task A |
OK, 任务分发的效果达到了。
参考
- 其实有人已经封装好了uwsgi的spooler功能,可在django中直接使用: https://github.com/Bahus/uwsgi_tasks