Long-running kombu tasks cause the RabbitMQ consumer to disconnect

  • Test code
  • Problem analysis
  • Solution
    • Run long tasks on a worker thread so that heartbeats are not blocked

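While investigating why some asynchronous tasks were never executed, I found that the queue's consumer had been disconnected unexpectedly: the RabbitMQ management UI showed "… no consumers …" for the queue.
This article uses test code to simulate a very long-running asynchronous task, analyzes why the consumer gets disconnected, and finally gives a solution.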

Test code

The asynchronous tasks are run with the kombu example code; see https://docs.celeryq.dev/projects/kombu/en/stable/userguide/examples.html

  1. queues.py
from __future__ import annotations

from kombu import Exchange, Queue

task_exchange = Exchange('kombu_big_tasks', type='direct')
task_queues = [
    Queue('hipri', task_exchange, routing_key='hipri'),
    Queue('midpri', task_exchange, routing_key='midpri'),
    Queue('lopri', task_exchange, routing_key='lopri'),
]

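queues.py defines three queues bound to the same exchange, each with a routing_key equal to its queue name. The queue names distinguish priority levels (hipri: high priority, midpri: medium priority, lopri: low priority).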
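As a rough illustration of how a direct exchange picks a queue, the sketch below models the three bindings as a plain dictionary. It is not kombu code: `bindings` and `route` are hypothetical names used only for this example, and the model covers nothing beyond exact matching of the routing key.

```python
# Bindings as declared in queues.py: queue name -> routing key on 'kombu_big_tasks'.
bindings = {'hipri': 'hipri', 'midpri': 'midpri', 'lopri': 'lopri'}


def route(routing_key):
    """Return the queues a direct exchange would deliver to for this key."""
    return [queue for queue, key in bindings.items() if key == routing_key]


print(route('hipri'))    # ['hipri']
print(route('unknown'))  # []
```

A message published with a routing key that matches no binding is not delivered to any of the three queues.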

  2. worker.py
from __future__ import annotations

import os

os.environ['KOMBU_LOG_CONNECTION'] = 'True'
os.environ['KOMBU_LOG_CHANNEL'] = 'True'

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall

from queues import task_queues

logger = get_logger(__name__)


class Worker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task],
                         prefetch_count=1)]

    def process_task(self, body, message):
        fun = body['fun']
        args = body['args']
        kwargs = body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwargs)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # setup root logger
    setup_logging(loglevel=logging.DEBUG)

    with Connection('amqp://guest:guest@localhost:5672//', heartbeat=60) as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')

worker.py executes the asynchronous tasks; it is the RabbitMQ consumer.

  3. tasks.py
from __future__ import annotations

import time


def hello_task(who='world'):
    print(f"execute hello task")
    time.sleep(360)
    print(f'Hello {who}')

tasks.py defines the task function that worker.py (the consumer) dispatches. The call to time.sleep(360) simulates a very long-running task.

  4. client.py
from __future__ import annotations

from kombu.pools import producers

from queues import task_exchange

priority_to_routing_key = {
    'high': 'hipri',
    'mid': 'midpri',
    'low': 'lopri',
}


def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = priority_to_routing_key[priority]

    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)


if __name__ == '__main__':
    from kombu import Connection

    from tasks import hello_task

    connection = Connection('amqp://guest:guest@localhost:5672//')
    send_as_task(connection, fun=hello_task, args=('Kombu',), kwargs={},
                 priority='high')

client.py is the producer: it publishes a message to RabbitMQ whose body contains the function to call and its arguments.
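To make "the body contains the function" more concrete, here is a minimal sketch using only the standard library. It imitates the serializer='pickle' and compression='bzip2' options with `pickle` and `bz2` directly; kombu's real wire format adds headers and framing that are not modelled here, and `textwrap.dedent` is just a stand-in for `hello_task`.

```python
import bz2
import pickle
import textwrap

# Same payload shape as in client.py, with a stdlib function standing in for hello_task.
payload = {'fun': textwrap.dedent, 'args': ('  hi',), 'kwargs': {}}

pickled = pickle.dumps(payload)
wire = bz2.compress(pickled)  # roughly what the two publish options imply

# pickle stores a function by reference (module name + function name), not its code.
stored_by_name = b'textwrap' in pickled and b'dedent' in pickled

received = pickle.loads(bz2.decompress(wire))
same_function = received['fun'] is textwrap.dedent
result = received['fun'](*received['args'], **received['kwargs'])

print(stored_by_name, same_function, result)  # True True hi
```

Because only the name travels in the message, the consumer process has to be able to import the same function, which is why tasks.py must be importable where worker.py runs.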

Problem analysis

  1. Run worker.py. One exchange and three queues appear in the management UI.
    Log output of the run:
[Kombu connection:0x1eb707e80d0] establishing connection...
2025-07-19 15:55:05 - kombu.connection - DEBUG - [Kombu connection:0x1eb707e80d0] establishing connection...
2025-07-19 15:55:05 - amqp - DEBUG - Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@Win10-2022QOGYT', 'copyright': 'Copyright (c) 2007-2025 Broadcom Inc and/or its subsidiaries', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 27.3.4.2', 'product': 'RabbitMQ', 'version': '4.1.2'}, mechanisms: [b'ANONYMOUS', b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
[Kombu connection:0x1eb707e80d0] connection established: <Connection: amqp://guest:**@127.0.0.1:5672// at 0x1eb707e80d0>
2025-07-19 15:55:05 - kombu.connection - DEBUG - [Kombu connection:0x1eb707e80d0] connection established: <Connection: amqp://guest:**@127.0.0.1:5672// at 0x1eb707e80d0>
2025-07-19 15:55:05 - kombu.mixins - INFO - Connected to amqp://guest:**@127.0.0.1:5672//
[Kombu connection:0x1eb707e80d0] create channel
2025-07-19 15:55:05 - kombu.connection - DEBUG - [Kombu connection:0x1eb707e80d0] create channel
2025-07-19 15:55:05 - amqp - DEBUG - using channel_id: 1
2025-07-19 15:55:05 - amqp - DEBUG - Channel open
[Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
[Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
[Kombu channel:1] queue_declare(queue='hipri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_declare(queue='hipri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
[Kombu channel:1] queue_bind(queue='hipri', exchange='kombu_big_tasks', routing_key='hipri', arguments=None, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_bind(queue='hipri', exchange='kombu_big_tasks', routing_key='hipri', arguments=None, nowait=False)
[Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
[Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
[Kombu channel:1] queue_declare(queue='midpri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_declare(queue='midpri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
[Kombu channel:1] queue_bind(queue='midpri', exchange='kombu_big_tasks', routing_key='midpri', arguments=None, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_bind(queue='midpri', exchange='kombu_big_tasks', routing_key='midpri', arguments=None, nowait=False)
[Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] exchange_declare(exchange='kombu_big_tasks', type='direct', durable=True, auto_delete=False, arguments=None, nowait=False, passive=False)
[Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] prepare_queue_arguments({}, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None)
[Kombu channel:1] queue_declare(queue='lopri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_declare(queue='lopri', passive=False, durable=True, exclusive=False, auto_delete=False, arguments={}, nowait=False)
[Kombu channel:1] queue_bind(queue='lopri', exchange='kombu_big_tasks', routing_key='lopri', arguments=None, nowait=False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] queue_bind(queue='lopri', exchange='kombu_big_tasks', routing_key='lopri', arguments=None, nowait=False)
[Kombu channel:1] basic_qos(0, 1, False)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] basic_qos(0, 1, False)
[Kombu channel:1] basic_consume(queue='hipri', no_ack=False, consumer_tag='None1', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=True, arguments=None, on_cancel=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] basic_consume(queue='hipri', no_ack=False, consumer_tag='None1', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=True, arguments=None, on_cancel=None)
[Kombu channel:1] basic_consume(queue='midpri', no_ack=False, consumer_tag='None2', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=True, arguments=None, on_cancel=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] basic_consume(queue='midpri', no_ack=False, consumer_tag='None2', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=True, arguments=None, on_cancel=None)
[Kombu channel:1] basic_consume(queue='lopri', no_ack=False, consumer_tag='None3', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=False, arguments=None, on_cancel=None)
2025-07-19 15:55:05 - kombu.channel - DEBUG - [Kombu channel:1] basic_consume(queue='lopri', no_ack=False, consumer_tag='None3', callback=<bound method Consumer._receive_callback of <Consumer: [<Queue hipri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> hipri bound to chan:1>, <Queue midpri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> midpri bound to chan:1>, <Queue lopri -> <Exchange kombu_big_tasks(direct) bound to chan:1> -> lopri bound to chan:1>]>>, nowait=False, arguments=None, on_cancel=None)
2025-07-19 15:55:06 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:06 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: None/None, now - 17/15, monotonic - 1966022.093, last_heartbeat_sent - 1966022.093, heartbeat int. - 60 for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:07 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:07 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: 17/15, now - 17/15, monotonic - 1966023.093, last_heartbeat_sent - 1966022.093, heartbeat int. - 60 for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:08 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:08 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: 17/15, now - 17/15, monotonic - 1966024.093, last_heartbeat_sent - 1966022.093, heartbeat int. - 60 for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:09 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:09 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: 17/15, now - 17/15, monotonic - 1966025.093, last_heartbeat_sent - 1966022.093, heartbeat int. - 60 for connection e2f2a870a0e6451187c0fc3aef3d55bf
2025-07-19 15:55:10 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection e2f2a870a0e6451187c0fc3aef3d55bf

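worker.py enables DEBUG-level logging to make troubleshooting easier. As long as no long-running task is executing, the consumer keeps sending heartbeats to the RabbitMQ server, so the server knows the consumer is alive and does not close its connection.
The RabbitMQ server log: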

2025-07-19 15:55:05.840000+08:00 [info] <0.1487.0> accepting AMQP connection 127.0.0.1:59923 -> 127.0.0.1:5672
2025-07-19 15:55:05.844000+08:00 [info] <0.1487.0> connection 127.0.0.1:59923 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'

RabbitMQ runs locally on Windows here; the default server log directory is
C:\Users\%USERNAME%\AppData\Roaming\RabbitMQ\log
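  2. Run client.py. The message is sent to the server, and worker.py receives it and starts executing the task. Once the task starts, the heartbeat lines disappear from the worker log: the long-running task is blocking the heartbeats from being sent.
The management UI also shows one message that has been delivered but not yet acknowledged.
After a while the server, having received no heartbeat, closes the connection. The server log: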

2025-07-19 15:55:05.840000+08:00 [info] <0.1487.0> accepting AMQP connection 127.0.0.1:59923 -> 127.0.0.1:5672
2025-07-19 15:55:05.844000+08:00 [info] <0.1487.0> connection 127.0.0.1:59923 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:09:23.336000+08:00 [info] <0.1982.0> accepting AMQP connection 127.0.0.1:60967 -> 127.0.0.1:5672
2025-07-19 16:09:23.341000+08:00 [info] <0.1982.0> connection 127.0.0.1:60967 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:09:23.350000+08:00 [warning] <0.1982.0> closing AMQP connection <0.1982.0> (127.0.0.1:60967 -> 127.0.0.1:5672, vhost: '/', user: 'guest', duration: '14ms'):
2025-07-19 16:09:23.350000+08:00 [warning] <0.1982.0> client unexpectedly closed TCP connection
2025-07-19 16:12:05.968000+08:00 [error] <0.1487.0> closing AMQP connection <0.1487.0> (127.0.0.1:59923 -> 127.0.0.1:5672, duration: '17M, 0s'):
2025-07-19 16:12:05.968000+08:00 [error] <0.1487.0> missed heartbeats from client, timeout: 60s

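Reading the server log:
2025-07-19 15:55:05.840000 worker.py starts and the consumer connects to RabbitMQ (log identifier <0.1487.0>, client port 59923).
2025-07-19 16:09:23.336000 client.py runs and the producer connects to RabbitMQ (log identifier <0.1982.0>, client port 60967) and publishes the task message.
2025-07-19 16:09:23.350000 After the message is sent, the producer's connection goes away: "client unexpectedly closed TCP connection" (log identifier <0.1982.0>).
2025-07-19 16:12:05.968000 The server has received no heartbeat from the consumer <0.1487.0> ("missed heartbeats from client, timeout: 60s") and closes the connection ("closing AMQP connection <0.1487.0>").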
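Why one long callback starves the heartbeat can be shown with a toy model on a simulated clock. This is only a sketch, not kombu or py-amqp code: `longest_heartbeat_gap` is a hypothetical helper, and the only values taken from the article are heartbeat=60 from worker.py and the 360-second sleep from tasks.py.

```python
HEARTBEAT_INTERVAL = 60  # seconds, the heartbeat=60 passed to Connection in worker.py
TASK_SECONDS = 360       # the time.sleep(360) in tasks.py


def longest_heartbeat_gap(task_seconds, total_seconds):
    """Simulate a single-threaded consume loop on a fake clock.

    The loop checks heartbeats once per simulated second, but the task
    callback runs inline, so no check can happen until it returns.
    Returns the longest gap between two consecutive checks, in seconds.
    """
    now = 0
    last_check = 0
    longest_gap = 0
    task_pending = True
    while now <= total_seconds:
        longest_gap = max(longest_gap, now - last_check)
        last_check = now
        if task_pending:
            now += task_seconds  # blocking callback: the clock advances, the loop does not
            task_pending = False
        else:
            now += 1             # idle iteration
    return longest_gap


gap = longest_heartbeat_gap(TASK_SECONDS, total_seconds=400)
missed_intervals = gap // HEARTBEAT_INTERVAL
print(gap, missed_intervals)  # 360 6
```

In this model the loop stays silent for six full heartbeat intervals while the task runs, which matches the server giving up on the connection long before the task returns.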

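After the server closes the connection, the consumer is still running the task. When the task finishes, the consumer tries to acknowledge the message, but the connection is already gone, so worker.py fails with a ConnectionResetError (WinError 10054: an existing connection was forcibly closed by the remote host) and then reconnects. The log: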

execute hello task
Hello Kombu
[Kombu channel:1] close()
2025-07-19 16:27:23 - kombu.channel - DEBUG - [Kombu channel:1] close()
2025-07-19 16:27:23 - amqp - DEBUG - Closed channel #1
[Kombu connection:0x1eb7089e950] closed
2025-07-19 16:27:23 - kombu.connection - DEBUG - [Kombu connection:0x1eb7089e950] closed
2025-07-19 16:27:23 - kombu.mixins - WARNING - Connection to broker lost, trying to re-establish connection...
Traceback (most recent call last):
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\mixins.py", line 174, in run
    for _ in self.consume(limit=None, **kwargs):
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\mixins.py", line 190, in consume
    with self.consumer_context(**kwargs) as (conn, channel, consumers):
  File "E:\Anaconda\Lib\contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\mixins.py", line 183, in consumer_context
    with self.Consumer() as (connection, channel, consumers):
  File "E:\Anaconda\Lib\contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\mixins.py", line 232, in Consumer
    with self._consume_from(*self.get_consumers(cls, channel)) as c:
  File "E:\Anaconda\Lib\contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\utils\compat.py", line 135, in nested
    reraise(exc[0], exc[1], exc[2])
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\exceptions.py", line 35, in reraise
    raise value
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\utils\compat.py", line 118, in nested
    vars.append(enter())
                ^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 464, in __enter__
    self.consume()
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 514, in consume
    self._basic_consume(T, no_ack=no_ack, nowait=False)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 641, in _basic_consume
    queue.consume(tag, self._receive_callback,
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\entity.py", line 750, in consume
    return self.channel.basic_consume(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\utils\debug.py", line 69, in __wrapped
    return meth(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\channel.py", line 1574, in basic_consume
    p = self.send_method(
        ^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\channel.py", line 1629, in _on_basic_deliver
    fun(msg)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 668, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 634, in receive
    [callback(body, message) for callback in callbacks]
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\messaging.py", line 634, in <listcomp>
    [callback(body, message) for callback in callbacks]
     ^^^^^^^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\worker.py", line 45, in process_task
    message.ack()
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\channel.py", line 1407, in basic_ack
    return self.send_method(
           ^^^^^^^^^^^^^^^^^
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\abstract_channel.py", line 70, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\method_framing.py", line 186, in write_frame
    write(buffer_store.view[:offset])
  File "F:\Github\python\kombu_worker\.venv\Lib\site-packages\amqp\transport.py", line 350, in write
    self._write(s)
ConnectionResetError: [WinError 10054] 远程主机强迫关闭了一个现有的连接。
[Kombu connection:0x1eb708c1c50] establishing connection...
2025-07-19 16:27:23 - kombu.connection - DEBUG - [Kombu connection:0x1eb708c1c50] establishing connection...

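The long-running task is then executed over and over in a loop: the consumer runs the long task and blocks heartbeats -> the server closes the connection and the message goes back to the queue -> the consumer's ack fails and it reconnects -> the consumer receives the task again, runs it, and blocks heartbeats once more.
The server log shows this pattern: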

2025-07-19 15:55:05.840000+08:00 [info] <0.1487.0> accepting AMQP connection 127.0.0.1:59923 -> 127.0.0.1:5672
2025-07-19 15:55:05.844000+08:00 [info] <0.1487.0> connection 127.0.0.1:59923 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:09:23.336000+08:00 [info] <0.1982.0> accepting AMQP connection 127.0.0.1:60967 -> 127.0.0.1:5672
2025-07-19 16:09:23.341000+08:00 [info] <0.1982.0> connection 127.0.0.1:60967 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:09:23.350000+08:00 [warning] <0.1982.0> closing AMQP connection <0.1982.0> (127.0.0.1:60967 -> 127.0.0.1:5672, vhost: '/', user: 'guest', duration: '14ms'):
2025-07-19 16:09:23.350000+08:00 [warning] <0.1982.0> client unexpectedly closed TCP connection
2025-07-19 16:12:05.968000+08:00 [error] <0.1487.0> closing AMQP connection <0.1487.0> (127.0.0.1:59923 -> 127.0.0.1:5672, duration: '17M, 0s'):
2025-07-19 16:12:05.968000+08:00 [error] <0.1487.0> missed heartbeats from client, timeout: 60s
2025-07-19 16:15:23.418000+08:00 [info] <0.2184.0> accepting AMQP connection 127.0.0.1:61211 -> 127.0.0.1:5672
2025-07-19 16:15:23.425000+08:00 [info] <0.2184.0> connection 127.0.0.1:61211 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:18:23.455000+08:00 [error] <0.2184.0> closing AMQP connection <0.2184.0> (127.0.0.1:61211 -> 127.0.0.1:5672, duration: '3M, 0s'):
2025-07-19 16:18:23.455000+08:00 [error] <0.2184.0> missed heartbeats from client, timeout: 60s
2025-07-19 16:21:23.448000+08:00 [info] <0.2248.0> accepting AMQP connection 127.0.0.1:61377 -> 127.0.0.1:5672
2025-07-19 16:21:23.453000+08:00 [info] <0.2248.0> connection 127.0.0.1:61377 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:24:23.484000+08:00 [error] <0.2248.0> closing AMQP connection <0.2248.0> (127.0.0.1:61377 -> 127.0.0.1:5672, duration: '3M, 0s'):
2025-07-19 16:24:23.484000+08:00 [error] <0.2248.0> missed heartbeats from client, timeout: 60s
2025-07-19 16:27:23.474000+08:00 [info] <0.2312.0> accepting AMQP connection 127.0.0.1:61524 -> 127.0.0.1:5672
2025-07-19 16:27:23.481000+08:00 [info] <0.2312.0> connection 127.0.0.1:61524 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:30:23.502000+08:00 [error] <0.2312.0> closing AMQP connection <0.2312.0> (127.0.0.1:61524 -> 127.0.0.1:5672, duration: '3M, 0s'):
2025-07-19 16:30:23.502000+08:00 [error] <0.2312.0> missed heartbeats from client, timeout: 60s
2025-07-19 16:33:23.507000+08:00 [info] <0.2489.0> accepting AMQP connection 127.0.0.1:61749 -> 127.0.0.1:5672
2025-07-19 16:33:23.512000+08:00 [info] <0.2489.0> connection 127.0.0.1:61749 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
2025-07-19 16:36:23.543000+08:00 [error] <0.2489.0> closing AMQP connection <0.2489.0> (127.0.0.1:61749 -> 127.0.0.1:5672, duration: '3M, 0s'):
2025-07-19 16:36:23.543000+08:00 [error] <0.2489.0> missed heartbeats from client, timeout: 60s
2025-07-19 16:39:23.533000+08:00 [info] <0.2732.0> accepting AMQP connection 127.0.0.1:62055 -> 127.0.0.1:5672
2025-07-19 16:39:23.538000+08:00 [info] <0.2732.0> connection 127.0.0.1:62055 -> 127.0.0.1:5672: user 'guest' authenticated and granted access to vhost '/'
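The timestamps in the server log above line up with the 360-second sleep in tasks.py. The sketch below copies the open and close times of the reconnecting consumer from that log and computes the intervals; the `events` list and the `parse` helper are written only for this illustration.

```python
from datetime import datetime

# (event, time of day) for the consumer connections, copied from the server log above.
events = [
    ('open', '16:15:23.418'), ('close', '16:18:23.455'),
    ('open', '16:21:23.448'), ('close', '16:24:23.484'),
    ('open', '16:27:23.474'), ('close', '16:30:23.502'),
    ('open', '16:33:23.507'), ('close', '16:36:23.543'),
    ('open', '16:39:23.533'),
]


def parse(ts):
    """Parse an HH:MM:SS.mmm time of day (all events are on the same date)."""
    return datetime.strptime(ts, '%H:%M:%S.%f')


opens = [parse(ts) for kind, ts in events if kind == 'open']
closes = [parse(ts) for kind, ts in events if kind == 'close']

# How long each connection lived before the server closed it.
lifetimes = [round((c - o).total_seconds()) for o, c in zip(opens, closes)]
# Time from one reconnect to the next.
cycles = [round((b - a).total_seconds()) for a, b in zip(opens, opens[1:])]

print(lifetimes)  # [180, 180, 180, 180]
print(cycles)     # [360, 360, 360, 360]
```

Each connection is closed by the server about 180 seconds after it opens, and a new one appears every 360 seconds, i.e. once per run of the task, which is consistent with the loop described above.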

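In the management UI, the gaps during which the queue has no consumer are the periods in which the consumer is still running the long task with its heartbeats blocked.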

Solution

Run long tasks on a worker thread so that heartbeats are not blocked

Modify worker.py: add a thread pool and execute the tasks asynchronously.

from __future__ import annotations

import os

os.environ['KOMBU_LOG_CONNECTION'] = 'True'
os.environ['KOMBU_LOG_CHANNEL'] = 'True'

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall
from queues import task_queues
from concurrent.futures import ThreadPoolExecutor

logger = get_logger(__name__)


class Worker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection
        self.thread_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix='queue_worker')

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.on_message],
                         prefetch_count=1)]

    def on_message(self, body, message):
        self.thread_pool.submit(self.process_task, body, message)

    def process_task(self, body, message):
        fun = body['fun']
        args = body['args']
        kwargs = body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwargs)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # setup root logger
    setup_logging(loglevel=logging.DEBUG)

    with Connection('amqp://guest:guest@localhost:5672//', heartbeat=60) as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')

Running a task no longer blocks the heartbeats. The worker.py log:

2025-07-19 16:56:19 - kombu.channel - DEBUG - [Kombu channel:1] message_to_python(<amqp.basic_message.Message object at 0x000001B436F16C30>)
2025-07-19 16:56:19 - __main__ - INFO - Got task: hello_task('Kombu')
execute hello task
2025-07-19 16:56:20 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection 7fb334777d214ef1ac01025683428e30
2025-07-19 16:56:20 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: None/None, now - 17/18, monotonic - 1969696.078, last_heartbeat_sent - 1969696.078, heartbeat int. - 60 for connection 7fb334777d214ef1ac01025683428e30
2025-07-19 16:56:21 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection 7fb334777d214ef1ac01025683428e30
2025-07-19 16:56:21 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : Prev sent/recv: 17/18, now - 17/18, monotonic - 1969697.078, last_heartbeat_sent - 1969696.078, heartbeat int. - 60 for connection 7fb334777d214ef1ac01025683428e30
2025-07-19 16:56:22 - amqp.connection.Connection.heartbeat_tick - DEBUG - heartbeat_tick : for connection 7fb334777d214ef1ac01025683428e30
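The effect seen in this log, heartbeat ticks continuing while the task is still running, can be reproduced without a broker. The following is a minimal standard-library sketch of the hand-off pattern only; `consume_with_pool` and `long_task` are hypothetical names, and an Event stands in for time.sleep(360) so the example finishes immediately.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def consume_with_pool(ticks=5):
    """Hand a blocking task to a pool and keep the consume loop ticking."""
    release = threading.Event()

    def long_task():
        # Stand-in for time.sleep(360): blocks until the loop lets it finish.
        release.wait(timeout=5)
        return 'done'

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(long_task)  # returns immediately, like on_message()
        heartbeat_ticks = 0
        for _ in range(ticks):
            heartbeat_ticks += 1         # the loop is free to do heartbeat work
        running_during_ticks = not future.done()
        release.set()                    # let the task finish
        result = future.result(timeout=5)
    return heartbeat_ticks, running_during_ticks, result


print(consume_with_pool())  # (5, True, 'done')
```

The sketch does not model the acknowledgement. In the modified worker.py, message.ack() is called from the pool thread while the main thread keeps using the same connection, so it may be worth checking how the client library behaves when a connection is used from more than one thread.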

The management UI shows that the task has now run to completion.
