描述:在消息投递的过程中可能会存在消息丢失的行为产生,生产者到交换机,交换机到队列的过程都有可能出现这个现象。所以我们要有个发布确认的操作来防止消息丢西。
确认机制方案:

配置文件配置交换机发布确认模式:
publisher-confirm-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

交换机发布确认和路由回退分别需要实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback接口
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {@ResourceRabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//将类注入接口rabbitTemplate.setConfirmCallback(this); //交换机发布确认//路由回退配置rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) { //id ,ack ,结果描述if (b){log.info("收到回调id:{}",correlationData.getId());}else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}",correlationData.getId(),s);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("路由:{}---回退的信息:{}",returnedMessage.getRoutingKey(),returnedMessage.getMessage());}
}
测试:
路由和交换机声明与绑定
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";@Beanpublic DirectExchange confirmExchange(){Map map = new HashMap<>();map.put("alternate-exchange", BACKUP_EXCHANGE_NAME);ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//持久化.withArguments(map);//设置备份交换机return (DirectExchange) exchangeBuilder.build();}@Beanpublic Binding bindingConfirmExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("key");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchangebackupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
生产者:
@GetMapping("sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key", message,correlationData1);log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给队列 confirm.queue:{}", new Date(), message);CorrelationData correlationData2 = new CorrelationData("2");rabbitTemplate.convertAndSend(TtlQueueConfig.CONFIRM_EXCHANGE_NAME, "key2", message,correlationData2);log.info(" 当 前 时 间 : {}, 发送一条发布确认信息给路由不存在的队列 confirm.queue:{}", new Date(), message);}
队列监听消费者:
@RabbitListener(queues = CONFIRM_QUEUE_NAME)public void receiveCONFIRMQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到发布确认队列的消息:{}", new Date().toString(), msg);}@RabbitListener(queues = "warning.queue")public void receiveWarningQueue(Message message){String msg = new String(message.getBody());//log.info("当前时间:{},收到备份交换机的警告队列的消息:{}", new Date().toString(), msg);log.error("报警发现不可路由消息:{}", msg);}
总结:监听者收到的ack为false则消息没有投递成功,交换机配置了备份交换机,优先级比路由回退的高,如果交换机到队列的消息没有投递成功,则可以通过备份交换机的监听队列再次去投递消息。