使用SpringBoot实现RabbitMQ各个模式
创始人
2024-03-06 08:33:15
0

实现了RabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo
源码:https://gitee.com/xunan29/study-rabbitmq-test-project

参考文章:
https://blog.csdn.net/K_kzj_K/article/details/106642250
https://blog.csdn.net/qq_41466440/article/details/118567253

项目介绍

这个项目主要是实现了rabbitMQ各个模式(simple、topic、direct、fanout及发送方确认和接收方确认)的一个demo

rabbitmq-publisher-model

rabbitmq-receiver-model

上面俩是一起的,一个生产者,一个消费者

mq-message-callback-publisher-model9092

mq-message-callback-consumer-model9093

上面俩是一起的,一个生产者,一个消费者,其主要是根据上面那两个模块实现了发送方确认和接收方确认模式功能

依赖

		org.springframework.bootspring-boot-starter-amqp2.5.3

配置文件

server:port: 9090#优雅关停shutdown: gracefulspring:lifecycle:#强制关停时间(配合优雅关停)timeout-per-shutdown-phase: 60sapplication:name: rabbitmq-publisher-model#配置rabbitMq 服务器rabbitmq:#    host: 172.2200.10.2host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /

其中需要注意rabbitmq代码访问端口是5672,而不是15672,15672是用来访问可视化管理界面的

config
在这里插入图片描述

config是按各个模式需求,配置了相应的队列、交换机和binding,为什么在消费者中也配置了:是因为如果没配置在最开始时没有先启动生产者,消费者这边会报错,消费者这边配置了就避免了这个问题。

controller

生产者的controller用来生产消息

service

消费者的service用来消费信息

其中work模式我没有实现,如果要写的话它就是在simple模式上,多加个消费者就可,它只会有一个消费者消费消息,参考:https://blog.csdn.net/qq_39240694/article/details/106911755

还有需要注意,如果消息传递java对象则必须java对象是在同一个模块或者提取到一个公用模块中(即消费者和生产者在一个模块才能使用),我项目中这种虽然在两个模块但同包同类名同路径也是不可以使用的

发送方确认和接收方确认

配置文件中加一些新配置

#配置rabbitMq 服务器rabbitmq:#    host: 172.2200.10.2host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /#确认消息已发送到交换机(Exchange)#springboot.rabbitmq.publisher-confirm 新版本(2.2.0之后)已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果#publisher-confirms: truepublisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

配置相关的消息确认回调函数,RabbitConfig.java

	@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);System.out.println("ConfirmCallback:     "+"确认情况:"+ack);System.out.println("ConfirmCallback:     "+"原因:"+cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("ReturnCallback:     "+"消息:"+message);System.out.println("ReturnCallback:     "+"回应码:"+replyCode);System.out.println("ReturnCallback:     "+"回应信息:"+replyText);System.out.println("ReturnCallback:     "+"交换机:"+exchange);System.out.println("ReturnCallback:     "+"路由键:"+routingKey);}});

上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在四种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功

①消息推送到server,但是在server里找不到交换机

也就是这里有交换机但是没有创建也没有配置

结论: ①这种情况触发的是 ConfirmCallback 回调函数。

②消息推送到server,找到交换机了,但是没找到队列

这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作

结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送到sever,交换机和队列啥都没找到

结论: ③这种情况触发的是 ConfirmCallback 回调函数。

④消息推送成功

结论: ④这种情况触发的是 ConfirmCallback 回调函数。

消费者接收到消息的消息确认机制

和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:①自动确认, 这也是默认的消息确认情况。  AcknowledgeMode.NONE
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。② 根据情况确认, 这个不做介绍
③ 手动确认 , 这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认 
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展) 
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息 消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:着重讲下reject,因为有时候一些场景是需要重新入列的。channel.basicReject(deliveryTag, true);  拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

项目中MessageListener 的config类进行了消息接收 手动确认 配置

==MyAckReceiver ==(在service里)是对应的手动确认消息监听类

上述的类每个都有两个,一个对应处理一个队列,另一个对应处理多个队列

里面配置在注释中有详细介绍,

相关内容

热门资讯

关于健全对刑事案件犯罪嫌疑人、... “两高一部”发布《关于健全对刑事案件犯罪嫌疑人、被告人身份审查工作机制的意见》 为准确、及时查明案件...
完善幼儿园收费政策 三部门发通... 中新网12月23日电 据国家发展和改革委员会网站消息,23日,国家发展改革委、教育部、财政部发布关于...
男子4年强奸继女六七十次,被判... 日前,河北省石家庄市栾城区人民法院在中国裁判文书网公开了一份刑事判决书,男子刘冬(化名)在4年间竟六...
合肥一烤肉店回应宠物狗上桌吃饭... 12月22日,安徽合肥一家烤肉店有宠物狗上桌吃饭,餐桌上的餐盘里放有食物,宠物狗在不断啃食生肉。 2...
中国人民银行关于实施一次性信用... 中国人民银行上海总部,各省、自治区、直辖市及计划单列市分行,征信中心;国家开发银行,各政策性银行、国...
徐杰20分萨林杰32+11 广... 【搜狐体育战报】北京时间12月23日CBA常规赛第5轮,客场作战的广东东阳光以93-85击败广州朗肽...
完善幼儿园收费政策,三部门发通... 今天(12月23日),国家发展改革委、教育部、财政部发布关于完善幼儿园收费政策的通知,全文如下: 各...
男子被冒名贷款13年,导致征信... 封面新闻记者 钟晓璐 男子从未到某银行贷款,却在办理购车贷款时发现,自己在该银行张家界某支行有不良征...
冬窗转会动态:米兰低成本签约菲... 在白鹿巷球场,随着伊萨克将球送进热刺的网窝,利物浦的球迷欢呼声还未消散,这位创下队史转会费纪录的前锋...
ST华铭(300462)披露累... 截至2025年12月23日收盘,ST华铭(300462)报收于11.3元,较前一交易日下跌1.31%...