RabbitMQ系列【16】AmqpTemplate接口详解
创始人
2024-03-01 23:31:48
0

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
    • AmqpTemplate
    • API
      • send
      • convertAndSend
      • receive
      • receiveAndConvert
      • receiveAndReply
      • sendAndReceive
      • convertSendAndReceive

前言

RabbitTemplatespring-amqp提供的一个 RabbitMQ 消息操作模板类,在之前我们使用它完成了简单的消息发送。

RabbitTemplate 主要提供了发送消息、接收消息以及其他附加功能,内部封装了RabbitMQ原生API,大大简化了使用 RabbitMQ操作。

RabbitTemplate 主要实现了AmqpTemplateRabbitOperations接口:
在这里插入图片描述

AmqpTemplate

AmqpTemplate接口主要声明了三类方法:

public interface AmqpTemplate {// 发送消息void send(Message var1) throws AmqpException;// 接收消息Message receive() throws AmqpException;// 发送消息并接收回复Message sendAndReceive(Message var1) throws AmqpException;
}

API

首先,我们看下AmqpTemplate中声明的各种方法。

send

send 方法一共有三个,需要创建Message消息对象,将消息封装到该对象内发送,如果没有指定交换机、路由键,将使用默认值,也就是空字符串。

	// 发送消息到默认交换机、默认路由KEYvoid send(Message message) throws AmqpException;// 发送消息到默认交换机、使用指定路由KEYvoid send(String routingKey, Message message) throws AmqpException;// 发送消息到指定交换机、使用指定路由KEYvoid send(String exchange, String routingKey, Message message) throws AmqpException;

示例:

        Message message = new Message("消息".getBytes());rabbitTemplate.send(message);rabbitTemplate.send("route.key", message);rabbitTemplate.send("exchange_name", "route.key", message);

convertAndSend

