摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢?

在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。

由于RocketMQ系列的技术分享存在一定的连续性,因此希望读者能回顾下往期RocketMQ分享的篇幅:

http://www.6aiq.com/article/1563128272857

http://www.6aiq.com/article/1563128435731

http://www.6aiq.com/article/1563129642050

http://www.6aiq.com/article/1563129820252

一、RocketMQ中长轮询的Pull消息机制

在上一篇中,已经简略地介绍过RocketMQ中消息消费时Pull消息的长轮询机制了,其主要的思路是:Consumer如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求。然后在Broker端,通过后台独立线程—PullRequestHoldService重复尝试执行Pull消息请求来取消息。同时,另外一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,并取出hold住的Pull请求进行二次处理。通过这种长轮询机制,即可解决Consumer端需要通过不断地发送无效的轮询Pull请求,而导致整个RocketMQ集群中Broker端负载很高的问题。

1.1 Consumer向Broker端发送Pull消息请求的主要过程

在RocketMQ的Consumer端,后台独立线程服务—pullMessageService是Pull消息请求的发起者,它不断地尝试从阻塞队列—LinkedBlockingQueue中获取元素PullRequest,并根据pullRequest中的参数以及订阅关系信息调用pullAPIWrapper的pullKernelImpl()方法发送封装后的Pull消息请求—PullMessageRequestHeader至Broker端来拉取消息(具体完成发送一次Pull消息的PRC通信请求的是MQClientAPIImpl中的pullMessage()方法)。这里涉及细节的时序图(ps:时序图中没有涉及PRC异步通信中的callback过程)如下:

Consumer向Broker端发送长轮询请求的时序图.jpg

其中, DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法是发送Pull消息请求的关键:

(1)校验ProcessQueue是否“drop”, 如果为“drop”为true则直接返回(这个“drop”的设置在下面一节—“Consumer端的负载均衡机制”中会提到);

(2)给ProcessQueue设置Pull消息的时间戳;

(3)做流量控制,对于满足下面条件的任何一种情况,稍后再发起Pull消息的请求;

条件1:正在消费的队列中,未被消费的消息数和消息大小超过阀值(默认每个队列消息数为1000个/消息存储容量100MB);

条件2:如果是顺序消费,正在消费的队列中,消息的跨度超过阀值(默认2000);

(4)根据topic获取订阅关系—SubscriptionData;

(5)构建Pull消息的回调对象—PullBack,这里从Broker端Pull消息的返回结果处理是通过异步回调(发送异步通信RPC请求),其中如果Broker端返回Pull消息成功,在回调方法中先填充至处理队列—processQueue中(将Pull下来的消息,设置到ProcessQueue的msgTreeMap容器中),然后通过消费消息的服务线程—consumeMessageService,将封装好的ConsumeRequest提交至消费端消费线程池—consumeExecutor异步执行处理(具体处理逻辑:通过业务应用系统在DefaultMQPushConsumer实例中注册的消息监听器完成业务端的消息消费);

(6)从Consumer端内存中获取commitOffsetValue;

(7)通过RocketMQ的Remoting通信层向Broker端发送Pull消息的RPC请求;

1.2 Broker端处理Pull消息请求的一般过程

这里先来说下对于一般情况下(即为所要Pull的消息在RocketMQ的Broker端已经是存在,一般可以Pull到的情况),Broker端处理这个Pull消息请求的主要过程。其时序图(ps:图中只是画了大部分的流程,详细细节还需要对照源码看下)如下:

Broker端接受长轮询请求的处理时序图.jpg

从上面的简易时序图中可以看到Broker端Pull消息的主要关键点如下:

(1)Pull消息的业务处理器—PullMessageProcessor的processRequest为处理拉取消息请求的入口,在设置reponse返回结果中的opaque值后,就完成一些前置的校验(Broker是否可读、Topic/ConsumerGroup是否存在、读取队列Id是否在Topic配置的队列范围数内);

(2)根据“ConsumerGroup”、“Topic”、“queueId”和“offset”这些参数来调用MessageStore实例的getMessage()方法来产尝试读取Broker端的消息;

(3)其中,通过findConsumeQueue()方法,获取逻辑消费队列—ConsumeQueue;

(4)根据offset与逻辑消费队列中的maxOffset、minOffset的比较,来设置状态值status,同时计算出下次Pull消息的开始偏移量值—nextBeginOffset,然后通过MappedFile的方式获取ConsumeQueue的Buffer映射结果值;

(5)根据算出来的offsetPy(物理偏移量值)和sizePy(消息的物理大小),从commitLog获取对应消息的Buffer映射结果值,并填充至GetMessageResult返回对象,并设置返回结果(状态/下次其实偏移量/maxOffset/minOffset)后return;

(6)根据isTransferMsgByHeap的设置情况(默认为true),选择下面两种方式之一来真正读取GetMessageResult的消息内容并返回至Consumer端;

方式1:使用JDK NIO的ByteBuffer,循环地读取存有消息内容的messageBufferList至堆内内存中,返回byte[]字节数组,并设置到响应的body中;然后,通过RPC通信组件—NettyRemotingServer发送响应至Consumer端;

