Windows系统下【Celery任务队列】python使用celery 详解(二)
开发阶段的自动重载
celery -A celery_tasks worker --loglevel=info -P eventlet --autoreload
--autoreload
仅适用于开发环境,不建议在生产环境中使用,因为它可能会影响性能。
配置任务跟踪启动状态
app.conf.task_track_started = True
app.conf.task_track_started = True
是 Celery 的一个配置项,用于启用任务执行状态的跟踪。具体来说,它允许 Celery 在任务开始执行时更新任务状态为 STARTED
,而不仅仅是在任务完成(SUCCESS
)或失败(FAILURE
)时才更新状态。
1.配置任务过期时间
在 Celery 中,任务结果的过期时间由 result_expires
配置项控制。
Celery 的默认配置是 24 小时(86400 秒)
-
全局配置
# 设置任务结果保留 1 小时(3600 秒) app.conf.result_expires = 3600 # 秒
-
2.针对单个任务设置
使用@app.task
装饰器的expires
参数为特定任务单独设置过期时间# 定义发送消息任务 @app.task(expires=1800) # 该任务结果保留 30 分钟 def send_msg(message):print(f"开始发送消息: {message}")time.sleep(3)print(f"发送消息完成: {message}")return message
-
永久保留:设置为
None
或0
时,结果不会自动过期(需手动清理)。app.conf.result_expires = None # 永久保留
-
立即过期:设置为负数(如
-1
)时,结果会在任务完成后立即删除。app.conf.result_expires = -1 # 立即删除
2.获取异步任务的状态与结果
在 Celery 里,AsyncResult
类可用于获取异步任务的状态与结果。
2.1状态检查方法
ready()
:检查任务是否已经完成(无论成功还是失败),返回布尔值。- 频繁检查
result.ready()
会增加与结果后端(如 Redis)的通信开销
- 频繁检查
successful()
:检查任务是否成功完成,返回布尔值。如果任务仍在执行或失败,返回False
。failed()
:检查任务是否执行失败,返回布尔值。status
:获取任务的当前状态,常见的状态包括:PENDING
:任务等待执行或状态未知。STARTED
:任务已开始执行(需要配置task_track_started=True
才能看到此状态)。SUCCESS
:任务成功完成。FAILURE
:任务执行失败。REVOKED
:任务已被撤销。RETRY
:任务因异常正在重试。
2.2结果获取方法
由于任务是异步执行的,可能需要等待一段时间才能完成。可以使用 get()
方法阻塞当前线程,直到任务完成并返回结果,但要注意设置合理的超时时间,避免长时间阻塞。
get(timeout=None, propagate=True, interval=0.5)
等待任务完成并返回结果。timeout
:设置超时时间(秒),超过此时间将抛出TimeoutError
。propagate
:如果为True
(默认值),任务失败时会抛出异常;如果为False
,则返回异常对象。interval
:检查任务状态的间隔时间(秒)。
result
:获取任务的结果。如果任务尚未完成,返回None
(除非使用get()
方法等待)。
2.3任务控制方法
revoke(terminate=False, signal=None, wait=False, timeout=None)
撤销(取消)任务的执行。terminate
:如果为True
,会强制终止正在执行的任务。signal
:指定终止任务时使用的信号(如SIGKILL
)。wait
:如果为True
,会等待任务被撤销后再返回。timeout
:等待撤销操作完成的超时时间。
forget()
:从结果后端中删除任务结果,释放资源。调用此方法后,将无法再获取该任务的结果。
2.4错误信息获取方法
traceback
:获取任务失败时的详细堆栈跟踪信息。exception
:获取任务失败时抛出的异常对象。
2.5任务元数据方法
date_done
:获取任务完成的时间(datetime
对象)。children
:获取由该任务创建的子任务列表。
import time
from celery.result import AsyncResult
from celery_tasks import send_msg, app# 调用任务
result = send_msg.delay('hello world')# 获取任务 ID
task_id = result.id
print(f"Task ID: {task_id}")# 创建 AsyncResult 对象
async_result = AsyncResult(task_id, app=app)# 检查任务状态
print(f"Initial status: {async_result.status}")# 等待任务完成并获取结果(设置超时时间为 10 秒)
try:result_value = async_result.get(timeout=10, propagate=False)print(f"Task result: {result_value}")if async_result.successful():print("Task completed successfully.")elif async_result.failed():print(f"Task failed with exception: {async_result.exception}")print(f"Traceback: {async_result.traceback}")
except TimeoutError:print("Task timed out.")# 撤销任务async_result.revoke(terminate=True, signal='SIGKILL')print("Task revoked.")# 检查任务状态
print(f"Initial status: {async_result.status}")# 检查任务是否被撤销
if async_result.status == 'REVOKED':print("Task is revoked.")# 释放结果资源
async_result.forget()
print("Task result forgotten.")