且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Python编程:Django中使用Celery执行异步任务和定时任务

更新时间:2022-09-05 10:26:31

使用步骤

1、安装

pip install django django-celery

2、新建工程

$ django-admin.py startproject celery_project
$ python manage.py startapp course
$ cd celery_project

项目结构

├── celery_project
│   ├── __init__.py
│   ├── celery_config.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── course
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── migrations
│   │   ├── __init__.py
│   ├── models.py
│   ├── tasks.py
│   ├── tests.py
│   └── views.py
├── db.sqlite3
└── manage.py

3、新建任务

course/tasks.py

# -*- coding: utf-8 -*-

import time
from celery.task import Task


class CourseTask(Task):
    name = "course-task"

    def run(self, *args, **kwargs):
        print("start course task")
        time.sleep(4)  # 模拟耗时
        print(args, kwargs)
        print("end course task")

4、配置celery

celery_project/celery_config.py

# -*- coding: utf-8 -*-

import djcelery
from datetime import timedelta

djcelery.setup_loader()


BROKER_BACKEND = "redis"

BROKER_URL = 'redis://localhost:6379/1'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'


# 设置任务队列
CELERY_QUEUES = {
    "beat_queue": {
        "exchange": "beat_queue",
        "exchange_type": "direct",
        "binding_key": "beat_queue"
    },
    "work_queue": {
        "exchange": "work_queue",
        "exchange_type": "direct",
        "binding_key": "work_queue"
    }
}

# 设置默认队列
CELERY_DEFAULT_QUEUE = "work_queue"

# 注册任务函数
CELERY_IMPORTS = (
    "course.tasks",
)

# 设置时区,默认UTC
CELERY_TIMEZONE = 'Asia/Shanghai'

# 有些情况下防止死锁
CELERYD_FORCE_EXECV = True

# 设置并发的worker数量
CELERYD_CONCURRENCY = 4

# 允许重试
CELERYD_ACKS_LATE = True

# 每个worker最多执行100个任务,防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100

# 单个任务最大运行时间
CELERYD_TASK_TIME_LIMIT = 6 * 60

5、配置settings

celery_project/settings.py

# 添加app
INSTALLED_APPS = [
    ...
    'djcelery',
    'course'
]

# 引入 Celery设置
from .celery_config import *

6、编写视图函数

course/views.py

# -*- coding: utf-8 -*-

from course.tasks import CourseTask
from django.http import JsonResponse


def do(reqeust):
    # 执行异步任务
    print("start do")
    CourseTask.apply_async(args=("hello", ), queue="work_queque")
    print("end do")
    return JsonResponse({"result": "ok"})

7、配置路由

celery_project/urls.py

from course import views

urlpatterns = [
    url(r'^do/$', views.do)
]

8、启动Django服务和 worker

$ python manage.py help
$ python manage.py runserver
$ python manage.py celery worker -l INFO

# 或者
$ python manage.py celery worker -Q queue

访问测试:http://127.0.0.1:8000/do/

9、设置定时任务

celery_project/celery_config.py

# 设置定时任务
CELERYBEAT_SCHEDULE = {
    "course-task": {
        "task": "course-task",
        "schedule": timedelta(seconds=5),
        "options": {
            "queue": "beat_queue"
        }
    }
}

10、启动beat

$ python manage.py celery beat -l INFO

监控工具flower

安装

pip install flower

启动

celery flower --address=0.0.0.0 --port=5555 --broker=redis://localhost:6379/1 --basic_auth=user:123

django中启动

python manage.py celery flower

访问管理界面 http://localhost:5555

总结

Django中使用Celery 只是多了3个步骤:

1、task任务编写

2、celery配置

3、启动celery-worker