RabbitMQ基本使用
创始人
2024-03-26 03:03:33
0

先会用,知道mq是干嘛的,怎么用RabbitMQ  再去考虑一系列深入东西

 一:什么是MQ

MQ消息队列详解、四大MQ的优缺点分析_从百草园杀到三味书屋&的博客-CSDN博客

理解什么是MQ  MQ在企业/程序中的作用是什么?

 MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。

消息队列中间件(MQ)是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 

好处:

        1.吞吐量提升,无需等待订阅者处理完成,给客户端响应更快

        2.耦合度降低,每个服务都可以灵活拔插,可替换

        3.流量晓峰,不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

        ...

缺点:

        1.架构复杂了,业务线没有明显的流程线,不好管理

        2.需要依赖于Broker的可靠、安全、性能

比较常见的MQ(ActiveMQ  RabbitMQ  RocketMQ  Kafka )

几种常见MQ的对比:

RabbitMQActiveMQ(一般不用了)RocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

二:RabbitMQ

2.1安装RabbitMQ

linux(docker)安装:docker安装RabbitMQ_落后挨打的博客-CSDN博客_docker 安装rabbitmq 

 docker search rabbitmq   搜寻镜像版本

docker pull rabbitmq        拉取镜像到宿主机

将镜像生成容器
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

      
docker ps  查看运行的容器 
docker exec -it 镜像ID /bin/bash     进入容器内部
rabbitmq-plugins enable rabbitmq_management  运行
访问地址: http://linuxip地址:15672   (用户guest,密码guest)

 注意:  

        RabbitMQ启动成功后暴露两个端口

        java应用端连接:192.168.136.160:5672

        管理后台地址 :地址: 15672

        管理员账号/密码 guest/guest

三:RabbitMQ消息模型(队列消息 发布/订阅消息)

 四:Springboot演示RabbitMQ

1.导依赖  amqp

org.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-testtestcom.fasterxml.jackson.corejackson-databind

2.添加配置文件:

