RabbitMQ初步到精通-第四章-RabbitMQ工作模式-WORK
创始人
2024-01-31 07:41:15
0

第四章-RabbitMQ工作模式-WORK

1.模式介绍

1.1 work模式

Work模式与前面的Simple模式一致,也是消息经由生产者发到Exchange再到queue再被消费者消费。不同点在于SIMPL模式是一个队列对应的一个消费者,此模式会由一个队列对应两个消费者或大于两个消费者。

 

1.2 work模式模拟

此模式下,我们的消费者实现了对消息的平均消费,但如果消费者1消费能力若于消费者2,那就会造成消费者1 的消息积压,这时候我们就会想到使用公平模式,能者多劳,消费快的多消费,消费少的少消费。合理消息。后面代码会涉及到。

 

2.验证代码

2.1 work平均模式

我们还是举小明洗澡的例子,小明洗澡比较孤独,这时候小明的女朋友小丽也来一块洗澡了,但两个人不想公用一个喷头,便又接入了一个喷头,一人一个开始洗,但是小明洗澡比较快,虽然洗完了,但是喷头关不上,只好把水浪费掉,小丽洗澡比较慢,但感觉水又不太够用。一共就20升水,一人10升。

2.1.1 生产者


/*** @author rabbit* @version 1.0.0* @Description 一个生产者,一个默认的交换机,一个队列,两个消费者* @createTime 2022/07/27 19:34:00*/
public class WaterProducer {public static final String QUEUE_NAME = "SolarWaterHeater";//生产者public static void main(String[] args) throws Exception {//1、获取connectionConnection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();for (int i = 1; i <= 20; i++) {sendMsg(channel, i);Thread.sleep(200);}//4、关闭管道和连接channel.close();connection.close();}private static void sendMsg(Channel channel, int k) throws IOException {//3、发送消息到exchangeString msg = k + "升";/*** 参数1:指定exchange,使用“”。默认的exchange* 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中* 参数3:指定传递的消息携带的properties* 参数4:指定传递的消息,byte[]类型*/channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("水龙头放水成功!" + k + "升");}}

2.1.2 消费者

小明洗澡:


/*** @author rabbit* @version 1.0.0* @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者* 需要在consumer消费者端,平均分配* @createTime 2022/07/27 19:36:00*/
public class XMShowerConsumer {public static final String QUEUE_NAME = "SolarWaterHeater";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {int i = 1;@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}if (i > 5) {System.out.println("小明洗澡洗完,开始浪费-第:" + new String(body, "UTF-8"));} else {System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));}i++;}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调*/channel.basicConsume(QUEUE_NAME, true, consumer);System.out.println("小明开始洗澡-快速洗......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

小丽洗澡:


/*** @author rabbit* @version 1.0.0* @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者* 需要在consumer消费者端,平均分配* @createTime 2022/07/27 19:36:00*/
public class XLShowerConsumer {public static final String QUEUE_NAME = "SolarWaterHeater";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调*/channel.basicConsume(QUEUE_NAME, true, consumer);System.out.println("小丽开始洗澡-慢慢洗......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

2.1.3 结果验证

生产者放水消息:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升
水龙头放水成功!11升
水龙头放水成功!12升
水龙头放水成功!13升
水龙头放水成功!14升
水龙头放水成功!15升
水龙头放水成功!16升
水龙头放水成功!17升
水龙头放水成功!18升
水龙头放水成功!19升
水龙头放水成功!20升

小明洗澡

小明开始洗澡-快速洗......
小明洗澡已用水-第: 2升
小明洗澡已用水-第: 4升
小明洗澡已用水-第: 6升
小明洗澡已用水-第: 8升
小明洗澡已用水-第: 10升
小明洗澡洗完,开始浪费-第:12升
小明洗澡洗完,开始浪费-第:14升
小明洗澡洗完,开始浪费-第:16升
小明洗澡洗完,开始浪费-第:18升
小明洗澡洗完,开始浪费-第:20升

小丽洗澡

小丽洗澡已用水-第: 1升
小丽洗澡已用水-第: 3升
小丽洗澡已用水-第: 5升
小丽洗澡已用水-第: 7升
小丽洗澡已用水-第: 9升
小丽洗澡已用水-第: 11升
小丽洗澡已用水-第: 13升
小丽洗澡已用水-第: 15升
小丽洗澡已用水-第: 17升
小丽洗澡已用水-第: 19升

从结果来看,两个人都是平均用水,虽然有快有慢,慢的就会存在堆积情况。

2.2 公平模式

小丽就有意见了,自己不够洗,小明还白白浪费那么多水。小明给出了建议,让小丽把碰头开关开大点,自己开小点,这样就让小丽多用点水,自己少用点也就够了。

2.2.1 生产者

同上面生产者一致

2.2.2 消费者

小明洗澡:


/*** @author rabbit* @version 1.0.0* @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者* 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了* @createTime 2022/07/27 19:36:00*/
public class XMShowerConsumer {public static final String QUEUE_NAME = "SolarWaterHeater";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调*/channel.basicConsume(QUEUE_NAME, false, consumer);System.out.println("小明开始洗澡-慢慢洗......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

小丽洗澡:


/*** @author rabbit* @version 1.0.0* @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者* 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了* @createTime 2022/07/27 19:36:00*/
public class XLShowerConsumer {public static final String QUEUE_NAME = "SolarWaterHeater";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调*/channel.basicConsume(QUEUE_NAME, false, consumer);System.out.println("小丽开始洗澡-快速洗......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}

2.2.3 结果验证

生产者:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升
水龙头放水成功!11升
水龙头放水成功!12升
水龙头放水成功!13升
水龙头放水成功!14升
水龙头放水成功!15升
水龙头放水成功!16升
水龙头放水成功!17升
水龙头放水成功!18升
水龙头放水成功!19升
水龙头放水成功!20升

小明:

小明开始洗澡-慢慢洗......
小明洗澡已用水-第: 2升
小明洗澡已用水-第: 7升
小明洗澡已用水-第: 12升
小明洗澡已用水-第: 17升

小丽:

小丽开始洗澡-快速洗......
小丽洗澡已用水-第: 1升
小丽洗澡已用水-第: 3升
小丽洗澡已用水-第: 4升
小丽洗澡已用水-第: 5升
小丽洗澡已用水-第: 6升
小丽洗澡已用水-第: 8升
小丽洗澡已用水-第: 9升
小丽洗澡已用水-第: 10升
小丽洗澡已用水-第: 11升
小丽洗澡已用水-第: 13升
小丽洗澡已用水-第: 14升
小丽洗澡已用水-第: 15升
小丽洗澡已用水-第: 16升
小丽洗澡已用水-第: 18升
小丽洗澡已用水-第: 19升
小丽洗澡已用水-第: 20升

从结果看:实现了我们期望的结果,小丽用的水多了,小明用的水少,大家都洗好了。

3.模式总结

此模式我们最应该注意的就是平均模式与公平模式的实现,这里是靠消费者的手工确认机制来实现的。

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

如上代码中的 第二个参数 autoAck,

true的时候,属于自动确认,消费者一旦接收到此消息,就会发回确认消息给Broker,Broker会从队列中删除掉此消息。当有多个消费者注册到同一个queue时,会默认轮询分发。

false的时候,属于手动确认,消费者虽然接收到了消息,还需要执行一个 方法来告诉Broker才行

channel.basicAck(envelope.getDeliveryTag(), false);

若没有告诉Broker,Broker还会将此消息再次发送。

同时此种情况下还需要使用此方法:

void basicQos(int prefetchCount) throws IOException;

指定每次消费抓取的数量

maximum number of messages that the server will deliver

这样我们通过这些改造,实现了,消费者消费的时候,消费一个告诉Broker删除一个,没消费的时候就不要给消费者投递了,最终实现了公平消费。

相关内容

热门资讯

美联储“收传票”前夕,特朗普“... 在上周的一场白宫活动上,特朗普公开批评联邦检察官行动迟缓、不愿起诉其指定目标,指责这些检察官"软弱无...
许凯名誉权纠纷案最新进展:黑粉... 搜狐娱乐讯 1月14日,法院刊登刘某某致歉声明。刘某某表示,其在涉案账号上侮辱、诽谤许凯先生的言论,...
电池法规丨欧盟拟修订电池法规标... 2025年12月22日,WTO发布了一份技术性贸易壁垒(TBT)通报,通报编号G/TBT/N/EU/...
“新国补”政策落地 消费市场热... 来聊聊我们身边的一项福利——以旧换新补贴。新年伊始,河南积极落实2026年国家“以旧换新”补贴新政,...
救援善意与法律风险如何平衡? 羊城晚报记者 王隽杰 近日,秦岭牛马救援队被遇难驴友家属起诉案一审第三次庭审结束。秦岭牛马队的代理律...
上海提振消费政策“三连发”有何... 岁末年初,促消费、扩内需,上海一口气连发3份重磅政策文件。 去年底,16个部门联合印发《关于进一步扩...
政策解读|天气预报正成为“健康... 人民日报记者 李红梅 蒋雪鸿 1月初,气象预报显示,天津市将接连出现两次弱冷空气过程。同时,天津市环...
广昌一站式综治中心高效化解基层... “以前办事不知道找哪个部门,跑几趟都没结果,现在综治中心一个窗口就能受理,太方便了。”近日,在广昌县...
上海嘉定:以案促治打造实用法律... 王英鸽 李迪明子 近日,上海市嘉定区人民法院召开新闻发布会,发布2025年度《司法护航高质量发展案例...