Celery-分布式任务队列
1. 定义
Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为作提供维护此类系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度
- 安装
pip install celery
2.celery执行流程
-
producer-生产者
异步任务、定时任务 -
broker
消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】 -
worker - 工作者
消费/执行broker中消息/任务的进程 -
backend
用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
3.使用celery
- 创建异步任务函数[tasks.py] - @app.task
import timefrom celery import Celery# 1.初始化celery对象
# 通过使用本机redis且没有密码,使用远程redis有密码格式为'redis://:密码@ip:6379/1'
app = Celery("dashopt", broker="redis://:@127.0.0.1:6379/1", backend="redis://:@127.0.0.1:6379/1")# 2.创建异步任务函数
@app.task()
def send_message(m, n):print("hahahaha")time.sleep(10)print("哈哈哈哈哈")return m + n
-
终端启动worker
celery -A 文件名 worker -l info
celery -A tasks worker -l info
-
将异步任务推送到消息中间件broker中
异步函数名.delay(参数1, 参数2)
另外起一个终端shell窗口,执行以下代码
from tasks import send_message
send_message.delay("10", "20")
4.Django和Celery结合流程**
1. 创建django项目
django-admin startproject djcelery
2. 创建celery配置文件【和settings.py同路径】
"""celery配置文件
"""
import os
from celery import Celery
from django.conf import settings# 1.为celery设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djcelery.settings")# 2.初始化celery应用
app = Celery("dashopt", broker="redis://:@127.0.0.1:6379/1")# 3.设置自动发现异步任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
3. 在应用下创建tasks.py【存放异步任务】
import time
from djcelery.celery import app@app.task()
def async_task():print("111")time.sleep(5)print("222")
4. 视图函数中调用异步任务【delay(参数1, 参数2)】
import time
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import async_task# Create your views here.
def test_celery(request):print("hahahahha")# 有一个阻塞任务,比如:发邮件async_task.delay()return HttpResponse(time.strftime("%H:%M:%S"))
5. 终端启动celery
`celery -A dashopt worker -l info`
6. 浏览器中测试【注册用户】
5.使用场景
Celery 就是 Django 的「分布式任务外挂」——把耗时的、需要异步的、定时的工作丢给它,后台慢慢跑,前端瞬间返回。
场景 | 为什么用 Celery |
---|---|
发送注册邮件 / 短信 | 发邮件慢,用户不能干等 5 秒 |
图片 / 视频转码 | CPU 密集,不能阻塞 Web 进程 |
定时报表 | 每天 0 点自动生成 Excel 并发邮件 |
秒杀队列 | 高并发先把订单压进队列,后台慢慢写库 |