底层原理计划--RocketMq
创始人
2025-05-29 20:44:04
0

why
1、解耦:以前那些A服务调用BCDEF服务,万一一个服务挂了怎么办?要不要重发?解决耦合性
2、异步:有些类型的报告生成时间太长了,让用户一直卡着在不太好吧。异步后,BCDEF自己去执行写库的时间,消息队列中间件自己本身执行快。
3、削峰:节假日退出活动,用户访问量增多,如果说系统一秒钟可以处理500请求,这一秒超过500请求怎么办,mq让系统平缓处理突然增加的请求。用时间换空间。让用户请求过来,我们消费者在根据先进先出原则依次消费消息。
4、解耦、异步、削峰是mq主要的优点,mq的缺点是要保证mq可用的,不然也会影响业务。增加了系统的复杂性和开发时间。还有一致性问题。
What
Rocketmq是阿里巴巴开源的消息队列中间件,模仿kafka开发,有生产者、消费者、主题、tag等属性构成,优点是解耦、异步、削峰,提高请求速度,减少响应事件,是高并发解决方案之一。
性能:kafka>rokcketmq>rabbitmq 阿里支撑,双十一就是用的rocketmq,阿里系都在用,性能高,可靠性好,可用性高
功能:rocketmq>kafka 功能完善,基本都有开源功能:延迟时间、事务消息、消息重试
使用:用起来好用,易用,开源框架工具类多

How

rocketmq的工作流程是怎样的?
RocketMq的工作流程如下:
1)首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来
2)启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系
3)创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
4)Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送
5)Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
生产发送消息
生产者、消费者怎么实现的?原理是什么?
发送消息(同步、异步(源码在同步基础上加了普通线程池,成功、失败不同处理方案)、单向)
同步:请求成功了才会执行下一个线程,可以保证发送消息的顺序和可靠性
异步:在同步的基础上增加线程池操作,不能保证顺序和可靠性,但是可以保证速度
单向:直接发送,不要求响应了。

消费
https://blog.csdn.net/weixin_27252001/article/details/112730756
push:
理下流程:

创建消费对象,设置相关参数。
首先 new DefaultMQPushConsumer 对象,并指定一个消费组名。
然后设置相关参数,例如 nameSrvAdd、消费失败重试次数、线程数等
通过调用 setConsumeFromWhere 方法指定初次启动时从什么地方消费,默认是最新的消息开始消费。
通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。
通过调用 registerMessageListener 设置消息监听器,即消息处理逻辑,最终返回 CONSUME_SUCCESS(成功消费)或 RECONSUME_LATER(需要重试)。
2、设置消费的方式是pull还是push,消费的模式是集群还是广播
pull
首先根据 MQConsumer 的 fetchSubscribeMessageQueues 的方法获取 Topic 的所有队列信息
然后遍历所有队列,依次通过 MQConsuemr 的 PULL 方法从 Broker 端拉取消息。
对拉取的消息进行消费处理
通过调用 MQConsumer 的 updateConsumeOffset 方法更新位点,但需要注意的是这个方法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。

https://blog.csdn.net/weixin_41098980/article/details/79880957
1.集群消费方式
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

//广播
BROADCASTING(“BROADCASTING”),
//集群
CLUSTERING(“CLUSTERING”);

创建的消费者订阅消息
consumerBean.subscribe(consumer.getTopic(), consumer.getTag(),
SpringUtil.getBean(consumer.getBeanName()));

源码
String messageModel = properties.getProperty(“MessageModel”, “CLUSTERING”);
this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
https://blog.csdn.net/dancheren/article/details/71325437

Action.CommitMessage不重试
Action.ReconsumeLater重试

RocketMq
是推模型还是拉模型push pull
RocketMq提供两种方式:pull和push进行消息的消费
而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的
push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的
其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~

rocketmq不管是推模式push还是拉模式pull底层都是拉模式pull,推模式push也是在拉模式pull上做了一层封装.。
push 和 pull 的优缺点:

push

优点:

1.push模式采用长轮询阻塞的方式获取消息,实时性非常高,用户体验好

2.rocketMq处理了获取消息的细节,使用起来比较简单方便

缺点:

1.当消费者能力远远低于生产者能力的时候,会产生一定的消费者消息堆积,消息堆积会占用消费者服务的资源,主要在于内存资源

解决方案:

rocketMq针对push模式提供了流量控制,有三种,单个队列消息数量(默认1000),单个队列内存中的大小(默认100M), 消息跨度(2000), 通过这三种控制,可以有效的控制消息对消费者的影响,各位可以根据自己项目的实际情况进行调整。

