组团学

高阶应用-celery

阅读 (290071)

一、问题

用户发起request,并且要等待response返回。但是在视图中有一些耗时的操作,导致用户可能会等待很长时间才能接受response,这样用户体验很差

网站每隔一段时间要同步一次数据,但是http请求是需要触发的

celery网址:http://docs.jinkan.org/docs/celery/

二、celery模块包含

  • 任务task

    本质是一个python函数,将耗时操作封装成一个函数

  • 队列queue

    将要执行的任务放队列里

  • 工人worker

    负责执行队列中的任务

  • 代理broker

    负责调度,在部署环境中使用redis

三、解决

  • 将耗时的操作放到celery中执行
  • 定时执行

四、安装

  • pip install celery==3.1.26
  • pip install celery-with-redis=3.0
  • pip install django-celery=3.3.1
  • pip install redis==2.10.6

五、配置settings.py

  • INSTALLED_APPS 添加

    djcelery

  • 在settings下方添加如下代码

    import djcelery djcelery.setup_loader()#初始化 BROKER_URL='redis://:密码@127.0.0.1:6379/0'#0带表16个库中使用第0个库 CELERY_IMPORTS=('App.task') #myApp是项目名

六、创建task.py文件

路径 App/task.py

from celery import task import time @task def tt(): print("lucky is a good man") time.sleep(5) print("lucky is a nice man")

七、迁移,生成celery需要的数据库表

python manage.py migrate

八、在工程project目录下创建celery.py的文件

from __future__ import absolute_import import os from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings') app = Celery('portal') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))

九、在工程目录下的project目录下的__init__.py文件中添加

from .celery import app as celery_app

十、视图

from .task import tt def celery(request): tt.delay() # 添加到celery中执行,不会阻塞 return HttpResponse("测试celery")

传参:
可以在tt.delay([参数]) delay中添加参数

十一、启动顺序

  • 启动redis

    localhost:~ xialigang$ redis-cli

  • 启动服务

    python manange.py runserver

  • 启动worker

    python manage.py celery worker --loglevel=info

十二、修改报错(解释器为3.7版本)

报错信息

File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/celery/utils/timer2.py", line 19 from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger ^

原因:python3.7版本中async是关键字,所以报错

解决方法有两种:

  • 降python版本,重新安装Python版本为3.7以下 如3.6

  • 把kombu.async模块重命名,再把引用处修改过来

    把所有async的引入位置改为async1

    将site-packages\kombu\async -> site-packages\kombu\async1

    site-packages/celery/concurrency/asynpool.py", line 42

    site-packages/kombu/transport/redis.py"

    line 815 862 871 876 877 914 920 925

    site-packages/celery/worker/autoscale.py", line 21

    site-packages/celery/worker/components.py", line 14

    site-packages/celery/worker/strategy.py", line 13

十三、定时执行

  • 定时执行一个任务

    • 在settings.py文件中添加如下代码

      from datetime import timedelta # 在settings.py文件添加 CELERYBEAT_SCHEDULE = { 'schedule-test': { 'task': 'App.task.test', #App的名称 task为你创建任务的py文件名 test为你的任务的名称 'schedule': timedelta(seconds=3), 'args': (2,) }, }
    • 启动顺序:

      • 启动Django

        Python3 manage.py runserver

      • 启动worker

        python manage.py celery worker --loglevel=info

      • 开启定时任务

        python manage.py celery beat --loglevel=info(或者celery -A 你的工程名称 beat -l info)

  • 定时多个任务

    • settings.py添加如下代码

      from datetime import timedelta # 在settings.py文件添加 CELERYBEAT_SCHEDULE = { #第一个任务 'schedule-test1': { 'task': 'App.mytask.test', 'schedule': timedelta(seconds=3), 'args': (2,) }, #第二个任务 'schedule-test2': { 'task': 'App.mytask.test2', 'schedule': timedelta(seconds=3), }, }
    • App/task.py

      from celery import task import time @task def test1(i): print('打印',i) time.sleep(5) print('打印',i) @task def test2(): print('test2') time.sleep(5) print('test2')
需要 登录 才可以提问哦