消息中间件-2(ActiveMQ使用1)
创始人
2025-05-29 07:18:32
0

1、JMS和ActiveMQ

JMS(Java Message Service)是java平台上有关与面向消息中间件的技术规范,实际上是一套api接口,他便与消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口简化

JMS规范

1)连接工厂。
2)JMS连接。
3)JMS会话。
4)JMS目的。
5)JMS生产者和消费者。

JMS中消息的定义

消息头
消息属性
消息体

JMS消息模型

point to point(点对点)

在这里插入图片描述
每条队列里面的消息只能被一个消费者消费,消息一旦被消费,消息就不在消息队列中

Topic主题(发布订阅模式)

在这里插入图片描述
topic的消息会被所有的消费者消费

2、ActiveMQ安装、部署和运行

在这里插入图片描述

下载 Windows版 ActiveMQ,解压,运行bin目录下的activemq.bat即可。Linux下操作类似(进入bin目录运行./activemq start启动,./activemq stop关闭)。
下载地址:http://activemq.apache.org/activemq-580-release.html
运行后在浏览器中访问http://127.0.0.1:8161/admin,即可看到ActiveMQ的管理控制台
ActiveMQ中,61616为服务端口,8161为管理控制台端口。
启动成功界面如下:
在这里插入图片描述

3、使用ActiveMQ

1)原生API使用

详情见代码:no-spring下usemq包
1、pom文件配置

  org.apache.activemqactivemq-all5.9.0

2、消息生成端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 生产者端*/
public class JmsProducer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/*默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/*默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;/*发送数量*/private static final int SENDNUM = 3;public static void main(String[] args){/*连接工厂*/ConnectionFactory connectionFactory;/*连接*/Connection connection = null;/*会话*/Session session;/*消息的目的地*/Destination destination;/*消息的生产者*/MessageProducer messageProducer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try {/*** 通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*** 启动连接*/connection.start();/*** 创建session会话,* 第一个参数是否使用事务,当消息发送者向消息提供者发送消息时,消息发送者等待消息代理的确认* 没有回应则抛出异常,消息发送程序会处理这个错误* 第二个参数消息的确认模式:* AUTO_ACKNOWLEDGE:指定消息接收者在每次收到消息时自动发送确认,* 消息只向目标发送一次,但传输过程中可能因为错误而丢失消息(会通知消息提供者收到消息)* CLIENT_ACKNOWLEDGE:由消息接收者确认收到消息,通过调用消息的acknowledge()方法* DUPS_OK_ACKNOWLEDGE:指定消息提供者在消息接收者没有确认发送时重新发送消息* (这种确认不在乎接收者收到重复的消息)*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*** 创建一个名为HelloWorld的消息队列*///创建一个topic// destination = session.createTopic("HelloTopic");destination = session.createQueue("HelloActiveMqQueue");/*** 创建消息生产者*/messageProducer = session.createProducer(destination);/*** 循环发送消息*/for (int i = 0; i < SENDNUM; i++){String msg = "发送消息" + i + " " + System.currentTimeMillis();TextMessage textMessage = session.createTextMessage(msg);System.out.println("标准用法"+ msg);/*** 消息生产者发送消息*/messageProducer.send(textMessage);}}catch (Exception e){e.printStackTrace();}finally {//关闭mq连接if(connection != null){try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

3、同步消息消费端

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端-同步接受消息*/
public class JmsConsumer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*/destination = session.createQueue("HelloActiveMqQueue");//创建一个topic// destination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);Message message;while ((message = messageConsumer.receive()) != null){System.out.println(((TextMessage)message).getText());}}catch (JMSException e){e.printStackTrace();}finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}
}

4、异步消费端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端--异步接收消息*/
public class JmsConsumerAsyn {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*///destination = session.createQueue("HelloActiveMqQueue");//创建一个topicdestination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);/*设置消费者监听器,监听消息*/messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {try {System.out.println(((TextMessage) message).getText());} catch (JMSException e) {e.printStackTrace();}}});}catch (JMSException e){e.printStackTrace();}}
}

2)与Spring结合

详情见代码:am_with_spring_p;am_with_spring_c

3)与SpringBoot结合

详情见代码:am_with_springboot

4、请求应答模式(request-reponse模式)

在这里插入图片描述

5、用户注册的异步处理代码

在这里插入图片描述
在这里插入图片描述
1、串行实现
2、并行实现
3、使用MQ实现
4、使用MQ实现消息回调
详情见代码:asyncApp

相关内容

热门资讯

重庆优化创业担保贷款政策 近日,重庆市人力社保局、市财政局、中国人民银行重庆市分行联合印发《关于调整优化我市创业担保贷款工作的...
鼓励村民承包村道养护工作!新修... 日前,省十四届人大常委会第十九次会议修订通过《四川省农村公路条例》(以下简称《条例》),自2025年...
原创 狗... 广州一位52岁阿姨的操作直接冲上热搜——她立遗嘱专门划出10多万元,留给自己收养的4只流浪狗!网友炸...
6月A股行情展望:政策驱动与结... 投资信息太多太杂,不知道什么是重点?「华彬金融观察」公众号,深度研判市场动态。从热点追踪、走势分析、...
自然人网店监管难题如何解(政策... 制图:张芳曼 农家特产从田间地头直达城市餐桌,设计师独家定制手作陶艺……近年来,自然人网店快速发展,...
以制度牵引完善职业健康保障 近日,新版《职业病分类和目录》正式发布,将职业性腕管综合征(俗称“鼠标手”)和创伤后应激障碍纳入职业...
借政策东风,创美好生活(今日谈... 端午假期,选购新能源汽车的消费者络绎不绝,一些门店看车的客流量显著增加。得益于消费品以旧换新政策,消...
亿利达:诉讼事项进展 金融界4月23日消息,亿利达公告称,公司于近日收到安徽省合肥市包河区人民法院送达的(2022)皖 0...
菲媒:菲律宾副总统称,不优先考... 【环球网报道】综合菲律宾《马尼拉标准报》等媒体6月1日报道,菲律宾副总统莎拉·杜特尔特称,她不优先考...
原创 美... 特朗普再次执掌白宫后,他的“地盘扩张梦”可谓是雷声大雨点小,搞得沸沸扬扬却未见实效。他本想一口气吞掉...