pull

优点:

1.想消费多少就消费多少,想怎么消费就怎么消费,哈哈,灵活性较大,不存在过多占用消费者资源的问题

缺点:

1.实时性很低

2.拉取消息的间隔不好设置,太短则borker压力大,太长则实时性很低。

在实际生产环境中,笔者一直使用的push消息模式,pull模式这里随手写下,不做重点描述啦!

消息中间件mq场景问题
业务场景:秒杀 ——> 下单 ——> 支付
这三个核心流程中,真正并发量大的是秒杀功能,下单和支付功能实际并发量很小。
所以,我们在设计秒杀系统时,有必要把下单和支付功能从秒杀的主流程中拆分出来,
特别是下单功能要做成mq异步处理的。而支付功能,比如支付宝支付,是业务场景本身保证的异步。

1、消息丢失问题
网络超时导致消息发送失败,丢失了
解决:增加一张消息发送表
发送成功了,更新为已处理,
没有成功的再发一遍

如果写入成功了,发送到mq又失败了呢

使用job,增加重试机制

2、消费重复消费问题
增加一张消息处理表
消费者读到消息之后,先判断一下消息处理表,是否存在该消息,如果存在,表示重复消费,则直接返回。如果不存在,则正常操作,接着将消息写入消息处理表中,再返回

3、垃圾消费问题
业务场景:秒杀 ——> 下单 ——> 支付
这三个核心流程中,真正并发量大的是秒杀功能,下单和支付功能实际并发量很小。
所以,我们在设计秒杀系统时,有必要把下单和支付功能从秒杀的主流程中拆分出来,
特别是下单功能要做成mq异步处理的。而支付功能,比如支付宝支付,是业务场景本身保证的异步。

4、延时消费问题
问题背景:用户秒杀成功,下单之后,30分钟之内未进行支付,该订单会被自动取消,回退库存。
实现方法可以用job,但job有个问题,需要每隔一段时间处理一次,实时性不是很好我们还可以用延时队列,rocketMq自带了延时队列的功能
具体实现:下单时消息生产者会生成一张订单,此时的状态为待支付,然后向延时队列中发送一条消息,当到达延时时间,消息消费者读取消息之后,会查询该订单的状态是否为待支付。如 果是待支付状态,则更新订单状态为取消状态。如果不是待支付,说明该订单已经支付过了,则直接返回;
注意点: 用户完成支付之后,会修改订单状态为已支付。

Mq常见的问题:
生产者发送消息丢失、重复消费消息、消息延迟时间

Rocketmq源码解决了消息重试、延迟时间

生产者消息丢失怎么处理

怎么防止重复提交
1、提交的时候:生产唯一key先不执行业务sql, 去判断有没有存在一样的key;
或者status状态
2、提交前:判断是否符合条件

相关内容

热门资讯

跨境电商高发!多部门共织“天罗... 东方网11月23日消息:11月21日,上海市人民检察院第三分院与上海海关缉私局、上海海警局、上海市第...
法律顾问能处理旅游法律事务吗 在旅游行业蓬勃发展的当下,各类旅游法律事务也日益增多,许多人会疑惑法律顾问是否能处理旅游法律事务。答...
法律顾问能处理哪些法律事务 在当今复杂多变的商业环境中,企业面临着各种各样的法律风险和挑战,法律顾问的重要性日益凸显。那么,法律...
原创 美... 美军的撤离速度印证了威慑效果。“尼米兹”号10月中旬进入南海后,10月26日接连发生两起舰载机坠机事...
王毅:中方绝不允许日本右翼势力... 当地时间2025年11月22日,中共中央政治局委员、外交部长王毅在杜尚别同塔吉克斯坦外长穆赫里丁举行...
上海特大案件曝光!涉案金额超亿... 今年以来,公安部会同金融监管总局开展打击金融领域黑灰产违法犯罪专项工作,对保险等领域违法犯罪进行重点...
关于单方面免签政策常见问题,国... 今天(11月23日),国家移民管理局针对单方面免签政策常见问题进行解答。 1.哪些国家人员可适用单方...
“最快护士”张水华再夺冠 新京报记者 刘锦涵
 制作 葛佳丹 ▲新京报我们视频出品(ID:wevideo) 11月23日,“最...
原创 沉... 在国际关系的复杂舞台上,每一个动作都可能引发连锁反应。近期,高市早苗的发言无疑是这一舞台上的一次重要...