转载自:
https://blog.csdn.net/javahongxi/article/details/72956608
https://www.cnblogs.com/tudachui/p/10998984.html
1. 簡介RocketMQ
RocektMQ是阿里巴巴在2012年开源的一个纯java、分布式、队列模型的第三代消息中间件,不仅在传统高频交易链路有着低延迟的出色表现,在实时计算等大数据领域也有着不错的吞吐。
2016年11月11号,双十一大促见证了RocketMQ低延迟存储架构的成功试水,99.996%的延迟落在了10ms以内,极个别由于GC引发的停顿在50ms以内,其高性能、低延时和高可靠的特性承载了近年来双十一17万笔/秒的交易峰值,在整个生产链路上都有着稳定和出色的表现。其在同年捐赠给Apache后正式进入孵化期。并于2017年9月RocketMQ正式从Apache社区正式毕业,成为Apache顶级项目。
RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。
Namesrv: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。
官方简介:
1. Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
Producer Group
一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。
2. Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
Push Consumer
Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立 刻回调Listener接口方法。
Pull Consumer
Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制。
Consumer Group
一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。
3. Broker
消息中转角色,负责存储消息,转发消息,一般也称为Server。
Master
Broker中的主节点。
Slave
Broker中的副节点。
4. Nameserver
专为RocketMQ设计的轻量级名称服务。集群中Nameserver互相独立,彼此没有通信关系,单台Nameserver挂掉,不影响其他Nameserver,即使全部挂掉,也不影响业务系统使用。而且Nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
5. 广播消费
一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。
6. 集群消费
一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。
7. 顺序消息
消费消息的顺序要同发送消息的顺序一致,在RocketMQ中,主要指的是局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。
8. 普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
9. 严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。
10. Message Queue
在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。
11. 异步复制
消息写入master节点,再由master节点异步复制到slave节点,类似mysql中的master-slave机制。
12. 同步双写
消息同时写入master节点和slave节点。
13. 异步刷盘
Broker的一种持久化策略,消息写入pagecache后,直接返回。由异步线程负责将pagecache写入硬盘。
14. 同步刷盘
Broker的一种持久化策略,消息写入pagecache后,由同步线程将pagecache写入硬盘后,再返回。
3. RocketMQ集群部署模式
RocketMQ作为消息中间件,其主要功能为消息的Publish/Subscribe。而Broker担任的消息转发和存储功能,其部署方式有很多种:
1. 单Master
优点:除了配置简单没什么优点。
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用。
2. 多Master
优点:配置简单,性能最高。
缺点:可能会有少量消息丢失,单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。
3. 异步多Master多Slave
每个Master配一个Slave,有多对Master-Slave,集群采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预。
缺点:Master宕机或磁盘损坏时会有少量消息丢失。
4. 同步多Master多Slave
每个Master配一个Slave,有多对Master-Slave,集群采用同步双写方式,主备都写成功,向应用返回成功。
优点:服务可用性与数据可用性非常高。
缺点:性能比异步集群略低,当前版本主宕备不能自动切换为主。
4. RocketMQ集群特性分析
集群工作流程:
1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
模块及关键点:
1. Nameserver
Nameserver的开发旨在轻量级,多台Nameserver互相独立,彼此间互不通信,这样的设计,保证了单台Nameserver宕机,不影响Nameserver。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
Namesrv用于存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间相互没有通信,单台Namesrv宕机不影响其他Namesrv与集群;即使整个Namesrv集群宕机,已经正常工作的Producer,Consumer,Broker仍然能正常工作,但新起的Producer, Consumer,Broker就无法工作。
Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesr发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。
2. Broker与NameServer关系
连接:
每个Broker与系统中所有的Nameserver保持长连接。
心跳间隔:
每隔30秒(此时间无法更改)向所有Nameserver发送心跳,心跳包含了自身的topic配置信息。
心跳超时:
Nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的Broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
断开:
一旦连接断开,Nameserver会立即更新topic与队列的对应关系,但不会通知生产者和消费者。
负载均衡:
一个topic分布在多个Broker上,一个broker可以配置多个topic,它们是多对多的关系。如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。
可用性:
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。
两个关键点:
1. 一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。
2. 消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。
可靠性:
所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高。
同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠。
异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电。
3. Consumer与NameServer关系
连接:
单个消费者和一台nameserver保持长连接。定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
心跳:
与nameserver没有心跳。
轮询时间:
默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。
4. Consumer与Broker关系
连接:
单个消费者和该消费者关联的所有broker保持长连接。
心跳:
默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
断开:
消费者挂掉;心跳超时导致broker主动关闭连接。
动作:
一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
负载均衡:
集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
消费机制
本地队列:
消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。
轮询间隔:
消息拉取线程拉取间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。
消息消费数量:
监听器每次接受本地队列的消息数量是由参数DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。
消费进度存储:
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。
5. Producer与Nameserver关系
连接:
单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
心跳:
与nameserver没有心跳。
6. Producer与Broker关系
连接:
单个生产者和该生产者关联的所有broker保持长连接。
心跳:
默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
断开:
移除Broker上的生产者信息。
负载均衡:
生产者时间没有关系,每个生产者向队列轮流发送消息。
7. Broker
1,高并发读写服务
Broker的高并发读写主要是依靠以下两点:
消息顺序写,所有Topic数据同时只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,使得发消息TPS大幅提高。
消息随机读,RocketMQ尽可能让读命中系统pagecache,因为操作系统访问pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,在下次读时就可能命中pagecache,减少IO操作。
2,负载均衡与动态伸缩
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
动态伸缩能力(非顺序消息):Broker的伸缩性体现在两个维度:Topic, Broker。
Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后向Namesrv注册,Producer、Consumer通过Namesrv发现新Broker,立即跟该Broker直连,收发消息。
3,高可用&高可靠
高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电
4,Broker与Namesrv的心跳机制
单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
8. 消费者
消费者启动时需要指定Namesrv地址,与其中一个Namesrv建立长连接。消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。连接建立后,从namesrv中获取当前消费Topic所涉及的Broker,直连Broker。
Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。
消费者端的负载均衡
先讨论消费者的消费模式,消费者有两种模式消费:集群消费,广播消费。
广播消费:每个消费者消费Topic下的所有队列。
集群消费:一个topic可以由同一个ID下所有消费者分担消费。具体例子:假如TopicA有6个队列,某个消费者ID起了2个消费者实例,那么每个消费者负责消费3个队列。如果再增加一个消费者ID相同消费者实例,即当前共有3个消费者同时消费6个队列,那每个消费者负责2个队列的消费。
消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。
9. 生产者
Producer启动时,也需要指定Namesrv的地址,从Namesrv集群中选一台建立长连接。如果该Namesrv宕机,会自动连其他Namesrv。直到有可用的Namesrv为止。
生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。
生产者端的负载均衡
生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。
这里需要注意一点:假如某个Broker宕机,意味生产者最长需要30秒才能感知到。在这期间会向宕机的Broker发送消息。当一条消息发送到某个Broker失败后,会往该broker自动再重发2次,假如还是发送失败,则抛出发送失败异常。业务捕获异常,重新发送即可。客户端里会自动轮询另外一个Broker重新发送,这个对于用户是透明的。
10. 小结
以上论述了RocketMQ各组件特性和关系,除此之外,在允许可自动创建Topic的配置下,Producer负责创建Topic消息和发送消息,发送消息支持三种方式,异步、同步和onway方式。就总体的消息传输层面来说,RocketMQ有集群模式和广播模式,默认是集群模式,集群模式以其原生的Consumer Group(消费组)实现了负载均衡,广播模式下,Consumer Group下的每个Consumer实例则要消费全部的消息数据。就业务适用层面来说,rocketMQ可以根据业务需求,实现Order Message(订单消息)、Broadcasting(广播消息)、Scheduled Message(延迟消息)、Batch(同Topic批量消息发送)、Filter(基本消息过滤和高级消息过滤)、OpenMessaging(流式消息发送)等。
5. RocketMQ分布式部署实践案例
此处就RocketMQ的 多Master多Slave 的模式在Linux服务器部署案例进行详细的说明
1. 本次部署环境
为了方便演示,此处使用的是root权限,生产环境下可能需要堡垒机。
Linux服务器192.168.121.100、192.168.121.200 两台(下文均简称100、200),详细部署环境示意表如下:
服务器配置表
2. 编辑Hosts
分别修改235 和236 的hosts 文件
1 | sudo vim /etc/hosts |
修改hosts 文件需获得sudo 权限,可申请堡垒机权限(即获得root权限)。
3. 下载官方源码
下载官方RocketMQ压缩包,下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/, 并选择Download the 4.2.0 release 选项的 rocketmq-all-4.2.0-bin-release.zip下载。(其他如source为需要自己编译的版本)
4. 上传到Linux并解压
分别上传rocketmq-all-4.2.0-bin-release.zip到100和200服务器的/usr/local/geek/rocketMQ-2m2s/目录下:
1 | cd /usr/local/geek/rocketMQ-2m2s |
5. 创建持久化存储目录
分别在 100 和 200 的 /usr/local/geek/rocketMQ-2m2s
目录中设置:
Master目录设置
1 | mkdir store |
Slave目录设置
1 | mkdir store-s |
6. RocketMQ配置文件
100 服务器配置
1 | vim conf/2m-2s-async/broker-a.properties |
200 服务器配置
1 | vim conf/2m-2s-async/broker-b.properties |
broker-a.properties文件配置
1 | # brokerClusterName=DefaultCluster |
broker-a-s.properties文件配置
1 | #brokerClusterName=DefaultCluster |
broker-b.properties文件配置(但要更改IP)
参考broker-a.properties
broker-b-s.properties文件配置(但要更改IP)
参考broker-a-s.properties
7. 启动参数设置
RocketMQ启动文件位于/usr/local/geek/rocketMQ-2m2s/bin/目录下,Linux中nameserver启动文件为:mqnamesrv,broker启动文件为:mqbroker,mqnamesrv和mqbroker启动文件分别调用了runserver.sh和runbroker.sh文件,这两个文件分别设置了nameserver和broker的启动内存,目前内存启动参数分别为nameserver启动内存4G,最大内存4G,新生代2G,broker启动内存8G,最大内存8G,新生代4G。
8. 端口及防火墙设置
RokcetMQ启动默认使用3个端口9875,10911,10912,三个端口分别代表nameserver服务器端口,broker端口,broker HA端口。需注意的是在多Master多Slave模式下10911和10912是Master的使用端口,但Slave端口的设置与Master的端口不同,具体端口约束为:Slave - Master > 2,否则可能导致同一台服务器无法同时启动Master和Slave。
如果服务器启动了防火墙,为了端口不被屏蔽,需将Master和Slave对应端口加入到iptables表以开放对应端口号,添加完成后重启防火墙。命令行开放端口操作如下:
分别打开100和200终端,在root用户下执行命令:
1 | [root@haoransun rocketMQ-2m2s]# firewall-cmd --zone=public --add-port=9876/tcp --permanent |
9. 启动Nameserver
分别启动100、200的Nameserver
1 | cd /usr/local/geek/rocketMQ-2m2s/bin/ |
10. 启动Broker
100上Master启动:
1 | nohup sh mqbroker -c /usr/local/geek/rocketMQ-2m2s/conf/2m-2s-async/broker-a.properties |
200上Master启动:
1 | nohup sh mqbroker -c /usr/local/geek/rocketMQ-2m2s/conf/2m-2s-async/broker-b.properties |
100上对应200Master的Slave启动:
1 | nohup sh mqbroker -c /usr/local/geek/rocketMQ-2m2s/conf/2m-2s-async/broker-b-s.properties |
200上对应100Master的Slave启动:
1 | nohup sh mqbroker -c /usr/local/geek/rocketMQ-2m2s/conf/2m-2s-async/broker-a-s.properties |
至此,Nameserver、Broker启动完成,可以用jobs命令查看当前运行进程,如下是服务端相关shutdown,即在bin目录下:
1 | sh mqshutdown namesrvsh mqshutdown broker |
6. RocketMQ监控平台部署
Apache版的RocketMQ管理界面部署工具可以从github上下载源码,地址:https://github.com/apache/rocketmq-externals, 部署流程如下:
1 | git clone https://github.com/apache/rocketmq-externals |
1. 修改配置文件,关联rocketMQ集群到管理界面
首先解压并进入解压后rockemq-externals-master目录rocketmq-externals-master/rocketmq-externals-master/rocketmq-console/src/main/resources,修改目录下application.properties配置文件内容如下图:
2. 编译rocketmq-console
mvn clean package -Dmaven.test.skip=true
编译需用maven命令进行编译,如下图,显示BIUD SUCCESS,则编译成功,成功后会在rocketmq-externals-master/rocketmq-console/target目录下产生一个rocketmq-console-ng-1.0.0.jar文件。
3. 将编译好的rocketmq-console-ng-0.0.jar包上传linux服务器
这里上传服务器地址为192.168.162.100,路径为:/usr/local/geek/jar
4. 运行jar包
java -jar rocketmq-console-ng-1.0.1.jar