#配置RabbitMQ的基本信息
spring:rabbitmq:host: 192.168.136.160port: 5672virtual-host: tjtcusername: huapassword: hua

 首先介绍两种:SimpleQueue        WorkQueue  (没有交换机)

 SimpleQueue:一个生产者,一个消费者

 

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序

  • C:消费者:消息的接受者,会一直等待消息到来。

  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

 1.定义发送消息(生产者)

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTemplateTest {/*** 使用SpringAMQP发送消息:*    1、依赖*    2、在配置文件中加入rabbitmq的相关配置*    3、注入对象RabbitTemplate,调用convertAndSend方法发送消息*/@Autowiredprivate RabbitTemplate template;/*** 发送基本(简单)队列消息* springamqp发送队列消息,需要在后台系统中手动创建一个队列*/@Testpublic void testSendBaseMessage() {String ququeName="simple.queue";String message = "hahhah";template.convertAndSend(ququeName,message); //队列名称,消息内容}
}

 消费者:

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 自定义的消息监听器:*    1、此类交给容器管理*    2、编写方法:监听消息队列(队列中一旦有最新的消息,自动执行方法)*/
@Component
public class MessageListener {/*** 消费者的监听器方法:获取消息并消费(完成业务逻辑)* 1、方法没有返回值* 2、方法有参数(参数=发送的消息内容)*      参数类型和发送的消息类型一致* 3、在方法上通过@RabbitListener绑定队列*/@RabbitListener(queues = "simple.queue")   //有队列就是用 没队列就报错//@RabbitListener(queuesToDeclare ={@Queue("abc")} ) //有对象就使用 没队列就创建一个队列public void recBaseMessage(String message) {System.out.println("消费者1获取消息:"+message);}
}

 注意:   

    @RabbitListener(queues = "simple.queue")   //有队列就是用 没队列就报错
    @RabbitListener(queuesToDeclare ={@Queue("abc")} ) //有队列就使用 没队列就创建一个队列

WorkQueue:让多个消费者绑定到一个队列,共同消费队列中的消息

 

 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。

 消息发送:

  /*** 发送工作队列消息*   发送多次*/@Testpublic void testSendWorkMessage() throws InterruptedException {for (int i = 0; i < 50; i++) {String message = "凡凡有难,八方点赞,点赞数:"+i;String queueName = "simple.queue";rabbitTemplate.convertAndSend(queueName,message); //队列,消息Thread.sleep(10);}}

 消费者:

    /*** 消费者1*/@RabbitListener(queues="simple.queue")public void recBaseMessage(String message) throws InterruptedException {//性能较高System.out.println("1号消费者获取消息:"+message);Thread.sleep(50);}/*** 消费者2*/@RabbitListener(queues="simple.queue")public void recBaseMessage(String message) throws InterruptedException {//性能较低System.out.println("2号消费者获取消息:"+message);Thread.sleep(1000);}

 注意: RabbitMQ默认的是,平均分配给每个消费者,并没有考虑到消费者的处理能力

那么可以通过配置解决:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

上边这两种消息队列  

发送者:

        rabbitTemplate.convertAndSend(queueName,message); //队列名称,消息

消费者: (定义一个方法,放一个参数)

         @RabbitListener(queues="simple.queue")

Fanout

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列

    • Direct:定向,把消息交给符合指定routing key 的队列

    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

  • C:消费者,与以前一样,订阅队列,没有变化

  • Queue:消息队列也与以前一样,接收消息、缓存消息。

 消息发送:

  /*** 测试发送Fanout模式消息(广播模式)*/@Testpublic void testSendFanoutMessage() {String exchangeName = "heima165fanout";String message = "凡凡有难,八方点赞";template.convertAndSend(exchangeName,"",message); //交换机名称,空,消息}

 消息消费者:

    /*** 监听Fanout模式消息*      @RabbitListener:*          bindings :绑定队列和交换机*          value:*              @Queue : 指定队列*                  value:队列名称*          exchange:指定交换机*              @Exchange*                  value:交换机名称*                  type:类型*//*** 获取Fanout模式消息(广播)*/
package cn.itcast.mq.fanoutqueue;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerFanOut {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue1"),exchange = @Exchange(value = "fanout",type = ExchangeTypes.FANOUT)))public void recBaseMessage(String message) {
//        Thread.sleep(1000);System.out.println("消费者1获取消息:"+message);}
}

 2

package cn.itcast.mq.fanoutqueue;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ConsumerFanOut2 {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue2"),exchange = @Exchange(value = "fanout",type = ExchangeTypes.FANOUT)))public void recBaseMessage(String message) throws InterruptedException {Thread.sleep(1000);System.out.println("消费者1获取消息:"+message);}
}

注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

Direct

 

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

发送者:

   @Testpublic void testSendDirectMessage() {for (int i = 0; i < 10; i++) {String exchangeName = "Direct";String routingKey = "red";String message = "今天太冷了" + i;rabbitTemplate.convertAndSend(exchangeName,routingKey,message); //交换机,routingkey(类型),消息}}

消费者:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue"),exchange = @Exchange(value = "Direct",type = ExchangeTypes.DIRECT),key = {"blue"}  // key = {"blue","red","yellow"}))public void recBaseMessage(String message) throws InterruptedException {Thread.sleep(1000);System.out.println("消费者1获取消息:"+message);}
 @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue2"),exchange = @Exchange(value = "Direct",type = ExchangeTypes.DIRECT),
//                    key = { "red"}key = {"blue","red","yellow"}))public void recBaseMessage(String message) throws InterruptedException {Thread.sleep(1000);System.out.println("消费者1获取消息:"+message);}

 Topic  模式

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

 发送者:

    /*** 发送topic消息(主题模式)*/@Testpublic void testSendTopicMessage() {String exchangeName = "Topic";String routingKey = "china.weather";String message = "恭喜EDG";template.convertAndSend(exchangeName,routingKey,message); //交换机,routingkey(类型),消息}

消费者:

    /*** 获取主题模式消息*   工程,可以支持所有以china开头的消息*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue1"),exchange = @Exchange(value = "Topic",type = ExchangeTypes.TOPIC),key = "china.*"))public void recTopicMessage(String message) {System.out.println("消费者1获取topic消息:"+message);}
  /*** 获取主题模式消息*   工程,可以支持所有以news结尾的内容*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue2"),exchange = @Exchange(value = "Topic",type = ExchangeTypes.TOPIC),key = "*.news"))public void recTopicMessage(String message) {System.out.println("消费者2获取topic消息:"+message);}

相关内容

热门资讯

每周股票复盘:华依科技(688... 截至2025年12月26日收盘,华依科技(688071)报收于33.81元,较上周的32.34元上涨...
每周股票复盘:文灿股份(603... 截至2025年12月26日收盘,文灿股份(603348)报收于19.23元,较上周的19.5元下跌1...
每周股票复盘:中邮科技(688... 截至2025年12月26日收盘,中邮科技(688648)报收于58.51元,较上周的58.49元上涨...
每周股票复盘:蓝科高新(601... 截至2025年12月26日收盘,蓝科高新(601798)报收于8.99元,较上周的8.98元上涨0....
每周股票复盘:红豆股份(600... 截至2025年12月26日收盘,红豆股份(600400)报收于2.42元,较上周的2.5元下跌3.2...
每周股票复盘:金证股份(600... 截至2025年12月26日收盘,金证股份(600446)报收于15.75元,较上周的15.46元上涨...
每周股票复盘:日盈电子(603... 截至2025年12月26日收盘,日盈电子(603286)报收于59.5元,较上周的57.11元上涨4...
每周股票复盘:盐 田 港(00... 截至2025年12月26日收盘,盐 田 港(000088)报收于4.53元,较上周的4.52元上涨0...
每周股票复盘:广电网络(600... 截至2025年12月26日收盘,广电网络(600831)报收于4.2元,较上周的4.36元下跌3.6...
每周股票复盘:新疆火炬(603... 截至2025年12月26日收盘,新疆火炬(603080)报收于22.85元,较上周的22.73元上涨...