0%

在之前的一篇文章通过实际操作理解redis cluster原理中,我们简单介绍过redis cluster的设计原理。redis cluster中的数据是根据一定规则分配到16384个slot中,这些slot又根据配置对应到不同的节点上。我们知道,在集群稳定运行后,仍然可以以slot为单位转移数据,不过对于具体的转移过程,包括转移过程中集群的可用性等问题,一直不是太确定,所以这次详细了解了一下。

阅读全文 »

概述

AsyncLoopThread是jstorm里自定义的一个循环执行任务的工具,实现不复杂,本来是不值当的专门开一篇文章介绍。不过这个在jstorm里应用实在太广泛了,诸如uspervisor/nimbus心跳,获取新topology,更新worker状态等大量功能都是利用AsyncLoopThread实现的,所以还是介绍一下吧,也方便后续看其他部分代码。

阅读全文 »

介绍

数据的持久化是很多系统都会涉及到的一个问题,尤其是redis,activemq这些数据主要是存储在内存中的。既然存在内存中,就会面临宕机时数据丢失的风险。这一问题的解决方案就是通过某种方式将数据落到磁盘上,也就是所谓的持久化。

activemq提供了三种持久化方式,分别基于jdbc, kahadb和leveldb. 目前官方最推荐的是基于kahadb的持久化。

阅读全文 »

安装

官网下载最新版本,解压,其他的前期准备只需要安装jdk。 从activemq 5.14开始,只支持jdk8。

配置

核心配置文件是conf目录下的activemq.xml, 默认的配置无需修改即可使用,其他配置我们会在后续文章介绍activemq各种特性时详细介绍。

启动

bin目录下运行 activemq start 即可 ,启动之后浏览器打开localhost:8161 可以打开web管理页面。

基本api使用

这里只介绍java下的api使用,activemq默认的openwire协议是只支持java的,因为实现的JMS协议是java下的协议。如果有其他语言的访问需求,可以用stomp,amqp等协议。

依赖包

引入activemq官方的client包。

1
2
3
4
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>

actibemq收发消息的主要步骤类似,都需要先建立连接,然后建立会话,然后再新建需要的producer,consumer.

示例代码如下

发送消息

1
2
3
4
5
6
7
8
9
10
11
public void produce() throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("123");
producer.send(message);
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    public void consume() throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
MessageConsumer consumer = session.createConsumer(queue);

// Message message = consumer.receive();
// TextMessage textMessage = (TextMessage) message;
// System.out.println(textMessage.getText());

