高阶应用-celery
阅读 (290071)一、问题
用户发起request,并且要等待response返回。但是在视图中有一些耗时的操作,导致用户可能会等待很长时间才能接受response,这样用户体验很差
网站每隔一段时间要同步一次数据,但是http请求是需要触发的
celery网址:http://docs.jinkan.org/docs/celery/
二、celery模块包含
-
任务task
本质是一个python函数,将耗时操作封装成一个函数
-
队列queue
将要执行的任务放队列里
-
工人worker
负责执行队列中的任务
-
代理broker
负责调度,在部署环境中使用redis
三、解决
- 将耗时的操作放到celery中执行
- 定时执行
四、安装
- pip install celery==3.1.26
- pip install celery-with-redis=3.0
- pip install django-celery=3.3.1
- pip install redis==2.10.6
五、配置settings.py
-
INSTALLED_APPS 添加
djcelery
-
在settings下方添加如下代码
import djcelery djcelery.setup_loader()#初始化 BROKER_URL='redis://:密码@127.0.0.1:6379/0'#0带表16个库中使用第0个库 CELERY_IMPORTS=('App.task') #myApp是项目名
六、创建task.py文件
路径 App/task.py
from celery import task
import time
@task
def tt():
print("lucky is a good man")
time.sleep(5)
print("lucky is a nice man")
七、迁移,生成celery需要的数据库表
python manage.py migrate
八、在工程project目录下创建celery.py的文件
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings')
app = Celery('portal')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
九、在工程目录下的project目录下的__init__.py文件中添加
from .celery import app as celery_app
十、视图
from .task import tt
def celery(request):
tt.delay() # 添加到celery中执行,不会阻塞
return HttpResponse("测试celery")
传参:
可以在tt.delay([参数]) delay中添加参数
十一、启动顺序
-
启动redis
localhost:~ xialigang$ redis-cli
-
启动服务
python manange.py runserver
-
启动worker
python manage.py celery worker --loglevel=info
十二、修改报错(解释器为3.7版本)
报错信息
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/utils/timer2.py", line 19
from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
^
原因:python3.7版本中async是关键字,所以报错
解决方法有两种:
-
降python版本,重新安装Python版本为3.7以下 如3.6
-
把kombu.async模块重命名,再把引用处修改过来
把所有async的引入位置改为async1
将site-packages\kombu\async -> site-packages\kombu\async1
site-packages/celery/concurrency/asynpool.py", line 42
site-packages/kombu/transport/redis.py"
line 815 862 871 876 877 914 920 925
site-packages/celery/worker/autoscale.py", line 21
site-packages/celery/worker/components.py", line 14
site-packages/celery/worker/strategy.py", line 13
十三、定时执行
-
定时执行一个任务
-
在settings.py文件中添加如下代码
from datetime import timedelta # 在settings.py文件添加 CELERYBEAT_SCHEDULE = { 'schedule-test': { 'task': 'App.task.test', #App的名称 task为你创建任务的py文件名 test为你的任务的名称 'schedule': timedelta(seconds=3), 'args': (2,) }, }
-
启动顺序:
-
启动Django
Python3 manage.py runserver
-
启动worker
python manage.py celery worker --loglevel=info
-
开启定时任务
python manage.py celery beat --loglevel=info(或者celery -A 你的工程名称 beat -l info)
-
-
-
定时多个任务
-
settings.py添加如下代码
from datetime import timedelta # 在settings.py文件添加 CELERYBEAT_SCHEDULE = { #第一个任务 'schedule-test1': { 'task': 'App.mytask.test', 'schedule': timedelta(seconds=3), 'args': (2,) }, #第二个任务 'schedule-test2': { 'task': 'App.mytask.test2', 'schedule': timedelta(seconds=3), }, }
-
App/task.py
from celery import task import time @task def test1(i): print('打印',i) time.sleep(5) print('打印',i) @task def test2(): print('test2') time.sleep(5) print('test2')
-