消息中间件-2(ActiveMQ使用1)
创始人
2025-05-29 07:18:32
0

1、JMS和ActiveMQ

JMS(Java Message Service)是java平台上有关与面向消息中间件的技术规范,实际上是一套api接口,他便与消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口简化

JMS规范

1)连接工厂。
2)JMS连接。
3)JMS会话。
4)JMS目的。
5)JMS生产者和消费者。

JMS中消息的定义

消息头
消息属性
消息体

JMS消息模型

point to point(点对点)

在这里插入图片描述
每条队列里面的消息只能被一个消费者消费,消息一旦被消费,消息就不在消息队列中

Topic主题(发布订阅模式)

在这里插入图片描述
topic的消息会被所有的消费者消费

2、ActiveMQ安装、部署和运行

在这里插入图片描述

下载 Windows版 ActiveMQ,解压,运行bin目录下的activemq.bat即可。Linux下操作类似(进入bin目录运行./activemq start启动,./activemq stop关闭)。
下载地址:http://activemq.apache.org/activemq-580-release.html
运行后在浏览器中访问http://127.0.0.1:8161/admin,即可看到ActiveMQ的管理控制台
ActiveMQ中,61616为服务端口,8161为管理控制台端口。
启动成功界面如下:
在这里插入图片描述

3、使用ActiveMQ

1)原生API使用

详情见代码:no-spring下usemq包
1、pom文件配置

  org.apache.activemqactivemq-all5.9.0

2、消息生成端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 生产者端*/
public class JmsProducer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/*默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/*默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;/*发送数量*/private static final int SENDNUM = 3;public static void main(String[] args){/*连接工厂*/ConnectionFactory connectionFactory;/*连接*/Connection connection = null;/*会话*/Session session;/*消息的目的地*/Destination destination;/*消息的生产者*/MessageProducer messageProducer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try {/*** 通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*** 启动连接*/connection.start();/*** 创建session会话,* 第一个参数是否使用事务,当消息发送者向消息提供者发送消息时,消息发送者等待消息代理的确认* 没有回应则抛出异常,消息发送程序会处理这个错误* 第二个参数消息的确认模式:* AUTO_ACKNOWLEDGE:指定消息接收者在每次收到消息时自动发送确认,* 消息只向目标发送一次,但传输过程中可能因为错误而丢失消息(会通知消息提供者收到消息)* CLIENT_ACKNOWLEDGE:由消息接收者确认收到消息,通过调用消息的acknowledge()方法* DUPS_OK_ACKNOWLEDGE:指定消息提供者在消息接收者没有确认发送时重新发送消息* (这种确认不在乎接收者收到重复的消息)*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*** 创建一个名为HelloWorld的消息队列*///创建一个topic// destination = session.createTopic("HelloTopic");destination = session.createQueue("HelloActiveMqQueue");/*** 创建消息生产者*/messageProducer = session.createProducer(destination);/*** 循环发送消息*/for (int i = 0; i < SENDNUM; i++){String msg = "发送消息" + i + " " + System.currentTimeMillis();TextMessage textMessage = session.createTextMessage(msg);System.out.println("标准用法"+ msg);/*** 消息生产者发送消息*/messageProducer.send(textMessage);}}catch (Exception e){e.printStackTrace();}finally {//关闭mq连接if(connection != null){try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

3、同步消息消费端

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端-同步接受消息*/
public class JmsConsumer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*/destination = session.createQueue("HelloActiveMqQueue");//创建一个topic// destination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);Message message;while ((message = messageConsumer.receive()) != null){System.out.println(((TextMessage)message).getText());}}catch (JMSException e){e.printStackTrace();}finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}
}

4、异步消费端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端--异步接收消息*/
public class JmsConsumerAsyn {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*///destination = session.createQueue("HelloActiveMqQueue");//创建一个topicdestination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);/*设置消费者监听器,监听消息*/messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {try {System.out.println(((TextMessage) message).getText());} catch (JMSException e) {e.printStackTrace();}}});}catch (JMSException e){e.printStackTrace();}}
}

2)与Spring结合

详情见代码:am_with_spring_p;am_with_spring_c

3)与SpringBoot结合

详情见代码:am_with_springboot

4、请求应答模式(request-reponse模式)

在这里插入图片描述

5、用户注册的异步处理代码

在这里插入图片描述
在这里插入图片描述
1、串行实现
2、并行实现
3、使用MQ实现
4、使用MQ实现消息回调
详情见代码:asyncApp

相关内容

热门资讯

原创 中... 联合国的最新表态令人精神一振,这种明确态度其实本就顺理成章。台湾自古属于中国,这是铁一般的事实,中国...
花800元就能买自己的死亡证明... 花800元就能买到 本人的精神诊断报告和死亡证明? 近日,“假证定制”业务 在多个电商和社交平台 死...
智能平台支撑政策落地 实达集团... 11月17日,福建省发展和改革委员会网站发布《福建省数据管理局关于印发〈福建省数据流通交易管理办法(...
祥明智能:制定对外投资管理制度 祥明智能公告称,为规范公司及控股子公司对外投资、资产处置的程序及审批权限,建立有效控制机制保障资金运...
美国9月非农数据受政府关门扰动... 11月21日,中国银河证券发布研报对美国9月非农数据进行点评。研报指出,9月新增就业回到增长区间,失...
原创 高... 近日,随着日本政坛极端言论频频出现,尤其是汉奸石平的发声,再次引发了人们对中日关系未来走势的广泛关注...
日媒曝光:日本曾制定3套“夺岛... 据央视新闻报道,随着日本首相高市早苗涉台挑衅言论持续发酵,日本自卫队在靠近台海的岛屿加强军力部署的情...
舞蹈家黄豆豆获破格提拔,已任副... 今年4月拟破格提拔的舞蹈家黄豆豆,已有新消息。 澎湃新闻注意到,中国舞蹈家协会官网近日更新后显示,黄...
李霄鹏告别青岛海牛:战术调整与... 随着2023赛季的落幕,李霄鹏教练组已正式与青岛海牛球员告别,确认下赛季将不再继续执教。这一决定不仅...