【RocketMQ 生产者和消费者】- 消费者发起消息拉取请求 PullMessageService
文章目录
- 1. 前言
- 1. 入口方法 dispatchPullRequest
- 3. 消息拉取服务 PullMessageService
- 3.1 属性
- 3.2 服务启动
- 3.3 PullMessageService#pullMessage 拉取消息
- 3.4 nextPullOffset 计算
- 3.5 pullMessage 消息拉取前置处理
- 3.6 pullKernelImpl 发起消息拉取请求
- 3.7 pullMessage 发起消息拉取请求
- 3.8 pullMessageAsync 异步拉取消息
- 4. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 消费者启动源码
- 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
- 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
- 【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响
- 【RocketMQ 生产者和消费者】- 消费者的订阅关系一致性
前几篇文章我们探讨了消费者重平衡的源码,在这篇文章 【RocketMQ 生产者和消费者】- 消费者重平衡(1) 中当消费者新分配到一个队列之后,就会在最后通过 dispatchPullRequest
方法提交一个消息拉取请求,这个方法是一个抽象方法,在 Pull 和 Push 模式中有不同的实现。
- PULL 模式下拉取消息是消费者决定的,所以 PULL 类型的消费者的实现都是空实现。
- PUSH 模式下则是消费者自己去拉取消息来消费, 所以这些拉取请求会被 PullMessageService 服务去处理。
因此我们可以得到一个很重要的消息,就是在 push 模式下消费者的拉取请求会被 PullMessageService 处理,那么这个类是如何发起拉取请求的,这篇文章我们就来看下。
1. 入口方法 dispatchPullRequest
/*** 分发拉取消息的请求* @param pullRequestList*/
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}
}
在分发消息请求的时候是通过 executePullRequestImmediately
分发去处理的,因此我们来看下这个方法。
/*** 立刻将消息拉取请求提交到 pullRequestQueue 中* @param pullRequest*/
public void executePullRequestImmediately(final PullRequest pullRequest) {this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
最终这个方法会调用到 PullMessageService#executePullRequestImmediately
,而这个方法就是将请求添加到 pullRequestQueue
集合中。
/*** 将拉取消息的请求存储到 pullRequestQueue 队列中, 因为是阻塞队列, 所以只要有 pullRequest 请求到来就会被当前服务去处理* @param pullRequest*/
public void executePullRequestImmediately(final PullRequest pullRequest) {try {this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}
}
3. 消息拉取服务 PullMessageService
PullMessageService 是在 MQClientInstance 的构造器中创建出来的。
也就是说消费者启动的时候就会创建一个 PullMessageService,由于当前版本下 MQClientInstance 是每一个消费者独有的,因此每一个消费者都有自己的 PullMessageService。
3.1 属性
还是一样,先来看下这个类里面的属性,方便理解请求是如何存储的。
- pullRequestQueue: 类型是
LinkedBlockingQueue<PullRequest>
,这个就是用来存储 PullRequest 的,上面的 executePullRequestImmediately 就是把请求存到这个阻塞队列中。 - mQClientFactory: 对应的消费者 MQClientInstance。
- scheduledExecutorService: 定时任务线程池,这东西是用来将 PullRequest 延时放到 pullRequestQueue 中的,为什么需要延时呢?举个例子,消费者是有消息拉取阈值的,比如消费者拉取完消息之后会存在本地 msgTree 中,如果存储数量或者大小达到阈值,那么就会延时一段时间再去继续拉取消息。
3.2 服务启动
上面就是这个类的属性了,下面来看下启动方法。
接下来看下 PullMessageService 的 run 方法,开始启动消息拉取服务。
/*** 消息拉取服务*/
@Override
public void run() {log.info(this.getServiceName() + " service started");// 服务是死循环, 没有阻塞时间, 只要有请求过来就会去处理while (!this.isStopped()) {try {// 阻塞获取拉取消息请求PullRequest pullRequest = this.pullRequestQueue.take();// 拉取到了请求, 就到这里面去处理this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");
}
可以看到这里就是从 pullRequestQueue
集合中阻塞获取消息拉取请求,如果集合是空的,那么就会一直阻塞住。获取到请求之后,调用 pullMessage 拉取消息。
3.3 PullMessageService#pullMessage 拉取消息
/*** 处理拉取消息的请求* @param pullRequest*/
private void pullMessage(final PullRequest pullRequest) {// PullMessageService 是 MQClientInstance 中新建的, 也就是每一个消费者都会有一个 PullMessageService 实例, mQClientFactory// 中根据消费者组获取出来的就是当前的消费者, 那如果获取不到, 就说明这个拉取请求找不到消费者来处理, 有可能是消费者已经关闭了final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {// 获取到消费者之后肯定是 DefaultMQPushConsumerImpl 类型, Pull 类型的消息拉取不会在这里处理, 而是用户自己决定拉取的逻辑,// 所以这里肯定是 Push 类型的消费者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;// 拉取消息impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}
}public MQConsumerInner selectConsumer(final String group) {return this.consumerTable.get(group);
}
首先从本地的 consumerTable
中获取出这个消费者组下面的消费者,然后调用 DefaultMQPushConsumerImpl#pullMessage 来拉取消息。
3.4 nextPullOffset 计算
/*** 拉取消息* @param pullRequest 拉取消息请求, 里面指定了要从哪开始拉取, 拉取的队列是哪个*/
public void pullMessage(final PullRequest pullRequest) {...
}
首先在正式看这个方法之前,我们来看下 PullRequest 的属性,如果要拉取消息肯定要指定一个消息队列,然后从指定的 offset 开始拉取,那么对于新加入的队列,这个 offset 是怎么获取到的呢,我们还是要回到重平衡的 updateProcessQueueTableInRebalance
方法中。
上面的代码是处理新增的队列,当前消费者如果重平衡了一个新的队列,那么就需要先把这个队列的 offset 从本地缓存中先删掉,也就是本地的 offsetTable
删掉,然后在计算下一次从哪里拉取,当然这里指的是 push 模式,pull 模式是用户自己去控制拉取的,所以 computePullFromWhereWithException 在 pull 模式下是空实现。
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {long result = -1;final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();// 消息队列从哪里开始拉取, 根据不同策略走不同的逻辑, 默认是 CONSUME_FROM_LAST_OFFSETswitch (consumeFromWhere) {case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:case CONSUME_FROM_MIN_OFFSET:case CONSUME_FROM_MAX_OFFSET:// 从上一次的偏移量开始继续拉取case CONSUME_FROM_LAST_OFFSET: {// 从本地的 offsetTable 中读取出队列的消费偏移量, 如果读不到就从 broker 中拉取long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;}// 第一次启动, 从本地和 broker 中都找不到 offsetelse if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 如果是重试队列, 从头开始消费result = 0L;} else {try {// 如果不是重试队列, 那么从这个 ConsumeQueue 的最大逻辑偏移量开始拉取消息, 相当于消费最新的消息// 这里只有获取的方式是 READ_FROM_MEMORY, 也就是不从 broker 获取, 只判断本地缓存获取不到才会返回 -1// 又或者是压根找不到 broker 地址才会返回 -1, 当然这种太极端了result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);throw e;}}} else {// -2, 比如获取 offset 的过程中抛出了其他的异常result = -1;}break;}// 感觉跟上面的差不多, 因为下面是 READ_FROM_STORE, 所以只要能请求到 broker 都会返回 >= 0 的值, 当然如果是 LocalFileOffsetStore// 那本地如果找不到这个队列的偏移量就会返回 -1, 都一样是表示最新的队列, 然后就从最早的消息开始获取case CONSUME_FROM_FIRST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {result = 0L;} else {result = -1;}break;}// 从指定时间点开始消费case CONSUME_FROM_TIMESTAMP: {// 获取 offsetlong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {// 如果能获取到, 那么不走下面的逻辑了result = lastOffset;} else if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {try {// 重传队列从最新消息开始消费result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);throw e;}} else {try {// 非重传队列就根据时间获取偏移量, broker 会通过二分搜索来查询long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);throw e;}}} else {result = -1;}break;}default:break;}return result;
}
上面是 computePullFromWhereWithException 的全部实现,默认策略是 CONSUME_FROM_LAST_OFFSET
,所以我们这里主要看下 CONSUME_FROM_LAST_OFFSET 的实现。
offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
这里是从本地的 offsetTable 中读取出队列的消费偏移量, 如果读不到就从 broker 中拉取,从上面的重平衡代码也可以看到如果你是新分配的队列,会先把这个队列的偏移量从本地 offsetTable 中删掉,因此这里会直接从 broker 中拉取偏移量,我们可以看下 readOffset 的源码,那注意因为涉及到 broker,所以我这里默认用的是 RemoteBrokerOffsetStore
这个类的实现。
/*** 从 offsetTable 中读取偏移量* @param mq* @param type* @return*/
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {// 首先从本地内存 offsetTable 读取case MEMORY_FIRST_THEN_STORE:case READ_FROM_MEMORY: {// 获取消息队列的偏移量AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {// 能获取到直接返回return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {// 获取不到再看下是不是需要从存储服务中获取, 如果不需要就直接返回 -1return -1;}}// 如果是 MEMORY_FIRST_THEN_STORE 也会走到这里case READ_FROM_STORE: {try {// RemoteBrokerOffsetStore 需要从 broker 拉取偏移量, 注意这个偏移量一定是 >= 0 的long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);// 拉取成功之后更新本地缓存 offsetTablethis.updateOffset(mq, offset.get(), false);return brokerOffset;}// No offset in brokercatch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}return -1;
}
然后可以看到由于我们上面设置的是 READ_FROM_STORE
,所以会直接走 case READ_FROM_STORE
的分支,因此我们直接看 fetchConsumeOffsetFromBroker
这个方法的实现,发起请求的过程就直接省略了,请求 CODE 是 QUERY_CONSUMER_OFFSET
,最终来到 broker 后是通过 queryConsumerOffset
这个方法去处理的。
/*** 查询消费者偏移量* @param ctx* @param request* @return* @throws RemotingCommandException*/
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 响应结果final RemotingCommand response =RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);// 响应头final QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.readCustomHeader();// 请求头final QueryConsumerOffsetRequestHeader requestHeader =(QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);// 查询出消费者的消费偏移量long offset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());if (offset >= 0) {// 获取成功, 将偏移量返回, 获取成功responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {// 这里就是没找到消费者偏移量, 获取第一条有效的 ConsumeQueue 索引long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());// 如果消费队列最新偏移量小于等于 0,并且该消费队列的 0 偏移量数据还在内存中, 表示为新消息队列并且消息未清理过,并且数据量不是很大if (minOffset <= 0&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {// 返回结果, 偏移量为 0, 意思是消费者可以从 0 开始消费最新的消息responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {// 这里就是压根没找到消息, 这个队列的数据也没找到, 直接返回 QUERY_NOT_FOUNDresponse.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}}return response;
}
这里就是全部代码了,然后 queryOffset 就是查询的核心逻辑,里面的实现就是直接从 broker 的 offsetTable
中获取偏移量,消费者启动的时候会有一个定时任务定时上报偏移量的,所以可以直接获取,如果获取不到就返回 -1。
/*** 从 broker 的 offsetTable 缓存中查询出来消费者偏移量* @param group 消费者组* @param topic topic* @param queueId 队列 ID* @return*/
public long queryOffset(final String group, final String topic, final int queueId) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {// 根据队列 ID 获取消费偏移量Long offset = map.get(queueId);if (offset != null)return offset;}return -1;
}
那如果 broker 获取不到,就判断下这个队列是不是被交换到磁盘上面了,如果是说明内存中压根就没有这个队列的信息了,这时候就返回 QUERY_NOT_FOUND
,表示队列找不到,然后不是就返回 0,比如 topic 队列扩容了,新的队列之前没有分配给任何一个消费者,这种情况下会从 0 开始消费。
那经过上面的源码大致分析,最后我们就可以得知正常情况下新分配的队列如果是之前没有消费过的就会从 0 开始消费,如果消费过了,那么就从上一次最后拉取的位置继续拉取消息来消费。
3.5 pullMessage 消息拉取前置处理
上面我们讲了 PullRequest 中的 nextOffset 是怎么设置的,那这一小节就跟着 3.3 继续往下走,来看下消息拉取方法,因为代码有点多,所以就不直接贴出来。
// 首先获取下消息队列的处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 如果这个队列已经被丢弃了, 就不处理了, 什么情况会丢弃呢? 现在知道的一个就是当当前消费者分配的队列发生变化, 原本不再消费的消息队列就会设置成这个状态
if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return;
}
来看源码,首先就是判断下这个消息队列是不是 drop 状态,比如说当前消费者队列里面还有一些 PullRequest,但是这时候来了一个新的消费者负载均衡之后当前消费者的队列变了,这种情况下那些不属于当前消费者的队列就要设置为 drop 状态,后面不再去消费这个队列的消息。
// 获取最后一次拉取消息的时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
下面设置下最后一次拉取消息的时候,设置这个属性是为了可以通过 isPullExpired
这个方法判断拉取的时间是不是超过最大时间间隔了,默认是 120s,如果超过说明这个队列已经失效了。
try {/*** 1. 确保当前 Consumer 服务状态是可用的*/this.makeSureStateOK();
} catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return;
}
接下来先判断下 Consumer 的状态是否是可用的,判断的逻辑也简单,因为 Consumer 中是有一个 serviceState 属性的,如果不是处于 RUNNING 状态,说明还不可用。
/*** 确定当前 Consumer 的状态是正常的* @throws MQClientException*/
private void makeSureStateOK() throws MQClientException {if (this.serviceState != ServiceState.RUNNING) {throw new MQClientException("The consumer service state not OK, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}
}
接下来判断如果消费者暂停了,就延时 1s 再去发送消息拉取请求,上面 3.1 的定时任务线程池就用上了。至于什么情况下会把消费者的 pause 设置成 true,看源码是在调用 resetOffset
的时候为了避免偏移量的影响就会调用下这个方法暂停拉取请求。
/*** 2. 如果消费者服务暂停了, 那么延迟 1s 再去发送拉取消息请求*/
if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return;
}
接下来就是流控校验,主要就是校验队列方面的,也就是说当前消费者已经缓存在本地的消息不能超过 1000 条,同时大小不能超过 100MB,注意这里是一个队列的。
/*** 3. 流控校验* cachedMessageCount: 消息队列拉取的消息会缓存到 ProcessQueue 的 msgTreeMap 中, 这里就是获取已经缓存的消息数* cachedMessageSizeInMiB: 已经缓存的消息总大小, 单位是 MB*/
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);// 消费者已经缓存的消息数不能超过 pullThresholdForQueue, 默认是 1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {// 延迟 50ms 之后再次发送消息拉取请求this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {// 这里就是打印下日志log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}// 流控限制, 直接返回return;
}// 消费者已经缓存的消息数大小不能超过 pullThresholdSizeForQueue, 默认是 100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {// 延迟 50ms 之后再次发送消息拉取请求this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}// 流控限制, 直接返回return;
}
然后下面是关于并发消费和顺序消费的消息跨度校验和点位纠正。
/*** 4. 顺序消费和并发消费的相关校验和点位的处理*/
if (!this.consumeOrderly) {// 如果是并发消费, 判断下缓存中存储的消息的全部消息的跨度(偏移量之差)大于 consumeConcurrentlyMaxSpan, 默认 2000if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {// 延迟 50ms 之后再次发送消息拉取请求this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}// 跨度限制, 直接返回return;}
} else {// 这里就是顺序消费, 需要对队列加锁if (processQueue.isLocked()) {// 如果这个拉取消息的请求之前没有锁定过, 这里需要纠正消费点位, 防止超前消费if (!pullRequest.isPreviouslyLocked()) {long offset = -1L;try {// 获取 MessageQueue 的下一条要消费的消息的偏移量offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}// 判断下是不是超前消费了boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}// previouslyLocked 状态设置为 true, setPreviouslyLocked 只在这里调用了, 看来就是用来纠正消费点位的pullRequest.setPreviouslyLocked(true);// 设置纠正后的消费点位pullRequest.setNextOffset(offset);}} else {// 如果加锁失败, 那么延迟 3s 后继续发送拉取消息的请求this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}
}
如果是并发消费,那么判断下这个队列缓存中存储的消息的全部消息的跨度(偏移量之差)大于 consumeConcurrentlyMaxSpan,默认 2000,如果大于,那就说明消息可能有点多了,做下流控,延迟 50ms 之后再次发送消息拉取请求,让消费者先去消费下本地的消息。
如果是顺序消费,由于顺序消费一段时间内只能被一个消费者操作,避免顺序性的问题,因此在处理之前先加个锁。接下来获取下这个 MessageQueue 下一次要消费的消息偏移量,那这个方法在上面 3.4 小节也有说过。如果有异常就延迟 3s 再次提交消息拉取请求,如果能获取到,就判断下这个消息队列下一条消息的 offset 和当前请求要拉取的 nextOffset 是不是要小,如果是就说明还有一部分消息不会被拉取到,所以纠正下 offset。 这里正常来说也不会要大吧,因为顺序消费毕竟都是加了锁的,如果有出现过对应场景也可以在评论区交流下。最后如果加锁失败,那么延迟 3s 后继续发送拉取消息的请求。
/*** 5. 校验 topic 订阅消息, 如果找不到当前 topic 的订阅信息, 延迟 3s 后再次提交消息拉取请求*/
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return;
}
然后回到源码,接下来校验 topic 订阅消息,如果找不到当前 topic 的订阅信息,延迟 3s 后再次提交消息拉取请求,有可能是这个 topic 的配置还没有被定时任务缓存到本地。
// 获取起始时间
final long beginTimestamp = System.currentTimeMillis();/*** 6. 设置拉取消息的回调, 当拉取消息成功之后就会回调 PullCallback#onSuccess*/
PullCallback pullCallback = new PullCallback() {/*** 当成功拉取到消息且收到 broker 的响应之后会回调 pullCallback#onSuccess* @param pullResult*/@Overridepublic void onSuccess(PullResult pullResult) {if (pullResult != null) {// 处理消息拉取结果, 在里面会过滤消息并且设置一些属性, 用户可以扩展自定义过滤类在这里面去过滤消息pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {// 找到了消息case FOUND:// 获取消息拉取的起始 offsetlong prevRequestOffset = pullRequest.getNextOffset();// 设置下一次要拉取的起始偏移量到 pullRequest 中pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 拉取的总耗时long pullRT = System.currentTimeMillis() - beginTimestamp;// 增加这个 topic 下面的消息拉取总耗时DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;// 如果找不到消息if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {// 立刻重新提交拉取消息的请求, 下一次就从 pullResult.getNextBeginOffset() 开始拉取DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {// 找到了消息, 获取第一条消息的偏移量firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();// 修改消息拉取的 TPSDefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());/*** 7. 将拉取到的消息存入到处理队列的本地缓存 msgTreeMap 中, msgTreeMap 是一个 TreeMap, 根据偏移量排序*/boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());/*** 8. 拉取到消息之后, 提交消费请求 ConsumeRequest 给消费者 ConsumeMessageConcurrentlyService(并发消费)* 或者 ConsumeMessageOrderlyService(顺序消费)*/DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);/*** 9. 如果消息拉取的时间间隔大于 0, 就延迟一段时间再提交拉取请求, 否则立刻提交, 默认是立刻提交*/if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {// 延迟将拉取消息的请求存储到 pullRequestQueue 队列中, 因为是阻塞队列, 所以只要有 pullRequest 请求到来就会被消费服务去处理DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {// 立刻将拉取消息的请求存储到 pullRequestQueue 队列中, 因为是阻塞队列, 所以只要有 pullRequest 请求到来就会被消费服务去处理DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;// 没找到消息或者消息都被过滤掉了case NO_NEW_MSG:case NO_MATCHED_MSG:// 设置下一次拉取消息的起始位置pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 更新 offsetStore 里面的消息拉取偏移量DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);// 立刻将拉取消息的请求存储到 pullRequestQueue 队列中, 因为是阻塞队列, 所以只要有 pullRequest 请求到来就会被当前服务去处理DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;// 偏移量不合法, offset 太大, offset 太小, 消息队列没有数据case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());// 更新下一次拉取偏移量pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 设置处理队列状态为 droppedpullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {@Overridepublic void run() {try {// 更新队列的消息队列消费偏移量DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);// 持久化对应的消费队列 offsetDefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());// 删掉这个消息队列DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("execute the pull request exception", e);}/*** 出异常的话延迟 3s 再次提交消息拉取请求*/DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}
};
接下来就是重要的一段代码了,消息拉取回调。当从 broker 中拉到了消息就会回调 onSuccess
方法,如果出异常就会回调 onException
方法,出异常就是打印下日志,然后延迟 3s 再次提交消息拉取请求,拉取成功的逻辑就比较多了,当然这里我先不展开细说,等到把 broker 如何处理消息拉取请求的逻辑说完之后再来看这部分的回调,不然现在看这些状态也搞不懂是什么意思。
/*** 7. 设置 commitOffsetEnable, 表示是否允许上报消费点位*/
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {// 集群模式下, 从本地 offsetTable 中获取下消息队列的 commitOffsetValuecommitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {// 如果大于 0, 就允许上报消费点位给 brokercommitOffsetEnable = true;}
}
继续看 pullMessage 的源码,这里是设置 commitOffsetEnable,表示是否允许上报消费点位,这个标记的意思是 broker 在处理消息拉取请求的时候如果这个标记为 true,那么就会将这个队列的偏移量存到 broker 本地的 offsetTable 中,后面再通过 broker 的定时任务持久化到文件中。要注意这里就是本地 offsetTable 中存的是什么,提交到 broker 中的就是什么,因为此时只是发起拉取请求,还不知道能不能拉到消息,所以现在没办法确定下一次要从哪个位置开始拉取。
// 消息订阅表达式
String subExpression = null;
boolean classFilter = false;
// 首先获取下 topic 相关的 TAG 订阅信息
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {// 获取订阅 TAG 表达式, 如 "tag1 || tag2 || tag3"if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {subExpression = sd.getSubString();}// Consumer 端类过滤模式, 用户自定义过滤类时才会用, 这里默认就是 false, 这个版本也用不到classFilter = sd.isClassFilterMode();
}
接下来是获取订阅的过滤表达式,比如 "tag1 || tag2 || tag3"
,但是这里一般都是 null,因为我们定义好了 topic 之后过滤信息也定下了,所以没必要每次都把这玩意给请求过去,broker 处理的时候会判断上传上来的过滤表达式和 broker 存储的是不是一样的,如果不一样就返回,但是我们用的时候一般都是一样的。
// 创建出拉取消息的系统标记
int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter
);
接下来创建出标记,节省空间。
try {/*** 8. 拉取消息*/this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);
} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
最后通过 pullKernelImpl 正式发起消息拉取请求。
3.6 pullKernelImpl 发起消息拉取请求
/*** 拉取消息* @param mq 消息队列* @param subExpression 消费者的订阅关系, 如 "tag1 | | tag2 | | tag3", 默认是 "*", 表示全部, 当然如果不设置也是订阅全部* @param expressionType 消息过滤表达式类型,是 TAG 还是 SQL92* @param subVersion topic 的订阅信息的版本* @param offset 下一次要拉取的消息的 offset* @param maxNums 一次性批量拉取的消息数, 默认是 32* @param sysFlag 系统标记* @param commitOffset 提交的消费点位, 从本地 offsetTable 中获取到的* @param brokerSuspendMaxTimeMillis broker 挂起的最长时间, 默认 15s* @param timeoutMillis 消费者消费拉取的超时时间, 默认 30s* @param communicationMode 消费者消息拉取模式, 默认是异步拉取* @param pullCallback 拉取消息的回调函数* @return* @throws MQClientException* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 首先传入 brokerName 和 brokerId 找到 broker 地址, 一开始默认是 MASTER, 用户也可以自定义从哪个 broker 拉FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);// 如果没找到 broker 地址if (null == findBrokerResult) {// 从 nameserver 中获取 topic 的路由信息并尝试更新本地缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());// 更新完之后再拉取一次findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}// 找到了 brokerif (findBrokerResult != null) {{// 首先校验下过滤的模式, 如果是 SQL92 过滤, 需要在 V4_1_0_SNAPSHOT 这个版本之后才支持if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}// 系统标记int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {// 如果是从节点就把 CommitOffset 从 sysFlag 里面删掉, 从节点不需要保存拉取的消费点位sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}// 构建拉取消息请求PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();// 消费者组requestHeader.setConsumerGroup(this.consumerGroup);// topicrequestHeader.setTopic(mq.getTopic());// 队列 IDrequestHeader.setQueueId(mq.getQueueId());// 从哪个偏移位置开始拉取消息requestHeader.setQueueOffset(offset);// 最大拉取消息数量, 默认 32requestHeader.setMaxMsgNums(maxNums);// 系统标记requestHeader.setSysFlag(sysFlagInner);// 提交的消费点位, 从本地 offsetTable 中获取到的requestHeader.setCommitOffset(commitOffset);// broker 挂起的最长时间, 默认 15srequestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);// 消费者的订阅关系, 如 "tag1 | | tag2 | | tag3", 默认是 "*", 表示全部, 当然如果不设置也是订阅全部requestHeader.setSubscription(subExpression);// topic 的订阅信息的版本requestHeader.setSubVersion(subVersion);// 消息过滤表达式类型,是 TAG 还是 SQL92requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}// 发送拉取消息请求PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}// 这里还是找不到 broker, 抛出异常throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
这里的源码就是构造出 PullMessageRequestHeader 请求,然后 pullMessage 发起消息拉取请求,不过要注意下前面会判断如果是从节点,会把 CommitOffset 从 sysFlag 里面删掉,从节点不需要保存拉取的消费点位,因为消息拉取默认是向主节点发起的。
3.7 pullMessage 发起消息拉取请求
/*** MQClientAPIImpl的方法** @param addr broker 地址* @param requestHeader 拉取消息请求的请求头* @param timeoutMillis 消费者消息拉取超时时间,默认30s* @param communicationMode 消息拉取方式, 默认异步拉取* @param pullCallback 拉取消息之后调用回调函数* @return 拉取结果* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public PullResult pullMessage(final String addr,final PullMessageRequestHeader requestHeader,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {// 构建请求命令对象,请求 Code 为 PULL_MESSAGERemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {// 拉取消息肯定不能是 ONEWAY 模式case ONEWAY:assert false;return null;case ASYNC:// 默认就是这个异步模式this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);// 异步模式下返回空, 依靠回调函数来处理返回结果return null;case SYNC:return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null;
}
这里的模式除了 DefaultLitePullConsumerImpl,其他两个 PUSH 和 PULL 都是用的异步拉取,也就是拉取后会回调 PullCallback
,而消息拉取的请求 CODE 是 PULL_MESSAGE
。
3.8 pullMessageAsync 异步拉取消息
/*** 异步拉取消息, 同时触发回调函数* @param addr broker 地址* @param request 拉取消息的请求* @param timeoutMillis 消息拉取的超时时间* @param pullCallback 拉取到消息之后回调函数* @throws RemotingException* @throws InterruptedException*/
private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback
) throws RemotingException, InterruptedException {// 基于 Netty 发送异步请求this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {@Overridepublic void operationComplete(ResponseFuture responseFuture) {// 发送完成之后会回调, 回调结果有几种: 超时回调、发送失败回调、成功收到返回结果回调RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {try {// 如果能收到返回结果, 说明是发送成功了, 并且收到了拉取的消息PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);assert pullResult != null;// 调用 pullCallback#onSuccess 分发处理拉取消息请求pullCallback.onSuccess(pullResult);} catch (Exception e) {pullCallback.onException(e);}} else {// 这里就是没有获取到结果, 都是通过 onException 来处理if (!responseFuture.isSendRequestOK()) {// 发送失败的pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));} else if (responseFuture.isTimeout()) {// 超时没有收到响应结果的pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));} else {// 不知道什么原因的pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));}}}});
}
这里就是最后发起请求的地方,当发起请求之后 InvokeCallback 所在的 ResponseFuture 会被存到 responseTable
,等待 broker 返回结果之后再回调里面的 operationComplete
方法,但是除了 broker 回调,如果是发送失败或者是超时了也会回调这个方法,因此这里面需要判断如果 response != null,说明是正常回调,这种情况下会调用 processPullResponse
解析出 PullResult,然后再调用 pullCallback.onSuccess(pullResult)
。
如果是没有获取到结果,说明要么是发送失败,要么是超时了,这种情况下就调用 onException
,这里的 onException
前面介绍过了,逻辑比较简单,就是延迟 3s 再次提交这个消息拉取请求再次去处理。
最后我们再来看下解析结果的方法,也就是 processPullResponse
。
/*** 处理 broker 返回的拉取消息的结果* @param response* @param addr* @return* @throws MQBrokerException* @throws RemotingCommandException*/
private PullResult processPullResponse(final RemotingCommand response,final String addr) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;// 状态码switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark(), addr);}// 解析响应头PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);// 返回结果return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
这里注释也说得比较清楚,就是解析出 broker 的返回 code,然后封装一个 PullResultExt 对象,将下一次要拉取的偏移量、最小偏移量、最大偏移量、下次建议从哪个 broker 继续拉取,消息体返回。
4. 小结
这篇文章主要是说了 PullMessageService 是如何处理 PullRequest 的,同时也看了发起请求的过程,那么 broker 是如何处理消息拉取请求的,还有 pullCallback 里面的 onSuccess 是如何处理的,后面的文章再来探讨。
如有错误,欢迎指出!!!