convertAndSend 方法可以转换对象并发送,并可以添加一个消息处理器MessagePostProcessor

	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用默认路由KEYvoid convertAndSend(Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEYvoid convertAndSend(String routingKey, Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEYvoid convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessorvoid convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;

MessagePostProcessor 是一个函数型接口,提供了一个postProcessMessage方法处理消息,由于直接发送的是对象,如果需要设置一些消息的属性,就需要使用该接口进行设置,例如:

        MessagePostProcessor messagePostProcessor = message1 -> {MessageProperties messageProperties = message1.getMessageProperties();messageProperties.setExpiration("1000");return message1;};rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);

receive

一般获取消息有两种处理模式:

  • push:由RabbitMQ主动将消息推送给订阅队列的消费者,调用channel.basicConsume方法。
  • pull:主动从指定队列中拉取消息,需要消费者调用channel.basicGet方法。

receive 方法,就是从主动队列中获取消息。

	// 如果默认队列中有消息,则接收消息。立即返回,可能有NULL值@NullableMessage receive() throws AmqpException;// 从指定队列中获取消息。立即返回,可能有NULL值@NullableMessage receive(String queueName) throws AmqpException;// 如果默认队列中有消息,则接收消息。可能有NULL值,并指定一个超时时间@NullableMessage receive(long timeoutMillis) throws AmqpException;// 从指定队列中获取消息。可能有NULL值,并指定一个超时时间@NullableMessage receive(String queueName, long timeoutMillis) throws AmqpException;

receiveAndConvert

receiveAndConvert可以拉取消息并进行对象转换。

// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObject receiveAndConvert() throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObject receiveAndConvert(String queueName) throws AmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能有NULL值,并指定一个超时时间@NullableObject receiveAndConvert(long timeoutMillis) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象,并指定一个超时时间,可能为null 值。@NullableObject receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;//  如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(ParameterizedTypeReference type) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(String queueName, ParameterizedTypeReference type) throws AmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。可能有NULL值,并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference type) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。可能为null 值。并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference type)throws AmqpException;

示例:

        // 接收消息,队列存在是,会报错;队列中没有消息,返回NULLMessage bootQueue1= rabbitTemplate.receive("bizQueue");Message bootQueue2 = rabbitTemplate.receive("bizQueue",1000);Message bootQueue3= rabbitTemplate.receive("backupQueue");

使用消息转换器,可以直接发送、接收对象:

        // User 需要实现SerializableUser user = new User();user.setName("张三");rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.convertAndSend(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,user);User receiveUser = rabbitTemplate.receiveAndConvert("bizQueue", new ParameterizedTypeReference() {});

receiveAndReply

receiveAndReply支持在获取消息时传入一个回调函数ReceiveAndReplyCallback,处理接收到消息和回复消息的业务逻辑。

receiveAndReply应用于RPC模式Server端,Server收到消息,并回复消息给客户端:
在这里插入图片描述
该模式用的比较少,实现起来也比较麻烦,这里就不演示了。

    // 收到消息并回复,R:接收到的消息 S: 返回的消息 boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException; boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey)throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback, String replyExchange,String replyRoutingKey) throws AmqpException; boolean receiveAndReply(ReceiveAndReplyCallback callback,ReplyToAddressCallback replyToAddressCallback) throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback,ReplyToAddressCallback replyToAddressCallback) throws AmqpException;

sendAndReceive

sendAndReceive也属于RPC模式,发送消息并接收回复消息,属于Client端。

	@NullableMessage sendAndReceive(Message message) throws AmqpException;@NullableMessage sendAndReceive(String routingKey, Message message) throws AmqpException;@NullableMessage sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException;

convertSendAndReceive

convertSendAndReceive以及convertSendAndReceiveAsType是对sendAndReceive的扩展,可以直接发送对象消息,并可以设置类型转换器:

	@NullableObject convertSendAndReceive(Object message) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message) throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException;@NullableObject convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor) throws AmqpException;@Nullable T convertSendAndReceiveAsType(Object message, ParameterizedTypeReference responseType)throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message,MessagePostProcessor messagePostProcessor, ParameterizedTypeReference responseType)throws AmqpException;@Nullable T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor, ParameterizedTypeReference responseType)throws AmqpException;

相关内容

热门资讯

(图表)十年来全国检察机关共办... 新华社图表,北京,2025年12月22日 记者从最高人民法院、最高人民检察院12月22日举行的发布...
反思独居蒋女士离世事件:补齐制... 家住上海虹口区的46岁独居人士蒋女士,12月14日因病离世,但她的遗产不能用来购买墓地,引发舆论广泛...
俄副外长:保证不进攻欧盟北约,... 【文/观察者网 柳白】 据俄新社报道,当地时间12月22日,俄罗斯外交部副部长谢尔盖·里亚布科夫在...
贵州一策律师事务所:让法治精神... 今年10月,贵州一策律师事务所荣获2025年度贵州省绿色网络工作室荣誉称号,成为全省20家获此殊荣的...
日本多名教职人员因性暴力或性犯... 新华社东京12月22日电(记者陈泽安 李子越)据日本文部科学省22日公布的数据,日本2024财年(2...
贵州仁山律师事务所:以法为炬传... 今年10月,贵州仁山律师事务所凭借深耕婚姻法律网络科普的扎实成效、线上线下融合的普法创新实践,斩获贵...
法治青年说|涉及网约车、公用电... 12月22日,全国人大常委会法工委关于2025年备案审查工作情况的报告提请全国人大常委会会议审议。在...
原创 再... 电视人朴娜莱向前任经纪人追加提起刑事诉讼,使得原本就紧绷的法律纠纷进一步升级。 据韩国媒体报道称:...
因涉嫌犯罪,交建股份实控人俞发... 12月22日晚间,交建股份(603815.SH)公告,公司收到公司实际控制人俞发祥家属通知,俞发祥因...
新疆火炬(603080)披露公... 截至2025年12月22日收盘,新疆火炬(603080)报收于22.81元,较前一交易日上涨0.35...