(十)死信队列
创始人
2024-02-14 12:54:14
0

死信队列

  • 1、概念
  • 2、死信产生的原因
  • 3、代码实现
    • 3.1. 流程图
    • 3.2. 消息TTL 过期
    • 3.3. 队列达到最大长度
    • 3.4. 消息被拒

1、概念

某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有
后续的处理,就变成了死信,有死信自然就有了死信队列

**应用场景:**为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效,这种就放入死信队列进行处理。

2、死信产生的原因

  • 消息 TTL(Time to Live存活时间) 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3、代码实现

3.1. 流程图

在这里插入图片描述

3.2. 消息TTL 过期

消费者1 (最复杂的)

package com.feng.deadQueue;import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @Author Feng* @Date 2022/11/25 15:09* @Version 1.0* @Description 死信队列消费者01*/
public class Consumer01 {//普通交换机名字public static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名字public static final String DEAD_EXCHANGE = "dead_exchange";//普通队列名public static final String NORMAL_QUEUE = "normal_queue";//死信队列名public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtil.getChannel();//声明交换机:普通和死信channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/***  声明普通队列*///设置声明队列的参数Map arguments = new HashMap<>();//设置过期时间,单位是毫秒,这里设置10秒arguments.put("x-message-ttl",10000);//设置队列的死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信交换机的RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");channel.basicConsume(NORMAL_QUEUE,true,(String consumerTag, Delivery message) -> {System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());}, consumerTag -> {});}
}

生产者

package com.feng.deadQueue;import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @Author Feng* @Date 2022/11/25 15:37* @Version 1.0* @Description 死信队列生产者*/
public class ProductDead {//普通交换机名字public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtil.getChannel();//死信消息,设置ttl时间AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000")//单位是ms.build();for (int i = 1; i < 11; i++) {String msg = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());}}
}

只需要启动消费者01再宕机,然后启动生产者发消息,消息内容就会从普通队列到死信队列

在这里插入图片描述
在这里插入图片描述

死信队列消费者

package com.feng.deadQueue;import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @Author Feng* @Date 2022/11/25 15:09* @Version 1.0* @Description 死信队列消费者02*/
public class Consumer02 {//死信交换机名字public static final String DEAD_EXCHANGE = "dead_exchange";//死信队列名public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtil.getChannel();channel.basicConsume(DEAD_QUEUE,true,(String consumerTag, Delivery message) -> {System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());}, consumerTag -> {});}
}

3.3. 队列达到最大长度

声明队列的时候设置参数就行如下

//设置队列长度限制,这里是6
arguments.put("x-max-length",6);

注意此时需要把原先队列删除 因为参数改变了

在这里插入图片描述

发十条消息测试如下

在这里插入图片描述
这里注意超出队列长度进入死信队列的消息是先入队的消息

3.4. 消息被拒

  • 消息生产者代码同上生产者一致
  • C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息。让消息阻塞在队列中)
package com.feng.deadQueue;import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @Author Feng* @Date 2022/11/25 15:09* @Version 1.0* @Description 死信队列消费者01*/
public class Consumer01 {//普通交换机名字public static final String NORMAL_EXCHANGE = "normal_exchange";//死信交换机名字public static final String DEAD_EXCHANGE = "dead_exchange";//普通队列名public static final String NORMAL_QUEUE = "normal_queue";//死信队列名public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtil.getChannel();//声明交换机:普通和死信channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);/***  声明普通队列*///设置声明队列的参数Map arguments = new HashMap<>();//设置过期时间,单位是毫秒,这里设置10秒
//        arguments.put("x-message-ttl",10000);//设置队列的死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);//设置死信交换机的RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");//设置队列长度限制
//        arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机与队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");//绑定死信换机与死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");//开启手动应答channel.basicConsume(NORMAL_QUEUE,false,(String consumerTag, Delivery message) -> {String msg = new String(message.getBody(), "UTF-8");if("info5".equals(msg)){System.out.println("消费者1接收消息是:" + msg+"这个是拒绝消息");//第二个参数是不重新入队channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}System.out.println("消费者1接收消息是:" + msg);channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}, consumerTag -> {});}
}

相关内容

热门资讯

处理消费纠纷不能止于“喊话平台... 维 辰 政府部门公开喊话互联网平台,少见。 12月23日,丽江市古城区文化和旅游局公开发布《丽江市古...
金能科技:按照国家及当地政策落... 有投资者在互动平台向金能科技提问:“您好,请问公司是否针对员工生育或育儿设有相关的福利或激励政策?如...
AI赋能基层调解 老调解员有了... 今年6月以来,随着许多地方速裁法庭的建立,部分民商事纠纷,从法院前移到综治中心进行调解。各类商业纠纷...
起诉中信证券理财产品违规!获赔... 认购中信证券理财产品逾期超三年,富安娜(002327)追回款项的工作取得进展。 12月25日,富安娜...
释放购房需求、优化贷款政策!北... 24日,北京出台楼市新政,放宽非京籍家庭购房条件,支持多子女家庭住房需求。业内人士认为,北京楼市新政...
952名中国籍涉电诈犯罪嫌疑人... 近日,公安部派出工作组会同缅甸、泰国执法部门在缅甸妙瓦底地区,联合开展新一轮赌诈园区集中清剿行动,9...
报告显示:政策持续推动 全国二... 人民网北京12月25日电 (记者乔雪峰)当前,在以旧换新政策持续推动下,二手车市场正从昔日的区域性交...
字节跳动通报三季度内部违规情况... 红星资本局12月25日消息,今日,字节跳动披露2025年三季度内部违规案例的处理情况。通报显示,三季...
中纪委网站:集中清理规范性文件... 记者从全国人大常委会法制工作委员会法规备案审查室了解到,为破除民营经济发展的各类制度障碍,依法保护民...