文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢?

在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ中Pull和Push的两种消费方式”、“RocketMQ中消费者(Push模式)的启动流程”和“RocketMQ中Pull和Push两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于RocketMQ分布式消息队列的前几篇文章:

http://www.6aiq.com/article/1563128272857

http://www.6aiq.com/article/1563128435731

http://www.6aiq.com/article/1563129642050

一、如何选择消息消费的方式—Pull or Push?

1.1 MQ中Pull和Push的两种消费方式

对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:

(1) Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是**“慢消费问题” ),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

(2)Pull方式:由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”**)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

1.2 RocketMQ消息消费的长轮询机制

思考题

上面简要说明了Push和Pull两种消息消费方式的概念和各自特点。如果长时间没有消息,而消费者端又不停的发送Pull请求不就会导致RocketMQ中Broker端负载很高吗?那么在RocketMQ中如何解决以做到高效的消息消费呢?

通过研究源码可知,RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端(这里,在RocketMQ的Broker端会有一个后台独立线程—ReputMessageService不停地构建ConsumeQueue/IndexFile数据,同时取出hold住的请求并进行二次处理),则通过重新调用一次业务处理器—PullMessageProcessor的处理请求方法—processRequest()来重新尝试拉取消息(此处,每隔5S重试一次,默认长轮询整体的时间设置为30s)。

RocketMQ消息Pull的长轮询机制的关键在于Broker端的PullRequestHoldService和ReputMessageService两个后台线程。对于RocketMQ的长轮询(LongPolling)消费模式后面会专门详细介绍。

二、RocketMQ中两种消费方式的demo代码

(1)Pull模式的Consumer端代码如下:

        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest111");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.println(pullResult.getMsgFoundList().get(0).toString());
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case NO_MATCHED_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    //TODO
                }
            }
        }
        consumer.shutdown();

在示例代码中,可以看到业务工程在Consumer启动后,Consumer主动获取MessageQueue的Set集合,遍历该集合中的每一个队列,发送Pull的请求(参数中带有队列中的消息偏移量),同时需要Consumer端自己保存消息消费的offset偏移量至本地变量中。在Pull模式下,需要业务应用代码自身去完成比较多的事情,因此在实际应用中用的较少。

(2)Push模式的Consumer端代码如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest111", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName("consumer1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

在示例代码中,业务工程的应用程序使用Push方式进行消费时,Consumer端注册了一个监听器,Consumer在收到消息后主动调用这个监听器完成消费并进行对应的业务逻辑处理。由此可见,业务应用代码只需要完成消息消费即可,无需参与MQ本身的一些任务处理(ps:业务代码显得更为简洁一些)。

三、RocketMQ中消费者Push方式的启动流程

这一节主要先讲下RocketMQ消费者的启动流程,看下在启动的时候究竟完成了什么样的操作。由于RocketMQ的DefaultMQPushConsumer和DefaultMQPullConsumer启动流程大部分类似,而DefaultMQPushConsumer更为复杂一些,因此这一节内容主要讲的是DefaultMQPushConsumer启动流程。Push方式的Consumer启动流程的时序图如下图所示:

RocketMQ的PushConsumer启动时序图.jpg

从上面的时序图上可以看出,Push方式的Consumer启动流程完成的任务比较多,主要任务如下:

(1)设置consumerGroup、NameServer服务地址、消费起始偏移地址并根据参数Topic构建Consumer端的SubscriptionData(订阅关系值);

(2)在Consumer端注册消费者监听器,当消息到来时完成消费消息;

(3)启动defaultMQPushConsumerImpl实例,主要完成前置校验、复制订阅关系(将defaultMQPushConsumer的订阅关系复制至rebalanceImpl中,包括retryTopic(重试主题)对应的订阅关系)、创建MQClientInstance实例、设置rebalanceImpl的各个属性值、pullAPIWrapper包装类对象的初始化、初始化offsetStore实例并加载消费进度、启动消息消费服务线程以及在MQClientInstance中注册consumer等任务;

(4)启动MQClientInstance实例,其中包括完成客户端网络通信线程、拉取消息服务线程、负载均衡服务线程和若干个定时任务的启动;

(5)向所有的Broker端发送心跳(采用加锁方式);

(6)最后,唤醒负载均衡服务线程在Consumer端开始负载均衡;

四、RocketMQ中Pull和Push两种消费模式流程简析

RocketMQ提供了两种消费模式,Push和Pull,大多数场景使用的是Push模式,在源码中这两种模式分别对应的是DefaultMQPushConsumer类和DefaultMQPullConsumer类。Push模式实际上在内部还是使用的Pull方式实现的,通过Pull不断地轮询Broker获取消息,当不存在新消息时,Broker端会挂起Pull请求,直到有新消息产生才取消挂起,返回新消息。

(1)RocketMQ的Pull消费模式流程简析

RocketMQ的Pull模式相对来得简单,从上面的demo代码中可以看出,业务应用代码通过由Topic获取到的MessageQueue直接拉取消息(最后真正执行的是PullAPIWrapper的pullKernelImpl()方法,通过发送拉取消息的RPC请求给Broker端)。其中,消息消费的偏移量需要Consumer端自己去维护。

(2)RocketMQ的Push消费模式流程简析

在本文前面已经提到过了,从严格意义上说,RocketMQ并没有实现真正的消息消费的Push模式,而是对Pull模式进行了一定的优化,一方面在Consumer端开启后台独立的线程—PullMessageService不断地从阻塞队列—pullRequestQueue中获取PullRequest请求并通过网络通信模块发送Pull消息的RPC请求给Broker端。另外一方面,后台独立线程—rebalanceService根据Topic中消息队列个数和当前消费组内消费者个数进行负载均衡,将产生的对应PullRequest实例放入阻塞队列—pullRequestQueue中。这里算是比较典型的生产者-消费者模型,实现了准实时的自动消息拉取。然后,再根据业务反馈是否成功消费来推动消费进度。

在Broker端,PullMessageProcessor业务处理器收到Pull消息的RPC请求后,通过MessageStore实例从commitLog获取消息。如1.2节内容所述,如果第一次尝试Pull消息失败(比如Broker端没有可以消费的消息),则通过长轮询机制先hold住并且挂起该请求,然后通过Broker端的后台线程PullRequestHoldService重新尝试和后台线程ReputMessageService的二次处理。

思考题

使用RocketMQ的Pull模式进行消息消费时,由上面可知该模式下无需自动拉取消息,这样在DefaultMQPullConsumerImpl启动时,消息拉取线程—PullMessageService和消息队列负载线程—RebalanceService其实也就没必要启动,但实际上却启动了,这里会有问题么?

五、总结

RocketMQ的消息消费(一)(入门篇幅)就先分析到这里了。建议读者可以将作者之前写的三篇文章—“RocketMQ的RPC通信(一)/(二)”以及“RocketMQ消息发送”结合起来读,这样会整体会更加连贯,收获更大。关于RocketMQ消息消费的内容比较多也比较复杂,涉及“Consumer端的负载均衡机制”、“RocketMQ的长轮询机制”和“RocketMQ中Pull