有道无术,术尚可求,有术无道,止于术。
RabbitTemplate 是spring-amqp提供的一个 RabbitMQ 消息操作模板类,在之前我们使用它完成了简单的消息发送。
RabbitTemplate 主要提供了发送消息、接收消息以及其他附加功能,内部封装了RabbitMQ原生API,大大简化了使用 RabbitMQ操作。
RabbitTemplate 主要实现了AmqpTemplate和RabbitOperations接口:

AmqpTemplate接口主要声明了三类方法:
public interface AmqpTemplate {// 发送消息void send(Message var1) throws AmqpException;// 接收消息Message receive() throws AmqpException;// 发送消息并接收回复Message sendAndReceive(Message var1) throws AmqpException;
}
首先,我们看下AmqpTemplate中声明的各种方法。
send 方法一共有三个,需要创建Message消息对象,将消息封装到该对象内发送,如果没有指定交换机、路由键,将使用默认值,也就是空字符串。
// 发送消息到默认交换机、默认路由KEYvoid send(Message message) throws AmqpException;// 发送消息到默认交换机、使用指定路由KEYvoid send(String routingKey, Message message) throws AmqpException;// 发送消息到指定交换机、使用指定路由KEYvoid send(String exchange, String routingKey, Message message) throws AmqpException;
示例:
Message message = new Message("消息".getBytes());rabbitTemplate.send(message);rabbitTemplate.send("route.key", message);rabbitTemplate.send("exchange_name", "route.key", message);
convertAndSend 方法可以转换对象并发送,并可以添加一个消息处理器MessagePostProcessor 。
// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用默认路由KEYvoid convertAndSend(Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEYvoid convertAndSend(String routingKey, Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEYvoid convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessor void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY// 在发送消息之前添加一个消息处理器MessagePostProcessorvoid convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;
MessagePostProcessor 是一个函数型接口,提供了一个postProcessMessage方法处理消息,由于直接发送的是对象,如果需要设置一些消息的属性,就需要使用该接口进行设置,例如:
MessagePostProcessor messagePostProcessor = message1 -> {MessageProperties messageProperties = message1.getMessageProperties();messageProperties.setExpiration("1000");return message1;};rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);
一般获取消息有两种处理模式:
RabbitMQ主动将消息推送给订阅队列的消费者,调用channel.basicConsume方法。channel.basicGet方法。receive 方法,就是从主动队列中获取消息。
// 如果默认队列中有消息,则接收消息。立即返回,可能有NULL值@NullableMessage receive() throws AmqpException;// 从指定队列中获取消息。立即返回,可能有NULL值@NullableMessage receive(String queueName) throws AmqpException;// 如果默认队列中有消息,则接收消息。可能有NULL值,并指定一个超时时间@NullableMessage receive(long timeoutMillis) throws AmqpException;// 从指定队列中获取消息。可能有NULL值,并指定一个超时时间@NullableMessage receive(String queueName, long timeoutMillis) throws AmqpException;
receiveAndConvert可以拉取消息并进行对象转换。
// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObject receiveAndConvert() throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。@NullableObject receiveAndConvert(String queueName) throws AmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能有NULL值,并指定一个超时时间@NullableObject receiveAndConvert(long timeoutMillis) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象,并指定一个超时时间,可能为null 值。@NullableObject receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(ParameterizedTypeReference type) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(String queueName, ParameterizedTypeReference type) throws AmqpException;// 如果默认队列中有消息,则接收消息并将其转换为Java对象。可能有NULL值,并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference type) throws AmqpException;// 从指定队列中接收消息并将其转换为Java对象。可能为null 值。并指定一个超时时间及消息转换器SmartMessageConverter。@Nullable T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference type)throws AmqpException;
示例:
// 接收消息,队列存在是,会报错;队列中没有消息,返回NULLMessage bootQueue1= rabbitTemplate.receive("bizQueue");Message bootQueue2 = rabbitTemplate.receive("bizQueue",1000);Message bootQueue3= rabbitTemplate.receive("backupQueue");
使用消息转换器,可以直接发送、接收对象:
// User 需要实现SerializableUser user = new User();user.setName("张三");rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.convertAndSend(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,user);User receiveUser = rabbitTemplate.receiveAndConvert("bizQueue", new ParameterizedTypeReference() {});
receiveAndReply支持在获取消息时传入一个回调函数ReceiveAndReplyCallback,处理接收到消息和回复消息的业务逻辑。
receiveAndReply应用于RPC模式Server端,Server收到消息,并回复消息给客户端:

该模式用的比较少,实现起来也比较麻烦,这里就不演示了。
// 收到消息并回复,R:接收到的消息 S: 返回的消息 boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException; boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey)throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback, String replyExchange,String replyRoutingKey) throws AmqpException; boolean receiveAndReply(ReceiveAndReplyCallback callback,ReplyToAddressCallback replyToAddressCallback) throws AmqpException; boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback,ReplyToAddressCallback replyToAddressCallback) throws AmqpException;
sendAndReceive也属于RPC模式,发送消息并接收回复消息,属于Client端。
@NullableMessage sendAndReceive(Message message) throws AmqpException;@NullableMessage sendAndReceive(String routingKey, Message message) throws AmqpException;@NullableMessage sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException;
convertSendAndReceive以及convertSendAndReceiveAsType是对sendAndReceive的扩展,可以直接发送对象消息,并可以设置类型转换器:
@NullableObject convertSendAndReceive(Object message) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message) throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException;@NullableObject convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;@NullableObject convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor)throws AmqpException;@NullableObject convertSendAndReceive(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor) throws AmqpException;@Nullable T convertSendAndReceiveAsType(Object message, ParameterizedTypeReference responseType)throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,ParameterizedTypeReference responseType) throws AmqpException;@Nullable T convertSendAndReceiveAsType(String routingKey, Object message,MessagePostProcessor messagePostProcessor, ParameterizedTypeReference responseType)throws AmqpException;@Nullable T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor, ParameterizedTypeReference responseType)throws AmqpException;