方式2:采用基于Zero-Copy的Netty组件的FileRegion,其包装的“FileChannel.tranferTo”实现文件传输,可以直接将文件缓冲区的数据发送至通信目标通道Channel中,避免了通过循环write方式导致的内存拷贝开销,这种方式性能上更优;

(7)在PullMessageProcessor业务处理器的最后,提交并持久化消息消费的offset偏移量进度;

1.3 Broker端对于Pull请求挂起处理的流程

说完了Pull消息请求的一般流程,下面主要看下Broker端的PullMessageProcessor业务处理器在RocketMQ中还没有消息可以拉取情况下( 即为:PULL_NOT_FOUND)的处理流程,本节内容也是RocketMQ中长轮询机制的关键。

长轮询机制是对普通轮询的一种优化方案,它平衡了传统Push/Pull模型的各自缺点,Server端如果当前没有Client端请求拉取的相关数据会hold住这个请求,直到Server端存在相关的数据,或者等待超时时间后返回。在响应返回后,Client端又会再次发起下一次的长轮询请求。RocketMQ的push模式正是采用了这种长轮询机制的设计思路,如果在上面所述的第一次尝试Pull消息失败后(比如Broker端暂时没有可以消费的消息),先hold住并且挂起该请求(这里,设置返回响应response为null,此时不会向Consumer端发送任何响应的内容,即不会对响应结果进行处理),然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService的二次处理。在Broker端,两个后台线程服务PullRequestHoldService和ReputMessageService是实现长轮询机制的关键点。下面就来分别介绍这两个服务线程:

(1)PullRequestHoldService:该服务线程会从pullRequestTable本地缓存变量中取PullRequest请求,检查轮询条件—“ 待拉取消息的偏移量是否小于消费队列最大偏移量”是否成立,如果条件成立则说明有新消息达到Broker端,则通过PullMessageProcessor的executeRequestWhenWakeup()方法重新尝试发起Pull消息的RPC请求(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s);

(2)ReputMessageService:该服务线程会在Broker端不断地从数据存储对象—commitLog中解析数据并分发请求,随后构建出ConsumeQueue(逻辑消费队列)和IndexFile(消息索引文件)两种类型的数据。同时从本地缓存变量—pullRequestTable中,取出hold住的PullRequest请求并执行二次处理(具体的做法是,在PullMessageProcessor的executeRequestWhenWakeup()方法中,通过业务线程池pullMessageExecutor,异步提交重新Pull消息的请求任务,即为重新调了一次PullMessageProcessor业务处理器的processRequest()方法,来实现Pull消息请求的二次处理)。这里,ReputMessageService服务线程,每处理一次,Thread.sleep(1),继续下一次处理。

二、Consumer端的负载均衡机制

看了上面一节—**“RocketMQ中长轮询的Pull消息机制”**后,大家可能会有这样子一个疑问:在Consumer端pullMessageService线程作为消息的主动拉取者不断地从阻塞队列中获取元素PullRequest,那么这里的PullRequest是在哪儿由哪个线程放入至阻塞队列中的呢?本节内容将介绍“Consumer端的负载均衡机制”,同时解答上面的疑问。

2.1 RocketMQ为何需要在Consumer端做负载均衡?

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来Pull消息的,而在Push模式中只是采用了长轮询的方式而实现了准实时的自动消息拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列—MessageQueue中去Pull消息。因此,消息队列的负载均衡处理(即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费),由Consumer端来主动完成更为合理。

2.2 Consumer端负载均衡的主要流程

1. Consumer端的心跳包发送

在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。

2. Consumer端实现负载均衡的核心类—RebalanceImpl

在上一篇文章的"Consumer启动流程"中已经介绍了在启动MQClientInstance实例时候,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心关键。

这里,rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:

(1)从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);

(2)根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通信请求( Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);

(3)先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。

这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。具体的算法代码如下:

@Override

public List allocate(String consumerGroup, String currentCID, List mqAll,

List cidAll) {

//省略代码……

List result = new ArrayList();

//省略代码……

int index = cidAll.indexOf(currentCID);

int mod = mqAll.size() % cidAll.size();

int averageSize =

mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()

+ 1 : mqAll.size() / cidAll.size());

int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;

int range = Math.min(averageSize, mqAll.size() - startIndex);

for (int i = 0; i < range; i++) {

result.add(mqAll.get((startIndex + i) % mqAll.size()));

}

return result;

(4)然后,调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对,具体的过滤比对方式如下图:

RebalancePushImpl负载均衡(分发pullRequest到pullRequestQueue).jpg

这里可以分如下两种情况来筛选过滤:

**a.图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。**将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;

**b.图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。**判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),并创建拉取请求对象—pullRequest添加到拉取列表—pullRequestList中,最后执行dispatchPullRequest()方法,将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。其中,可以重点对比下,RebalancePushImpl和RebalancePullImpl两个实现类的dispatchPullRequest()方法不同,RebalancePullImpl类里面的该方法为空,这样子也就回答了上一篇中最后的那道思考题了。

三、总结

RocketMQ的消息消费(二)(push模式实现)篇幅就先分析到这里了。关于RocketMQ消息