消息中间件-2(ActiveMQ使用1)
创始人
2025-05-29 07:18:32
0

1、JMS和ActiveMQ

JMS(Java Message Service)是java平台上有关与面向消息中间件的技术规范,实际上是一套api接口,他便与消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口简化

JMS规范

1)连接工厂。
2)JMS连接。
3)JMS会话。
4)JMS目的。
5)JMS生产者和消费者。

JMS中消息的定义

消息头
消息属性
消息体

JMS消息模型

point to point(点对点)

在这里插入图片描述
每条队列里面的消息只能被一个消费者消费,消息一旦被消费,消息就不在消息队列中

Topic主题(发布订阅模式)

在这里插入图片描述
topic的消息会被所有的消费者消费

2、ActiveMQ安装、部署和运行

在这里插入图片描述

下载 Windows版 ActiveMQ,解压,运行bin目录下的activemq.bat即可。Linux下操作类似(进入bin目录运行./activemq start启动,./activemq stop关闭)。
下载地址:http://activemq.apache.org/activemq-580-release.html
运行后在浏览器中访问http://127.0.0.1:8161/admin,即可看到ActiveMQ的管理控制台
ActiveMQ中,61616为服务端口,8161为管理控制台端口。
启动成功界面如下:
在这里插入图片描述

3、使用ActiveMQ

1)原生API使用

详情见代码:no-spring下usemq包
1、pom文件配置

  org.apache.activemqactivemq-all5.9.0

2、消息生成端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 生产者端*/
public class JmsProducer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/*默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/*默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;/*发送数量*/private static final int SENDNUM = 3;public static void main(String[] args){/*连接工厂*/ConnectionFactory connectionFactory;/*连接*/Connection connection = null;/*会话*/Session session;/*消息的目的地*/Destination destination;/*消息的生产者*/MessageProducer messageProducer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try {/*** 通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*** 启动连接*/connection.start();/*** 创建session会话,* 第一个参数是否使用事务,当消息发送者向消息提供者发送消息时,消息发送者等待消息代理的确认* 没有回应则抛出异常,消息发送程序会处理这个错误* 第二个参数消息的确认模式:* AUTO_ACKNOWLEDGE:指定消息接收者在每次收到消息时自动发送确认,* 消息只向目标发送一次,但传输过程中可能因为错误而丢失消息(会通知消息提供者收到消息)* CLIENT_ACKNOWLEDGE:由消息接收者确认收到消息,通过调用消息的acknowledge()方法* DUPS_OK_ACKNOWLEDGE:指定消息提供者在消息接收者没有确认发送时重新发送消息* (这种确认不在乎接收者收到重复的消息)*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*** 创建一个名为HelloWorld的消息队列*///创建一个topic// destination = session.createTopic("HelloTopic");destination = session.createQueue("HelloActiveMqQueue");/*** 创建消息生产者*/messageProducer = session.createProducer(destination);/*** 循环发送消息*/for (int i = 0; i < SENDNUM; i++){String msg = "发送消息" + i + " " + System.currentTimeMillis();TextMessage textMessage = session.createTextMessage(msg);System.out.println("标准用法"+ msg);/*** 消息生产者发送消息*/messageProducer.send(textMessage);}}catch (Exception e){e.printStackTrace();}finally {//关闭mq连接if(connection != null){try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

3、同步消息消费端

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端-同步接受消息*/
public class JmsConsumer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*/destination = session.createQueue("HelloActiveMqQueue");//创建一个topic// destination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);Message message;while ((message = messageConsumer.receive()) != null){System.out.println(((TextMessage)message).getText());}}catch (JMSException e){e.printStackTrace();}finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}
}

4、异步消费端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端--异步接收消息*/
public class JmsConsumerAsyn {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*///destination = session.createQueue("HelloActiveMqQueue");//创建一个topicdestination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);/*设置消费者监听器,监听消息*/messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {try {System.out.println(((TextMessage) message).getText());} catch (JMSException e) {e.printStackTrace();}}});}catch (JMSException e){e.printStackTrace();}}
}

2)与Spring结合

详情见代码:am_with_spring_p;am_with_spring_c

3)与SpringBoot结合

详情见代码:am_with_springboot

4、请求应答模式(request-reponse模式)

在这里插入图片描述

5、用户注册的异步处理代码

在这里插入图片描述
在这里插入图片描述
1、串行实现
2、并行实现
3、使用MQ实现
4、使用MQ实现消息回调
详情见代码:asyncApp

相关内容

热门资讯

香港金管局:对三三金融违反支付... 【财华社讯】7月22日,香港金管局今日宣布,已根据《支付系统及储值支付工具条例》(香港法例第584章...
【安心行动 法援故事】漯河:一... 法 律 援 助 7月17日,退役军人杨某来到漯河市司法局,郑重地将一面写有“法援秉公担道义 拥军护民...
中国女篮亚洲杯奖金:张子宇、王... 这届亚洲杯,中国女篮最终以获得铜牌告终,相比于上届亚洲杯夺冠后的风光和重奖,这届亚洲杯低调了许多。 ...
CF40报告:扩大内需仍是下半... 蓝鲸新闻7月22日讯(记者 李丹萍)记者获悉,日前,中国金融四十人论坛(CF40)发布二季度宏观政策...
阿塞拜疆总统建议乌克兰:永不接... 【文/观察者网 熊超然】当地时间7月19日,第三届舒沙全球媒体论坛(Shusha Global Me...
两连板东方电气:行业政策不存在... 人民财讯7月22日电,两连板东方电气(600875)7月22日晚间发布股价异动公告称,经自查,并向公...
刑辩律师法律法规庭前准备的两个... 刑事律师在庭前做好法律法规的准备至关重要,毕竟律师在法庭上最大的武器就是法律法规。若控辩审三方都能严...
湖南一律师因散布不实消息被罚款... 华声在线7月22日讯(文/视频 全媒体记者 虢灿 通讯员 蒋艾林 朱洁茹)故意歪曲事实,恶意误导舆论...
提醒!北京明起一周,几乎天天都... 市气象台22日14时发布天气预报: 今天下午晴转多云,山区分散小阵雨, 北转南风二三级,最高气温34...
春兴精工最新公告:福能东方就仙... 春兴精工(002547.SZ)公告称,公司近日收到中山市第二法院出具的《应诉通知书》,福能东方装备科...