消息发布确认
创始人
2024-03-27 05:48:31
0

描述:在消息投递的过程中可能会存在消息丢失的行为产生,生产者到交换机,交换机到队列的过程都有可能出现这个现象。所以我们要有个发布确认的操作来防止消息丢西。
确认机制方案:
在这里插入图片描述

配置文件配置交换机发布确认模式:
publisher-confirm-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
在这里插入图片描述
交换机发布确认和路由回退分别需要实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback接口

@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//将类注入接口rabbitTemplate.setConfirmCallback(this); //交换机发布确认//路由回退配置rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {  //id ,ack ,结果描述if (b){log.info("收到回调id:{}",correlationData.getId());}else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}",correlationData.getId(),s);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("路由:{}---回退的信息:{}",returnedMessage.getRoutingKey(),returnedMessage.getMessage());}
}

测试:
路由和交换机声明与绑定

public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";@Beanpublic DirectExchange confirmExchange(){Map map = new HashMap<>();map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//持久化.withArguments(map);//设置备份交换机return (DirectExchange) exchangeBuilder.build();}@Beanpublic Binding bindingConfirmExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("key");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}

生产者:

@GetMapping("sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key", message,correlationData1);log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给队列 confirm.queue:{}", new Date(), message);CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key2", message,correlationData2);log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给路由不存在的队列 confirm.queue:{}", new Date(), message);}

队列监听消费者:

@RabbitListener(queues = CONFIRM_QUEUE_NAME)public void receiveCONFIRMQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到发布确认队列的消息:{}", new Date().toString(), msg);}@RabbitListener(queues = "warning.queue")public void receiveWarningQueue(Message message){String msg = new String(message.getBody());//log.info("当前时间:{},收到备份交换机的警告队列的消息:{}", new Date().toString(), msg);log.error("报警发现不可路由消息:{}", msg);}

总结:监听者收到的ack为false则消息没有投递成功,交换机配置了备份交换机,优先级比路由回退的高,如果交换机到队列的消息没有投递成功,则可以通过备份交换机的监听队列再次去投递消息。

相关内容

热门资讯

西藏多维度惩治虚假诉讼 近日,自治区人民检察院联合自治区高级人民法院、公安厅、司法厅召开首次虚假诉讼惩治联席会议,共同会签《...
制度创新激活跨境投资新活力 中... 12月26日,河北华海股权投资基金合伙企业(有限合伙)在中国(河北)自由贸易试验区正定片区(以下简称...
南宁全力推动各项社保惠企政策落... “我们企业享受到社保降费率和稳岗返还等资金,生产经营得以降本增效,也稳住了人才队伍。”12月23日,...
原告向法官出示证据,照片右下角... 近日,湖北孝感大悟法院民二庭在审理一起房屋租赁合同纠纷案时,精准识破原告方利用AI技术伪造证据的行为...
锚定“十五五”开局 专家建言宏... 12月27日,中国社会科学院财经战略研究院、浙江财经大学共同举办的“财经战略年会(2025)暨第二届...
每周股票复盘:锴威特(6886... 截至2025年12月26日收盘,锴威特(688693)报收于37.41元,较上周的37.28元上涨0...
资深征地律师助力维权,孙侠律师 在征地拆迁领域,遇到专业、靠谱的律师至关重要。资深征地律师、出名征地律师的专业服务,能为当事人在复杂...
推动法律监督新格局走深走实 推动法律监督新格局走深走实 ——专访二级大检察官,天津市检察院党组书记、检察长陈凤超 天津市检察院...
鱼全翻着白肚皮,水还变黑了!养... 图片来源:摄图网 本文为《方圆》杂志原创稿件 这天,养鱼户陆大军发现 鱼塘里的鱼竟然全部翻着白肚皮,...
问法预告|遇到网络纠纷不知如何... 如今,互联网已经成为人们生活工作的重要场所,随着应用场景的增多,与互联网有关的纠纷也呈现上升趋势。从...