博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketMq-consumer介绍
阅读量:7097 次
发布时间:2019-06-28

本文共 2045 字,大约阅读时间需要 6 分钟。

说到rocketMq的consumer,该篇文章特指pushConsumer,pullConsumer在后续文章中在分享。

提到consumer,需要搞清楚几个核心问题,分别是consumer的初始化过程做了哪些事情,消息是如何消费,consumer如何动态平衡的,整个逻辑还是比较绕的,其中这章节主要会讲清楚两个事情,1、初始化过程中client做了哪些事情;2、consumer如何动态平衡拉取任务,具体的任务消费会由额外的一章进行讲解。

consumer的初始化过程

img_ac2a8284cc77aaf8d7822c669904675c.png
consumer初始化过程

说明:整体执行过程如下,着重介绍subscribe和start两个过程

    1、创建consumer并设置消费分组

    2、设置消费位移

    3、设置订阅topic

    4、设置消费执行的回调函数

    5、启动consumer

consumer的初始化流程图

img_2039554580e787c1dd5186ace4fa5e46.png
初始化流程图

consumer内部初始化过程

说明:整个初始化比较复杂,为了大家能够理解,先用简单的语句概述一遍

    1、构建consumer的订阅信息,包括consumer本身的订阅和消费分组的重试队列。

    2、创建Rebalance服务,该服务每隔20s进行消费端负责的messageQueue的消费。

    3、启动消费偏移量获取服务,获取上一次消费位移。

    4、启动定时任务,其中核心任务之一是定时去namesrv拉取broker信息。

    5、启动pullMessageService,负责从broker拉取待消费消息

    6、启动rebalanceService,负责定期调整consumer端负载均衡包括第一次触发拉取任务

    7、其中rebalanceService和pullMessageService相互配合使用,前者负责将新加入messageQueue拉取任务加入到pullMessageservice当中,将旧的messageQueue的拉取任务从pullMessageService中停止,两者之间通过消息队列的形式进行通信。

构建subscription过程

img_19af04289dd875165b3e4ad0b534ca14.png
构建subscribe信息

说明:参见DefaultMQPushConsumerImpl类

    1、订阅消息最后保存至RebalanceImpl当中,因为这个是后面动态负载均衡的核心。

client端启动过程

img_d5beaaed41bdcfdfbabe384fd975b14d.png
consumer启动过程-注册订阅消息过程

说明:参见DefaultMQPushConsumerImpl类

img_8846910e7c95992fd53946e76b04bc79.png
consumer启动过程-注册回调函数并启动一系列服务

说明:参见DefaultMQPushConsumerImpl类

    1、启动了获取消费进度的服务

img_be0046a85735d2a0708f5585ff3f64d0.png
consumer核心逻辑-启动核心消费逻辑

说明:参见MQClientInstance类

    1、启动定时任务,主要是从namsrv中拉取broker的信息

    2、启动client从broker拉取消息的服务

    3、启动Rebalance服务,负责触发消息拉取的任务

    4、步骤3和步骤4之间的两个服务通过消息队列通信

img_bec507ebe9802b85b16814da0e462122.png
定时拉取broker信息

说明:参见MQClientInstance类

    1、负责从namesrv拉取broker的信息

拉取任务的执行过程

img_cebec6930a29f81e9f8f6f510bfd74c0.png
定时获取拉取任务执行消息拉取

说明:参见PullMessageService类

    1、负责从pullRequestQueue中获取拉取任务并执行,该任务由Rebalance服务投递

拉取任务的生成过程

img_70731a0516bc9c2707b5814da13224e3.png
定期动态消费负载均衡

说明:参见RebalanceService类

    1、consumer端负载均衡的入口

img_39d3c19291b99a4aafd9c7b0bc3cf760.png
针对每个consumer动态调整负载均衡

说明:参见MQClientInstance类

    1、每个consumer客户端只会有一个对象,所以这里for循环只有一次。

img_c531c1085d610c866c47a598e6b98d1a.png
针对每个topic进行负载均衡

说明:参见MQClientInstance类

1、针对每个订阅信息都进行动态负责均衡,包括consumer本身的订阅分组和consumerGroup的重试分组。

img_7035c06265e817c429cab88ab5052c76.png
针对topic下的messageQueue和consumer进行动态负载均衡

说明:参见RebalanceImpl类

    1、动态负载均衡就是一个topic下所有的messageQueue和消费分组里面的消费者按照一定的动态调整策略进行分配,同一个消费分组里面的消费者每人负责一部分的messageQueue。

img_e1ed7d9ed9023dc8b1b108f149598eed.png
动态调整拉取任务

说明:参见RebalanceImpl类

    1、consumer新负责的messageQueue加入到拉取任务当中来

        2、consumer不负责的messageQueue从拉取任务中剔除。

img_3a300c6abf2efcd5c569d189ae3fc26f.png
新增消息拉取任务

说明:参见PullMessageService类

img_e1553253db21ea08a7eba05a9bdddfcc.png
投递待拉取消息任务

说明:参见PullMessageService类

订阅重试队列逻辑

img_8a8bb3f3a6f40405e6dc39cd68e77bd8.png
注册%retry%@consumerGroup消费分组

说明:

    核心代码逻辑,这个表明了consumer订阅了重试队列并对重试队列进行消费。

转载地址:http://haaql.baihongyu.com/

你可能感兴趣的文章
Git的学习与使用(八)——Git 查看提交历史
查看>>
使用vagrant快速搭建linux实验环境
查看>>
用手机如何把PDF转成PPT文件
查看>>
LVM逻辑卷创建管理
查看>>
修饰器的简单用法
查看>>
所有者和所属组
查看>>
中文核心期刊发表要多少钱呢
查看>>
程序猿福利来啦,神目AI开放平台免费送人脸识别SDK啦
查看>>
五星好评的手机APP,拿走不谢
查看>>
iOS开发之网络编程--获取文件的MIMEType
查看>>
Oracle的云成功公式ERP Cloud Plus自治数据库
查看>>
数独(简易九宫格)
查看>>
深圳宏旺半导体,十五年国产存储芯片的研发坚持,自主品牌ICMAX亮相
查看>>
Windows搭建漏洞环境
查看>>
Android 数据传输要注意的地方
查看>>
大数据教程(8.6)yarn客户端提交job的流程梳理和总结&自定义partition编程
查看>>
Mysql 分区介绍(三) —— LIST分区
查看>>
关于 elasticsearch delete by query
查看>>
Nginx的405错误
查看>>
描述OSI参考模型各层的主要功能
查看>>