consumer.setMessageListener(message -> {
try {
System.out.println("consumer1: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});



Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session2.createQueue("TEST");
MessageConsumer consumer2 = session2.createConsumer(queue2);
consumer2.setMessageListener(message -> {
try {
System.out.println("consumer2: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});

Queue queue3 = session2.createQueue("TEST");
MessageConsumer consumer3 = session2.createConsumer(queue3);
consumer3.setMessageListener(message -> {
try {
System.out.println("consumer3: " + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});



try {
TimeUnit.MINUTES.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

connection,session,consumer都可以创建多个,实现并行消费。一般来说需要使用多connection或者多session,因为只是多consumer的话,共用一个session,只是多个consumer轮流执行而已,不是真正的并行消费。至于connection和session的选择,一般看具体的流量需求。一个connection对应的是一个物理的tcp连接。

producer其实也类似,只是一般并行发消息意义不大,所以就不贴代码了。

浏览消息

activemq提供了一个有用的功能,brower, 作用与consumer类似,只是不会真正将消息消费掉,只是预览消息内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void brower() throws Exception{
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://localhost:61616");
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST");
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
TextMessage message = (TextMessage) enumeration.nextElement();
System.out.println("Browsing: " + message.getText());
}

}

原文地址: https://lcy362.github.io/posts/48216/

Activemq是一种消息中间件(MOM),基于JMS协议实现。介绍activemq就不得不介绍消息中间件和jms.

消息中间件是分布式系统十分常见的组件,提供了以比较灵活的方式集成不同应用程序的一种机制,应用程序彼此不直接通信,而是与作为中介的消息中间件进行通信。

消息中间件主要有两个作用,一是解耦,二是平峰,都是大型系统中经常会遇到的问题。

所谓解耦,就是要保持系统内各个应用的相对独立性。例如,用户登录的时候,可能有多个应用要触发各种各样的操作,推荐啊,发消息啊之类的,这些操作跟登录这个操作本身是无关的,如果全都放在登录模块去做,显然不合适,而且用户体验会非常差。这个时候就可以在登陆的时候发一条消息出来,其他系统监听这个消息,再进行各自业务的处理。

平峰,就是为了应对突然的流量增长,毕竟系统分配资源时不可能按最大需求去分配。如果短时间流量太大,可以让消息先留在消息中间件里,业务慢慢处理。

消息中间件一般有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。

点对点模式在JMS里叫做Queue, 主要特点有三个:

  1. 每个消息只有一个消费者(Consumer)
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;
  3. 接收者在成功接收消息之后需向队列发送应答(ack)信号。

发布-订阅模式则对应JMS里的Topic,与点对点模式向对应,有如下特点:

  1. 每条消息可以被多个消费这消费
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态;

JMS是Sun定义的一种标准消息传送API。JMS自身并不是一种消息传送系统;它是消息传送客户端和消息传送系统通信时所需接口和类的一个抽象。与JDBC, JNDI等类似。

JMS对消息结构、连接、会话、生产者、消费者等都做了规定。

Activemq就是对JMS1.1的一种具体实现。在后续文章里,我们会对activemq的安装使用以及各种特性进行全面的介绍。

原文地址: https://lcy362.github.io/posts/12035/

原文地址: https://lcy362.github.io/posts/19890/

Hexo是一款开源的博客系统。对于一个后端程序员来说,不想折腾前端的东西,但是csdn,博客园之类的用起来还是不太方便,自己搭博客又麻烦,做出来还丑。偶然间看到了hexo,这个对后端程序员来说可以说是非常友好了。所以也写篇文章记录一下hexo安装,一些关键配置,以及部署到github的过程。

安装及初始化

参考官方文档 就可以了。hexo是基于node.js的,用过node的自然没有任何问题,没用过也没关系,照着说明文档做就可以了。

hexo支持直接向github的page发布,只需要配置好自己的github信息就可以。

主题

hexo有很多定制主题, 按个人喜好使用吧, 我用的是next , 这款主题功能非常多,统计、搜索之类的都是一条配置都搞定了。不过这款的一些基础配置和其他主题似乎是有些区别的,所以用了以后如果以后想换别的可能会有点困难。

内容迁移

hexo提供了多个从其他博客迁移数据的插件,rss,blogger等等都可以。

以博客园为例,博客园的博客可以导出一个rss文件,然后我们用hexo-migrator-rss就可以生成hexo格式的文件了,不过有可能需要做一些微调。

插件及第三方服务

前边说了,hexo配合next主题,很多工具用起来会非常方便。

主要参考next的文档就可以了,不过这份文档有些老了,具体的还要参考next的主题配置文件,里边的说明也比较详细。

推荐一些比较有用的:

  • 百度、google等的统计工具: 在next里是把自己的id配上就可以,就不用去加js代码了
  • gitment: 基于github issue的评论系统, 用github账号登录以后就可以发评论了。毕竟看技术博客的人github账号大家都有,这样比其他评论系统方便些。
  • hexo-generator-searchdb: 一个本地搜索工具,使用之后在博客首页加个搜索框
  • hexo-generator-feed: 生成rss文件,以支持订阅
  • hexo-abbrlink:默认的文章地址是带文章标题的,特别是中文标题真的是反人类,这个会给每篇文章生成一个id,然后用id做地址
  • hexo-generator-robotstxt, hexo-generator-sitemap: 生成sitemap,robots.txt, 帮助搜索引擎爬数据,不多说了
  • leancloud: 统计每篇文章访问量,并且在页面上显示

包括部署到github, 也有现成的配置可以用,简单配置一下就好了。

弄完之后,就是大家现在看到的样子了。

Jstorm的UI中提供了大量非常详细的监控参数,对于我们排查问题帮助非常大,关于UI,可以参考我之前的另一篇文章: https://lcy362.github.io/posts/31996/ 。 不过,UI这种方式用起来有时可能会不太方便,比如需要查历史数据的时候。所以我们希望将监控数据输出到别的存储介质中,方便后续查询、分析。

由于jstorm的监控相比于apache-storm进行了完全的重写,所以网上查到的storm的监控输出方式并不适用于jstorm. 而jstorm除了官方文档以外实在缺少资料,官方文档又太简略,给的只是一些线索性的东西,具体还要结合这些线索去翻阅源码。所以我整理了一个jstorm监控数据输出的例子。

首先需要实现MetricUploader这个接口,不过其实我们并不会实际使用这个接口里的哪个方法,主要是要去用它的TopologyMetricsRunnable这个参数,然后用这个参数去取监控信息。所以理论上只要拿到TopologyMetricsRunnable就行,并不一定非要实现MetricUploader接口。我的做法是实现MetricUploader,然后自己起一个定时的线程池,定时去取监控数据。

jstorm的metric数据存在rocksdb里,这里取的数据实质上是用jstorm封装好的接口去查询rocksdb。

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ClusterSummary clusterInfo = client.getClient().getClusterInfo();
//get list of topologies in this cluster
List<TopologySummary> topologies = clusterInfo.get_topologies();
for (TopologySummary topology : topologies) {
//get topology id and name
//the id is used for query, name for human reading
logger.info("topology info " + topology.get_id() + " " + topology.get_name());
TopologyMetric metric = metricsRunnable.getTopologyMetric(topology.get_id());
//get data of "component metrics" page in jstorm UI
MetricInfo componentMetric = metric.get_componentMetric();
Map<String, Map<Integer, MetricSnapshot>> metrics = componentMetric.get_metrics();
for (Map.Entry<String, Map<Integer, MetricSnapshot>> oneMetric : metrics.entrySet()) {
String[] key = oneMetric.getKey().split("@");
String metricKey = key[1] + "@" + key[2] + "@" + key[6];
//get(60) to get data in 1 min, also can get(600) for 10min, and so on
logger.info("metric one minute data for " + metricKey + " " + oneMetric.getValue().get(60));
}
}

整个流程比较清晰,首先需要去查询集群中topology的列表,然后使用每一个topology id去查询metric信息,得到一个TopologyMetric类, TopologyMetric里包含topologyMetric,componentMetric,workerMetric等属性,这个分别与UI页面里对应。

以componentMetric为例, 可以使用componentMetric.get_metrics(); 拿到具体的监控metric数据, 一个metric是一个Map<String, Map<Integer, MetricSnapshot>>, 其中key是一个@符分隔的字符串,里边包含topology名,component名,数据项等关键的key信息,value里这个map的key是一个时间,单位为秒,对应UI上1分钟,2分钟那几页,value就是具体的监控数据,这个数据其实比UI展示出来的更丰富,除了均值外,还有诸如95线,99线等。

在这个例子里,我只是用打日志的方式,将部分数据输出。具体用的时候,可以根据需求使用hbase, redis,mysql等存储介质。

具体代码可以查看 https://github.com/lcy362/StormTrooper/blob/master/src/main/java/com/trooper/storm/monitor/MetricUploaderTest.java

apache-camel 作为数据路由的利器,使用起来非常方便。不过与此同时,也有一个问题,就是由于封装的过于完善,隐藏了很多技术细节,所以一旦有问题,排查会比较困难。好在官方提供了一个debug工具,可以帮助我们正常的打断点、调试,http://camel.apache.org/debugger.html, 在本文中会对官方文档做一些补充。

首先需要引入camel-test包:

1
2
3
4
5
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<version>2.16.2</version>
</dependency>

之后新建一个类并实现CamelTestSupport 。

1
public class CamelDebugger extends CamelTestSupport {

CamelTestSupport 中有大量的方法,可以根据需要选择一些进行实现,介绍一下其中一些比较重要的。

1. createCamelContext() 这个方法可以定义自己的camelContext进行测试.

2. createRouteBuilder() 这个方法则是使用默认的camelContext,但是加入自己的route

3. debugBefore 和 debugAfter, 这两个方法分别在一条消息被处理前后被执行, 参数里包括exchange, processor等必要信息。真正debug时,也就是在这两个方法里写日志或者打断点。

具体例子代码可以查看: https://github.com/lcy362/CamelDemo/blob/7aef2cc7661236499896022f6976c160b73b68e7/src/main/java/com/mallow/demo/camel/debugger/CamelDebugger.java

问题

用过storm或者jstorm的都知道,如果在bolt代码中发生了没被catch住的异常,所在worker进程会退出。本文就从源码角度分析一下具体设计,其实并不是“有异常然后进程崩了”这么简单。

阅读全文 »

除了通过properties,xml等格式的配置文件对log4j进行配置外,log4j还提供了各种接口,可以用代码动态修改log4j的配置,例如给一个logger增加一个appender。方法很简单,就是新建一个appder,然后添加到logger上,示例代码如下:

1
2
3
4
5
6
7
8
9
KafkaLog4jAppender kafkaAppender = new KafkaLog4jAppender();
kafkaAppender.setBrokerList(broker);
kafkaAppender.setTopic(topic);
kafkaAppender.setCompressionType("gzip");
kafkaAppender.setSyncSend(false);
kafkaAppender.setLayout(new PatternLayout(layout));
kafkaAppender.activateOptions();
logger.addAppender(kafkaAppender);
logger.setLevel(Level.INFO);

这里以一个kafkaappender做例子,其他的,例如DailyRollingFileAppender等,都是类似的。