
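RocketMQ Message Sending Source Code Analysis: A Deep Dive into DefaultMQProducerImpl.send()

Introduction

In distributed systems, a message queue is a key component for asynchronous communication, service decoupling, and traffic peak shaving. Apache RocketMQ is a high-performance, highly reliable messaging middleware that is widely used across Internet workloads. Sending messages is its most basic and most important function. This article walks through the implementation of DefaultMQProducerImpl.send() in RocketMQ, covering the core send flow and the key technical points along the way.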

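1. DefaultMQProducerImpl Overview

DefaultMQProducerImpl is the core implementation class of the RocketMQ message producer and contains the actual send logic. Its main relationships are as follows:

DefaultMQProducerImpl
├── implements the MQProducerInner interface
├── relies on MQClientInstance for network communication
├── caches topic route information in topicPublishInfoTable
└── supports send-hook extensions through sendMessageHookList

Core fields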

private final InternalLogger log = ClientLogger.getLog();
/** Random number generator */
private final Random random = new Random();
/** The DefaultMQProducer facade associated with this implementation */
private final DefaultMQProducer defaultMQProducer;
/** Map from topic to its publish (route) information */
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
    new ConcurrentHashMap<String, TopicPublishInfo>();
/** Hooks invoked before and after sending a message */
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
/** Hooks invoked when a transactional message is ended */
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
/** RPC hook */
private final RPCHook rpcHook;
/** Work queue of the asynchronous send thread pool */
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
/** Default thread pool for asynchronous sends */
private final ExecutorService defaultAsyncSenderExecutor;
/** Queue of check requests */
protected BlockingQueue<Runnable> checkRequestQueue;
/** Thread pool that handles check requests */
protected ExecutorService checkExecutor;
/** Service state */
private ServiceState serviceState = ServiceState.CREATE_JUST;
/** Client instance */
private MQClientInstance mQClientFactory;
/** Check-forbidden hooks */
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
/** Fault tolerance strategy */
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
/** Thread pool used for asynchronous sends (overrides the default when set) */
private ExecutorService asyncSenderExecutor;

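2. The Core Flow of send()

2.1 Method Signatures and Overloads

DefaultMQProducerImpl provides several overloaded send() methods that cover the synchronous, asynchronous, and one-way send modes: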

// Synchronous send
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

// Asynchronous send
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;

// One-way send
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;

// Synchronous send with a timeout
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

// Send with a queue selector
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

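2.2 The Core Send Flow

The send() overloads that leave queue selection to the producer (no explicit MessageQueue or MessageQueueSelector) all end up in sendDefaultImpl(), which is the core of message sending: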

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Look up the route information for the message's topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total number of attempts: 1 + retries for SYNC, exactly 1 for the other modes
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select a message queue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            // A queue was selected
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    // Perform the actual network send
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }

                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
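Stripped of logging and error-code mapping, sendDefaultImpl is a bounded retry loop in which all attempts share one timeout budget. The following is a minimal, self-contained sketch of that pattern. It is not RocketMQ code: `RetrySketch`, `Attempt`, and `sendWithRetry` are hypothetical names, brokers are plain strings, and the broker is picked by simple rotation instead of the real queue selection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified sketch of a retry loop with a shared timeout budget (not RocketMQ code). */
final class RetrySketch {

    /** One send attempt against a broker; hypothetical stand-in for sendKernelImpl. */
    interface Attempt {
        String send(String broker, long remainingMillis) throws Exception;
    }

    /**
     * Makes up to 1 + retries attempts, rotating through the brokers, and gives up
     * early once the elapsed time exceeds the overall timeout.
     */
    static String sendWithRetry(List<String> brokers, int retries, long timeoutMillis, Attempt attempt) {
        long begin = System.currentTimeMillis();
        int timesTotal = 1 + retries;
        Exception last = null;
        List<String> brokersSent = new ArrayList<>();
        for (int times = 0; times < timesTotal; times++) {
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                throw new IllegalStateException("call timeout after " + cost + "ms", last);
            }
            String broker = brokers.get(times % brokers.size());
            brokersSent.add(broker);
            try {
                // Each attempt only gets what is left of the budget
                return attempt.send(broker, timeoutMillis - cost);
            } catch (Exception e) {
                last = e; // remember the failure and try the next broker
            }
        }
        throw new IllegalStateException(
            "Send " + timesTotal + " times, still failed, brokers: " + brokersSent, last);
    }

    public static void main(String[] args) {
        // broker-a always fails, so the second attempt goes to broker-b
        String result = sendWithRetry(Arrays.asList("broker-a", "broker-b"), 2, 3000L,
            (broker, remaining) -> {
                if (broker.equals("broker-a")) {
                    throw new IllegalStateException("broker-a unavailable");
                }
                return "SEND_OK via " + broker;
            });
        System.out.println(result); // prints "SEND_OK via broker-b"
    }
}
```

Passing the remaining budget into each attempt mirrors the `timeout - costTime` argument handed to sendKernelImpl above, so a slow first attempt shortens the time available to later ones.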

3. Key Steps in Detail

3.1 Looking Up Topic Route Information

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Try the local map first
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // No usable route information locally, so fetch it from the NameServer
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // Fetch the route information from the remote NameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    // The route information is usable, return it directly
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // Otherwise query the NameServer again, this time using the default topic, and return the result
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
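The lookup above is a cache-aside pattern: read the local table, and only go to the NameServer on a miss or an unusable entry. Below is a minimal sketch of that idea under simplifying assumptions. `RouteCacheSketch` and `RouteInfo` are hypothetical names, the NameServer is modelled as a plain `Function`, and the second fallback query with the default topic is left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Cache-aside lookup of topic routes (illustrative only, not RocketMQ code). */
final class RouteCacheSketch {

    /** Hypothetical stand-in for TopicPublishInfo: usable only when it has queues. */
    static final class RouteInfo {
        final List<String> queues;
        RouteInfo(List<String> queues) { this.queues = queues; }
        boolean ok() { return !queues.isEmpty(); }
    }

    private final ConcurrentMap<String, RouteInfo> table = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // assumed remote lookup
    final AtomicInteger remoteLookups = new AtomicInteger();

    RouteCacheSketch(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    RouteInfo find(String topic) {
        RouteInfo info = table.get(topic);
        if (info == null || !info.ok()) {
            // Register an empty placeholder, then refresh it from the remote source
            table.putIfAbsent(topic, new RouteInfo(Collections.<String>emptyList()));
            remoteLookups.incrementAndGet();
            List<String> queues = nameServer.apply(topic);
            if (queues != null && !queues.isEmpty()) {
                table.put(topic, new RouteInfo(queues));
            }
            info = table.get(topic);
        }
        return info;
    }

    public static void main(String[] args) {
        RouteCacheSketch cache = new RouteCacheSketch(topic ->
            topic.equals("TopicTest") ? Arrays.asList("broker-a:0", "broker-a:1")
                                      : Collections.<String>emptyList());
        cache.find("TopicTest");
        cache.find("TopicTest"); // served from the local table
        System.out.println("remote lookups: " + cache.remoteLookups.get()); // prints "remote lookups: 1"
    }
}
```

An unknown topic keeps its empty placeholder, so every call for it triggers another remote lookup until a route appears.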

3.2 Selecting a Message Queue

/**
 * Selects an available queue. This logic lives in MQFaultStrategy, to which
 * DefaultMQProducerImpl delegates.
 *
 * @param tpInfo route information of the topic
 * @param lastBrokerName the broker used by the previous send attempt
 * @return the selected message queue
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // Advance the round-robin index for this topic
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            // Iterate over every message queue of the topic
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // Take the index modulo the number of queues
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // Pick a queue in round-robin order
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Check whether its broker is currently considered available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }

            // No broker is available after a full pass, so pick at least one
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    return new MessageQueue(mq.getTopic(), notBestBroker, tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                } else {
                    return mq;
                }
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
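The selection logic combines round-robin with a per-broker availability check. The sketch below shows the idea in isolation. `QueueSelectSketch`, `Queue`, and `markFaulty` are hypothetical names, isolation is a fixed deadline per broker rather than RocketMQ's latency-based calculation, and `Math.floorMod` replaces the `Math.abs(...) % size` idiom so a negative counter never yields a negative position.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Round-robin queue selection that skips isolated brokers (illustrative only). */
final class QueueSelectSketch {

    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
        @Override public String toString() { return broker + ":" + id; }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger();
    private final Map<String, Long> unavailableUntil = new ConcurrentHashMap<>();

    QueueSelectSketch(List<Queue> queues) { this.queues = queues; }

    /** Isolates a broker for the given duration after a failed or slow send. */
    void markFaulty(String broker, long isolateMillis) {
        unavailableUntil.put(broker, System.currentTimeMillis() + isolateMillis);
    }

    boolean isAvailable(String broker) {
        Long until = unavailableUntil.get(broker);
        return until == null || System.currentTimeMillis() >= until;
    }

    Queue select() {
        int index = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index + i, queues.size()));
            if (isAvailable(q.broker)) {
                return q;
            }
        }
        // Every broker is isolated: fall back to plain round-robin
        return queues.get(Math.floorMod(index, queues.size()));
    }

    public static void main(String[] args) {
        QueueSelectSketch selector = new QueueSelectSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        selector.markFaulty("broker-a", 60_000L);
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select()); // only broker-b queues while broker-a is isolated
        }
    }
}
```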

3.3 The Low-Level Send

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Resolve the broker address
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            // Build the request header for the send-message request
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            requestHeader.setBname(mq.getBrokerName());
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using compressed message body and recover origin message.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC: // One-way and synchronous sends share this branch
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // Send the message and block for the result
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

3.4 Sending the Request through the Netty Client

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    // Record the start time
    long beginStartTime = System.currentTimeMillis();
    // Get (or create) a channel: the connection from this client to the remote address, here the Broker
    final Channel channel = this.getAndCreateChannel(addr);
    // The request can only be sent if the connection exists and is active
    if (channel != null && channel.isActive()) {
        try {
            // Pre-send RPC hooks, not relevant here
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
            }
            // This is where the network request is actually sent
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // Post-send RPC hooks, not relevant here
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        // Create the future that will hold the response and register it under the request's opaque
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        // Remote address of the connection
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // Block until the response arrives or the timeout expires (backed by a CountDownLatch)
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        // No response was received
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                // The request was written successfully, but the response timed out
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                // The request was never sent successfully
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        // A response was received
        return responseCommand;
    } finally {
        // Always remove the pending entry from the table
        this.responseTable.remove(opaque);
    }
}
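invokeSyncImpl turns an asynchronous network write into a blocking call by registering a future under the request's opaque ID and waiting on it. The following sketch reproduces only that correlation mechanism, with no Netty involved. `ResponseFutureSketch`, `PendingResponse`, and `onResponse` are hypothetical names, responses are plain strings, and a timeout is reported by returning null instead of throwing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Request/response correlation with a latch-backed future (illustrative only). */
final class ResponseFutureSketch {

    /** Hypothetical stand-in for ResponseFuture. */
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            this.response = r;
            this.latch.countDown();
        }

        /** Returns the response, or null if none arrived within the timeout. */
        String waitResponse(long timeoutMillis) {
            try {
                this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.response;
        }
    }

    private final ConcurrentMap<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    /** Called from the I/O thread when a response carrying this opaque arrives. */
    void onResponse(int opaque, String body) {
        PendingResponse pending = responseTable.get(opaque);
        if (pending != null) {
            pending.putResponse(body);
        }
    }

    String invokeSync(int opaque, Runnable writeRequest, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending); // register before writing to avoid missing a fast reply
        try {
            writeRequest.run();
            return pending.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque); // always clean up the pending entry
        }
    }

    int pendingCount() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        ResponseFutureSketch client = new ResponseFutureSketch();
        // The "network" here is a thread that answers the request with the same opaque
        String reply = client.invokeSync(1,
            () -> new Thread(() -> client.onResponse(1, "SEND_OK")).start(), 2000L);
        System.out.println(reply); // prints "SEND_OK"
    }
}
```

Registering the pending entry before the write matters: if the order were reversed, a response arriving immediately could find no entry to complete.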

3.5 Handling the Request in BrokerController

The client sends the message to the Broker, where BrokerController hands the request to SendMessageProcessor for processing.

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            // Parse the send-message request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            // Build the message context
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

A single message is handled by asyncSendMessage, which calls the underlying store to append the message asynchronously:

this.brokerController.getMessageStore().asyncPutMessage(msgInner);

Depending on the store configuration (presumably whether the DLedger-based commit log is enabled), the call ends up in either CommitLog#asyncPutMessage or DLedgerCommitLog#asyncPutMessage. In the DLedgerCommitLog#asyncPutMessage case, it eventually calls DLedgerServer#handleAppend.

After the message is stored, the ReputMessageService thread invokes two dispatchers: CommitLogDispatcherBuildConsumeQueue, which adds an entry to the ConsumeQueue, and CommitLogDispatcherBuildIndex, which adds an entry to the index.
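The dispatch step can be pictured as one reput loop fanning each stored message out to a list of dispatchers. The sketch below illustrates only that fan-out. `DispatchSketch`, `Dispatcher`, and `reput` are hypothetical names, and the two in-memory lists merely stand in for the ConsumeQueue and the index, whose real on-disk formats are not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Fan-out of a stored message to several dispatchers (illustrative only). */
final class DispatchSketch {

    /** Hypothetical stand-in for a commit log dispatcher. */
    interface Dispatcher {
        void dispatch(long commitLogOffset, String topic);
    }

    final List<String> consumeQueue = new ArrayList<>(); // stand-in for the ConsumeQueue
    final List<String> index = new ArrayList<>();        // stand-in for the index
    private final List<Dispatcher> dispatchers = new ArrayList<>();

    DispatchSketch() {
        // One dispatcher per derived structure, run in registration order
        dispatchers.add((offset, topic) -> consumeQueue.add(topic + "@" + offset));
        dispatchers.add((offset, topic) -> index.add("idx:" + offset));
    }

    /** Called once per message that has been appended to the commit log. */
    void reput(long commitLogOffset, String topic) {
        for (Dispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(commitLogOffset, topic);
        }
    }

    public static void main(String[] args) {
        DispatchSketch store = new DispatchSketch();
        store.reput(0L, "TopicTest");
        System.out.println(store.consumeQueue + " " + store.index); // prints "[TopicTest@0] [idx:0]"
    }
}
```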

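4. Performance Tuning and Best Practices

4.1 Performance Tuning Suggestions

  1. Batch sending: for small messages, send them in batches to improve throughput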

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "Message 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "Message 2".getBytes()));
producer.send(messages);

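  2. Asynchronous sending: use the asynchronous mode where response time is critical

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // Handle the successful response
    }

    @Override
    public void onException(Throwable e) {
        // Handle the failure
    }
});

  3. Configure the retry count sensibly: adjust retryTimesWhenSendFailed to match business requirements

  4. Enable the latency fault tolerance mechanism (sendLatencyFaultEnable): avoid sending messages to faulty Brokers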

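4.2 Monitoring and Alerting

  1. Send success rate: monitor the proportion of results with SendStatus.SEND_OK

  2. Response time: monitor the average response time of message sends

  3. Exception logs: watch the exceptions caught in sendDefaultImpl

  4. Broker status: monitor Broker availability and load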
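The first two metrics can be collected on the client with a pair of counters. The following is a minimal sketch, assuming the application calls `record` once per send with the outcome and the measured round-trip time. `SendMetricsSketch` and its methods are hypothetical names and not part of RocketMQ.

```java
import java.util.concurrent.atomic.LongAdder;

/** Thread-safe counters for send success rate and average response time (illustrative only). */
final class SendMetricsSketch {
    private final LongAdder total = new LongAdder();
    private final LongAdder ok = new LongAdder();
    private final LongAdder totalRtMillis = new LongAdder();

    /** Records one send: whether the status was SEND_OK and how long it took. */
    void record(boolean sendOk, long rtMillis) {
        total.increment();
        if (sendOk) {
            ok.increment();
        }
        totalRtMillis.add(rtMillis);
    }

    /** Fraction of sends that succeeded; 1.0 when nothing has been recorded yet. */
    double successRate() {
        long count = total.sum();
        return count == 0 ? 1.0 : (double) ok.sum() / count;
    }

    /** Mean response time in milliseconds; 0.0 when nothing has been recorded yet. */
    double averageRtMillis() {
        long count = total.sum();
        return count == 0 ? 0.0 : (double) totalRtMillis.sum() / count;
    }

    public static void main(String[] args) {
        SendMetricsSketch metrics = new SendMetricsSketch();
        metrics.record(true, 10);
        metrics.record(true, 20);
        metrics.record(false, 30);
        System.out.println(metrics.averageRtMillis()); // prints "20.0"
    }
}
```

LongAdder is chosen over AtomicLong because many producer threads update the counters while reads are comparatively rare.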
