Mobility

聚沙成塔

位运算是计算机科学领域用的非常广泛的一种计算方式。再合理的使用方式下,会大大提升运算效率,今天就介绍一下位运算的一个巧妙应用:转换大小写字母。

阅读全文 »

前一段时间参加了优化一个老的计费系统,学习了一些高并发下做余额扣减的常用手段,也做了一些尝试,因此在这里总结记录一下, 在高并发下对。

阅读全文 »

概述

最近在找房子,因为想找一个去几个地方都相对方便的位置,自己去地图上看还挺麻烦的,所以想做个小工具,用来对北京地铁的路线做规划,本文就简单介绍一下实现过程。目前的功能还比较简单,主体方法就是根据一个输入的始发站,列出其他所有站点到这个地方的站数最少路线。

阅读全文 »

在之前的一篇文章通过实际操作理解redis cluster原理中,我们简单介绍过redis cluster的设计原理。redis cluster中的数据是根据一定规则分配到16384个slot中,这些slot又根据配置对应到不同的节点上。我们知道,在集群稳定运行后,仍然可以以slot为单位转移数据,不过对于具体的转移过程,包括转移过程中集群的可用性等问题,一直不是太确定,所以这次详细了解了一下。

阅读全文 »

概述

AsyncLoopThread是jstorm里自定义的一个循环执行任务的工具,实现不复杂,本来是不值当的专门开一篇文章介绍。不过这个在jstorm里应用实在太广泛了,诸如uspervisor/nimbus心跳,获取新topology,更新worker状态等大量功能都是利用AsyncLoopThread实现的,所以还是介绍一下吧,也方便后续看其他部分代码。

阅读全文 »

介绍

数据的持久化是很多系统都会涉及到的一个问题,尤其是redis,activemq这些数据主要是存储在内存中的。既然存在内存中,就会面临宕机时数据丢失的风险。这一问题的解决方案就是通过某种方式将数据落到磁盘上,也就是所谓的持久化。

activemq提供了三种持久化方式,分别基于jdbc, kahadb和leveldb. 目前官方最推荐的是基于kahadb的持久化。

阅读全文 »

安装

官网下载最新版本,解压,其他的前期准备只需要安装jdk。 从activemq 5.14开始,只支持jdk8。

配置

核心配置文件是conf目录下的activemq.xml, 默认的配置无需修改即可使用,其他配置我们会在后续文章介绍activemq各种特性时详细介绍。

启动

bin目录下运行 activemq start 即可 ,启动之后浏览器打开localhost:8161 可以打开web管理页面。

基本api使用

这里只介绍java下的api使用,activemq默认的openwire协议是只支持java的,因为实现的JMS协议是java下的协议。如果有其他语言的访问需求,可以用stomp,amqp等协议。

依赖包

引入activemq官方的client包。

1
2
3
4
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>

actibemq收发消息的主要步骤类似,都需要先建立连接,然后建立会话,然后再新建需要的producer,consumer.

示例代码如下

发送消息

1
2
3
4
5
6
7
8
9
10
11
public void produce() throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("123");
producer.send(message);
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    public void consume() throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageConsumer consumer = session.createConsumer(queue);

// Message message = consumer.receive();
// TextMessage textMessage = (TextMessage) message;
// System.out.println(textMessage.getText());

consumer.setMessageListener(message -> {
try {
System.out.println("consumer1: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});



Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session2.createQueue("TEST");
MessageConsumer consumer2 = session2.createConsumer(queue2);
consumer2.setMessageListener(message -> {
try {
System.out.println("consumer2: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});

Queue queue3 = session2.createQueue("TEST");
MessageConsumer consumer3 = session2.createConsumer(queue3);
consumer3.setMessageListener(message -> {
try {
System.out.println("consumer3: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});



try {
TimeUnit.MINUTES.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

connection,session,consumer都可以创建多个,实现并行消费。一般来说需要使用多connection或者多session,因为只是多consumer的话,共用一个session,只是多个consumer轮流执行而已,不是真正的并行消费。至于connection和session的选择,一般看具体的流量需求。一个connection对应的是一个物理的tcp连接。

producer其实也类似,只是一般并行发消息意义不大,所以就不贴代码了。

浏览消息

activemq提供了一个有用的功能,brower, 作用与consumer类似,只是不会真正将消息消费掉,只是预览消息内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void brower() throws Exception{
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
TextMessage message = (TextMessage) enumeration.nextElement();
System.out.println("Browsing: " + message.getText());
}

}

原文地址: https://lcy362.github.io/posts/48216/

Activemq是一种消息中间件(MOM),基于JMS协议实现。介绍activemq就不得不介绍消息中间件和jms.

消息中间件是分布式系统十分常见的组件,提供了以比较灵活的方式集成不同应用程序的一种机制,应用程序彼此不直接通信,而是与作为中介的消息中间件进行通信。

消息中间件主要有两个作用,一是解耦,二是平峰,都是大型系统中经常会遇到的问题。

所谓解耦,就是要保持系统内各个应用的相对独立性。例如,用户登录的时候,可能有多个应用要触发各种各样的操作,推荐啊,发消息啊之类的,这些操作跟登录这个操作本身是无关的,如果全都放在登录模块去做,显然不合适,而且用户体验会非常差。这个时候就可以在登陆的时候发一条消息出来,其他系统监听这个消息,再进行各自业务的处理。

平峰,就是为了应对突然的流量增长,毕竟系统分配资源时不可能按最大需求去分配。如果短时间流量太大,可以让消息先留在消息中间件里,业务慢慢处理。

消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。

点对点模式在JMS里叫做Queue, 主要特点有三个:

  1. 每个消息只有一个消费者(Consumer)
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
  3. 接收者在成功接收消息之后需向队列发送应答(ack)信号。

发布-订阅模式则对应JMS里的Topic,与点对点模式向对应,有如下特点:

  1. 每条消息可以被多个消费这消费
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态;

JMS是Sun定义的一种标准消息传送API。JMS自身并不是一种消息传送系统;它是消息传送客户端和消息传送系统通信时所需接口和类的一个抽象。与JDBC, JNDI等类似。

JMS对消息结构、连接、会话、生产者、消费者等都做了规定。

Activemq就是对JMS1.1的一种具体实现。在后续文章里,我们会对activemq的安装使用以及各种特性进行全面的介绍。

原文地址: https://lcy362.github.io/posts/12035/

0%