当前位置: 首页 > ds >正文

python中使用高并发分布式队列库celery的那些坑

python中使用高并发分布式队列库celery的那些坑

      • 🌟 简单理解
      • 🛠️ 核心功能
      • 🚀 工作机制
      • 📦 示例代码(使用 Redis 作为 broker)
      • 🔗 常见搭配
      • 📦 我的环境
      • 📦第一个问题
      • 📦第二个问题
        • 原因分析

Celery 是一个用于 分布式任务队列 的 Python 库,常用于处理异步任务(即任务不需要立即执行,后台慢慢做),尤其适合执行定时任务或耗时操作。


🌟 简单理解

Celery 就是让你把“任务”扔到后台执行,而不是阻塞当前程序。


🛠️ 核心功能

功能说明
异步任务执行比如发邮件、处理图片、生成报告等不需要立即完成的操作。
分布式任务调度可以运行在多台服务器上,实现任务负载均衡。
定时任务(周期任务)类似 crontab,可设置任务定时执行(如每天 8 点发日报)。
任务重试机制失败任务可以自动重试,适用于网络波动等场景。
与Django/Flask集成非常适合与这些 Web 框架配合使用,将长耗时任务下放到 Celery。

🚀 工作机制

Celery 一般由以下几部分组成:

  1. Producer(生产者):你写的代码,会将任务“发”出去。
  2. Broker(中间人):任务先存放在消息队列(如 Redis、RabbitMQ)中。
  3. Worker(工人):后台运行的进程,专门“接收”和“执行”这些任务。
  4. Result Backend(结果后端):可选,记录任务结果,如执行成功或失败。

📦 示例代码(使用 Redis 作为 broker)

# tasks.py
from celery import Celeryapp = Celery('mytasks', broker='redis://localhost:6379/0')@app.task
def add(x, y):return x + y

运行方式:

celery -A tasks worker --loglevel=info

调用方式(异步执行):

add.delay(3, 5)  # 返回一个异步结果对象

🔗 常见搭配

  • 消息中间件:Redis、RabbitMQ(推荐 Redis 简单易用)
  • Web框架集成:Django、Flask
  • 配合 Flower、Prometheus、Grafana 等工具可实现任务监控

如果你正在开发一个 需要做“异步处理”或“后台任务”的系统,Celery 是 Python 中的主流选择之一。但是该库看似简单,却隐藏着无数坑,本文就带大家了解一下我在使用过程中遇到的那些坑。

📦 我的环境

  • windows 10
  • python 3.12
  • celery 5.5.2

📦第一个问题

执行命令:

celery -A main_async:celery_app worker --loglevel=info

报错:

[2025-05-29 19:40:22,107: INFO/MainProcess] Task main_async.background_content_similarity[4c84e1c8-6a13-4241-8e62-04e17b3884cb] received
[2025-05-29 19:40:22,142: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
billiard.einfo.RemoteTraceback:
"""
Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)
"""The above exception was the direct cause of the following exception:Traceback (most recent call last):File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362, in workloopresult = (True, prepare_result(fun(*args, **kwargs)))^^^^^^^^^^^^^^^^^^^^File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640, in fast_trace_tasktasks, accept, hostname = _loc^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)

该问题是由于celery的默认并发网络编程线程库引起的,换成eventlet可以解决问题,只需修改启动命令即可,如下:

celery -A main_async:celery_app worker --loglevel=info -P eventlet

📦第二个问题

第二个问题是日志问题,报错类似如下所示:

'LoggingProxy' object has no attribute 'encoding'"
原因分析

Celery 在启动 worker 时,默认会将标准输出和标准错误重定向到其日志系统中。这意味着 sys.stdout 和 sys.stderr 被替换为 LoggingProxy 对象。然而,某些库或代码可能期望这些对象具有标准文件对象的属性,如 encoding 或 fileno,从而导致 AttributeError。

此时只需要将worker_redirect_stdouts参数设置为False即可解决问题,代码如下:

# Celery 配置
celery_app.conf.update(task_serializer="json",accept_content=["json"],result_serializer="json",timezone="Asia/Shanghai",enable_utc=True,include=["main_async"],  # 显式指定任务模块task_track_started=True,  # 跟踪任务开始状态task_ignore_result=False,  # 保存任务结果task_store_errors_even_if_ignored=True,  # 存储错误worker_redirect_stdouts = False	# 禁止将stdout和stderr重定向到当前记录器。
)
http://www.xdnf.cn/news/9850.html

相关文章:

  • 深入解析Java8核心新特性(Optional、新的日期时间API、接口增强)
  • Android AIDL Hal最低保证出现的问题
  • CSS基础巩固-选择
  • 【大模型02】Deepseek使用和prompt工程
  • PH热榜 | 2025-05-29
  • leetcode235.二叉搜索树的最近公共祖先:迭代法利用有序性高效寻根
  • 【音频处理】java流式调用ffmpeg命令
  • 《Python 应用中的蓝绿部署与滚动更新:持续集成中的实践与优化》
  • Java设计模式从基础到实际运用
  • 【redis实战篇】第六天
  • 一根网线连接两台电脑组建局域网
  • 不起火,不爆炸,高速摄像机、数字图像相关DIC技术在动力电池新国标安全性能测试中的应用
  • 代码随想录算法训练营第60期第五十一天打卡
  • R3GAN训练自己的数据集
  • Java中float和double的区别与用法解析
  • 华为OD机试真题——阿里巴巴找黄金宝箱(III)(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • WPF 全局加载界面、多界面实现渐变过渡效果
  • DexWild:野外机器人策略的灵巧人机交互
  • 华为OD机试真题——简单的自动曝光平均像素(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • 如何更好的理解云计算和云原生?
  • JDBC连接数据库精准提炼
  • MongoDB(七) - MongoDB副本集安装与配置
  • Python 中的 if-elif-else 语句与控制流详解:从基础到高级应用
  • 电感专题归纳
  • Unity-QFramework框架学习-MVC、Command、Event、Utility、System、BindableProperty
  • 深入理解 SELinux:通过 Nginx 和 SSH 服务配置实践安全上下文与端口策略
  • 家庭路由器改装,搭建openwrt旁路由以及手机存储服务器,实现外网节点转发、内网穿透、远程存储、接入满血DeepSeek方案
  • LVS+keepalived高可用群集
  • mac笔记本如何快捷键截图后自动复制到粘贴板
  • 首发!PPIO派欧云上线DeepSeek-R1-0528-Qwen3-8B蒸馏模型