安装
从官网下载最新版本,解压,其他的前期准备只需要安装jdk。 从activemq 5.14开始,只支持jdk8。
配置
核心配置文件是conf目录下的activemq.xml, 默认的配置无需修改即可使用,其他配置我们会在后续文章介绍activemq各种特性时详细介绍。
启动
bin目录下运行 activemq start
即可 ,启动之后浏览器打开localhost:8161 可以打开web管理页面。
基本api使用
这里只介绍java下的api使用,activemq默认的openwire协议是只支持java的,因为实现的JMS协议是java下的协议。如果有其他语言的访问需求,可以用stomp,amqp等协议。
依赖包
引入activemq官方的client包。
1 2 3 4
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> </dependency>
|
actibemq收发消息的主要步骤类似,都需要先建立连接,然后建立会话,然后再新建需要的producer,consumer.
示例代码如下
发送消息
1 2 3 4 5 6 7 8 9 10 11
| public void produce() throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setBrokerURL("tcp://localhost:61616"); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("TEST"); MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage("123"); producer.send(message); }
|
接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public void consume() throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setBrokerURL("tcp://localhost:61616"); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("TEST"); MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> { try { System.out.println("consumer1: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } });
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue2 = session2.createQueue("TEST"); MessageConsumer consumer2 = session2.createConsumer(queue2); consumer2.setMessageListener(message -> { try { System.out.println("consumer2: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } });
Queue queue3 = session2.createQueue("TEST"); MessageConsumer consumer3 = session2.createConsumer(queue3); consumer3.setMessageListener(message -> { try { System.out.println("consumer3: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } });
try { TimeUnit.MINUTES.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
}
|
connection,session,consumer都可以创建多个,实现并行消费。一般来说需要使用多connection或者多session,因为只是多consumer的话,共用一个session,只是多个consumer轮流执行而已,不是真正的并行消费。至于connection和session的选择,一般看具体的流量需求。一个connection对应的是一个物理的tcp连接。
producer其实也类似,只是一般并行发消息意义不大,所以就不贴代码了。
浏览消息
activemq提供了一个有用的功能,brower, 作用与consumer类似,只是不会真正将消息消费掉,只是预览消息内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void brower() throws Exception{ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); cf.setBrokerURL("tcp://localhost:61616"); Connection connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("TEST"); QueueBrowser browser = session.createBrowser(queue); Enumeration<?> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); System.out.println("Browsing: " + message.getText()); }
}
|
原文地址: https://lcy362.github.io/posts/48216/