why
1、解耦:以前那些A服务调用BCDEF服务,万一一个服务挂了怎么办?要不要重发?解决耦合性
2、异步:有些类型的报告生成时间太长了,让用户一直卡着在不太好吧。异步后,BCDEF自己去执行写库的时间,消息队列中间件自己本身执行快。
3、削峰:节假日退出活动,用户访问量增多,如果说系统一秒钟可以处理500请求,这一秒超过500请求怎么办,mq让系统平缓处理突然增加的请求。用时间换空间。让用户请求过来,我们消费者在根据先进先出原则依次消费消息。
4、解耦、异步、削峰是mq主要的优点,mq的缺点是要保证mq可用的,不然也会影响业务。增加了系统的复杂性和开发时间。还有一致性问题。
What
Rocketmq是阿里巴巴开源的消息队列中间件,模仿kafka开发,有生产者、消费者、主题、tag等属性构成,优点是解耦、异步、削峰,提高请求速度,减少响应事件,是高并发解决方案之一。
性能:kafka>rokcketmq>rabbitmq 阿里支撑,双十一就是用的rocketmq,阿里系都在用,性能高,可靠性好,可用性高
功能:rocketmq>kafka 功能完善,基本都有开源功能:延迟时间、事务消息、消息重试
使用:用起来好用,易用,开源框架工具类多
How
rocketmq的工作流程是怎样的?
RocketMq的工作流程如下:
1)首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来
2)启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系
3)创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic
4)Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送
5)Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
生产发送消息
生产者、消费者怎么实现的?原理是什么?
发送消息(同步、异步(源码在同步基础上加了普通线程池,成功、失败不同处理方案)、单向)
同步:请求成功了才会执行下一个线程,可以保证发送消息的顺序和可靠性
异步:在同步的基础上增加线程池操作,不能保证顺序和可靠性,但是可以保证速度
单向:直接发送,不要求响应了。
消费
https://blog.csdn.net/weixin_27252001/article/details/112730756
push:
理下流程:
创建消费对象,设置相关参数。
首先 new DefaultMQPushConsumer 对象,并指定一个消费组名。
然后设置相关参数,例如 nameSrvAdd、消费失败重试次数、线程数等
通过调用 setConsumeFromWhere 方法指定初次启动时从什么地方消费,默认是最新的消息开始消费。
通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。
通过调用 registerMessageListener 设置消息监听器,即消息处理逻辑,最终返回 CONSUME_SUCCESS(成功消费)或 RECONSUME_LATER(需要重试)。
2、设置消费的方式是pull还是push,消费的模式是集群还是广播
pull
首先根据 MQConsumer 的 fetchSubscribeMessageQueues 的方法获取 Topic 的所有队列信息
然后遍历所有队列,依次通过 MQConsuemr 的 PULL 方法从 Broker 端拉取消息。
对拉取的消息进行消费处理
通过调用 MQConsumer 的 updateConsumeOffset 方法更新位点,但需要注意的是这个方法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。
https://blog.csdn.net/weixin_41098980/article/details/79880957
1.集群消费方式
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费
//广播
BROADCASTING(“BROADCASTING”),
//集群
CLUSTERING(“CLUSTERING”);
创建的消费者订阅消息
consumerBean.subscribe(consumer.getTopic(), consumer.getTag(),
SpringUtil.getBean(consumer.getBeanName()));
源码
String messageModel = properties.getProperty(“MessageModel”, “CLUSTERING”);
this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
https://blog.csdn.net/dancheren/article/details/71325437
Action.CommitMessage不重试
Action.ReconsumeLater重试
RocketMq
是推模型还是拉模型push pull
RocketMq提供两种方式:pull和push进行消息的消费
而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的
push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的
其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~
rocketmq不管是推模式push还是拉模式pull底层都是拉模式pull,推模式push也是在拉模式pull上做了一层封装.。
push 和 pull 的优缺点:
push
优点:
1.push模式采用长轮询阻塞的方式获取消息,实时性非常高,用户体验好
2.rocketMq处理了获取消息的细节,使用起来比较简单方便
缺点:
1.当消费者能力远远低于生产者能力的时候,会产生一定的消费者消息堆积,消息堆积会占用消费者服务的资源,主要在于内存资源
解决方案:
rocketMq针对push模式提供了流量控制,有三种,单个队列消息数量(默认1000),单个队列内存中的大小(默认100M), 消息跨度(2000), 通过这三种控制,可以有效的控制消息对消费者的影响,各位可以根据自己项目的实际情况进行调整。
pull
优点:
1.想消费多少就消费多少,想怎么消费就怎么消费,哈哈,灵活性较大,不存在过多占用消费者资源的问题
缺点:
1.实时性很低
2.拉取消息的间隔不好设置,太短则borker压力大,太长则实时性很低。
在实际生产环境中,笔者一直使用的push消息模式,pull模式这里随手写下,不做重点描述啦!
消息中间件mq场景问题
业务场景:秒杀 ——> 下单 ——> 支付
这三个核心流程中,真正并发量大的是秒杀功能,下单和支付功能实际并发量很小。
所以,我们在设计秒杀系统时,有必要把下单和支付功能从秒杀的主流程中拆分出来,
特别是下单功能要做成mq异步处理的。而支付功能,比如支付宝支付,是业务场景本身保证的异步。
1、消息丢失问题
网络超时导致消息发送失败,丢失了
解决:增加一张消息发送表
发送成功了,更新为已处理,
没有成功的再发一遍
如果写入成功了,发送到mq又失败了呢
使用job,增加重试机制
2、消费重复消费问题
增加一张消息处理表
消费者读到消息之后,先判断一下消息处理表,是否存在该消息,如果存在,表示重复消费,则直接返回。如果不存在,则正常操作,接着将消息写入消息处理表中,再返回
3、垃圾消费问题
业务场景:秒杀 ——> 下单 ——> 支付
这三个核心流程中,真正并发量大的是秒杀功能,下单和支付功能实际并发量很小。
所以,我们在设计秒杀系统时,有必要把下单和支付功能从秒杀的主流程中拆分出来,
特别是下单功能要做成mq异步处理的。而支付功能,比如支付宝支付,是业务场景本身保证的异步。
4、延时消费问题
问题背景:用户秒杀成功,下单之后,30分钟之内未进行支付,该订单会被自动取消,回退库存。
实现方法可以用job,但job有个问题,需要每隔一段时间处理一次,实时性不是很好我们还可以用延时队列,rocketMq自带了延时队列的功能
具体实现:下单时消息生产者会生成一张订单,此时的状态为待支付,然后向延时队列中发送一条消息,当到达延时时间,消息消费者读取消息之后,会查询该订单的状态是否为待支付。如 果是待支付状态,则更新订单状态为取消状态。如果不是待支付,说明该订单已经支付过了,则直接返回;
注意点: 用户完成支付之后,会修改订单状态为已支付。
Mq常见的问题:
生产者发送消息丢失、重复消费消息、消息延迟时间
Rocketmq源码解决了消息重试、延迟时间
生产者消息丢失怎么处理
怎么防止重复提交
1、提交的时候:生产唯一key先不执行业务sql, 去判断有没有存在一样的key;
或者status状态
2、提交前:判断是否符合条件