Mobility

聚沙成塔

对于一个java object的序列化,想测一下使用json和使用一般序列化工具,在时间性能、空间性能上的区别。

json选择用fastjson.

序列化工具使用了protostuff和kyro. 为什么不用protobuf呢?因为感觉对于一个已有的上百个属性的java class来说,再去新建一个匹配的proto文件有点反人类。protostuff是protobuf的改良版本,可以直接将一个java object进行序列化,使用方法与kyro有点类似,没有protobuf那么多中间过程。其他的,hession, java自带序列化之类的,据说性能比kryo和protobuf差很多,就不测了。

简单测了一下,发现差距还挺明显的,所以感觉也不需要做具体的评测了。把日志截一段发出来,大家感受下。

fastjson serilise cost <span class="hljs-number">555805</span>  <span class="hljs-built_in">length</span>: <span class="hljs-number">1740</span>
kyro serilise cost <span class="hljs-number">227375</span>   length502
protostuff serilise cost <span class="hljs-number">78950</span>   length633
fastjson deserilise cost <span class="hljs-number">130662</span>
kyro deserilise cost <span class="hljs-number">201716</span>
protostuff deserilise cost <span class="hljs-number">230533</span>
fastjson serilise cost <span class="hljs-number">727915</span>  <span class="hljs-built_in">length</span>: <span class="hljs-number">1740</span>
kyro serilise cost <span class="hljs-number">378958</span>   length502
protostuff serilise cost <span class="hljs-number">94739</span>   length633
fastjson deserilise cost <span class="hljs-number">154346</span>
kyro deserilise cost <span class="hljs-number">373432</span>
protostuff deserilise cost <span class="hljs-number">219085</span>
fastjson serilise cost <span class="hljs-number">804892</span>  <span class="hljs-built_in">length</span>: <span class="hljs-number">1740</span>
kyro serilise cost <span class="hljs-number">392380</span>   length502
protostuff serilise cost <span class="hljs-number">220664</span>   length633
fastjson deserilise cost <span class="hljs-number">243560</span>
kyro deserilise cost <span class="hljs-number">360010</span>
protostuff deserilise cost <span class="hljs-number">132241</span>
fastjson serilise cost <span class="hljs-number">601991</span>  <span class="hljs-built_in">length</span>: <span class="hljs-number">1740</span>
kyro serilise cost <span class="hljs-number">244349</span>   length502
protostuff serilise cost <span class="hljs-number">80924</span>   length633
fastjson deserilise cost <span class="hljs-number">241191</span>
kyro deserilise cost <span class="hljs-number">230928</span>
protostuff deserilise cost <span class="hljs-number">127109</span>

cost的时间用的是System.nanoTime(); 三种用的都是不加任何配置的默认配置。

序列化之后的占用空间,kryo略低于protostuff, 两者都远高于json. 这是很好理解的,毕竟json串是可读的,不要强求太多。

而序列化和反序列化的耗时,都是protostuff优于kyro优于fastjson, 而且差别挺明显。

所以结论呢,如果对空间没有极其苛刻的要求,protostuff也许是最佳选择。protostuff相比于kyro还有一个额外的好处,就是如果序列化之后,反序列化之前这段时间内,java class增加了字段(这在实际业务中是无法避免的事情),kyro就废了。但是protostuff只要保证新字段添加在类的最后,而且用的是sun系列的JDK, 是可以正常使用的。因此,如果序列化是用在缓存等场景下,序列化对象需要存储很久,也就只能选择protostuff了。

当然,如果有可读性之类的需求,就只能用json了。

 

这篇文章只讲使用,不讲原理,简单粗暴。

分布式锁,顾名思义,就是分布式的锁,应用于一些分布式系统中。例如,有一个服务部在数太机器上,然后有可能操作数据库中的同一条记录。这时,就需要分布式锁。

分布式锁实现的方式很多,一般来说需要一个实体来代表一个锁,占用锁时就新建这个实体,锁释放时也对应将相应实体删除。同时,一般还需要一个锁超时过期的策略,避免一些异常情况造成锁无法被释放。

zookeeper和redis都是常用的实现分布式锁的方式。接下来就简单介绍一下这两种方式的使用。

基于zookeeper的分布式锁

使用zookeeper的话,建议直接使用curator客户端.

        <span class="hljs-tag">&lt;<span class="hljs-title">dependency</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-title">groupId</span>&gt;</span>org.apache.curator<span class="hljs-tag">&lt;/<span class="hljs-title">groupId</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-title">artifactId</span>&gt;</span>curator-recipes<span class="hljs-tag">&lt;/<span class="hljs-title">artifactId</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-title">version</span>&gt;</span>2.9.1<span class="hljs-tag">&lt;/<span class="hljs-title">version</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-title">dependency</span>&gt;</span>`</pre>

curator中实现了一个InterProcessSemaphoreMutex类,用作分布式锁。

实现原理其实也很直白:建立锁的时候约定一个路径新建一个节点,作为锁的实体;锁释放时就将这个节点删除。

代码例子片段:

<pre class="prettyprint">`InterProcessSemaphoreMutex lock;
lock.acquire(&hellip;);  <span class="hljs-comment">//acquire获取锁</span>
&hellip;.
lock.release();释放锁

详细的使用例子代码可以参考 https://github.com/lcy362/Scenes/blob/master/src/main/java/com/mallow/concurrent/zklock/InterProcessMutexExample.java

基于redis的分布式锁

同样推荐一个第三方的redis客户端redisson, https://github.com/redisson/redisson. redisson的知名度不如curator高,但也是一个非常优秀的开源工具,支持各种集群、数据结构。

redis锁的原理就是占用锁时新建一个key, 锁释放时key删除。

代码示例可以参考https://github.com/lcy362/Scenes/blob/master/src/main/java/com/mallow/concurrent/redislock/ValuelockExample.java

题目

Given a string, find the length of the longest substring without repeating characters.

Examples:

Given “abcabcbb”, the answer is “abc”, which the length is 3.

Given “bbbbb”, the answer is “b”, with the length of 1.

Given “pwwkew”, the answer is “wke”, with the length of 3. Note that the answer must be a substring, “pwke” is a subsequence and not a substring.

也就是说给定一个字符串,输出不包含重复字母的最长子串长度。

思路

遍历一次字符串,O(n)复杂度下可以解决。主要思路就是在遍历的过程中

1. 记录每个字母上一次出现的位置

2. 维持一个从当前位置往前数不包含重复字母的子串,记录这个字串的起止位置start, end

遍历的过程中就是根据相应位置字母是否出现过,以及上次出现的位置,不断更新start, end的过程。

代码

可以到github上查看: https://github.com/lcy362/Algorithms/tree/master/src/main/java/com/mallow/algorithm

<span class="hljs-keyword">import</span> java.util.HashMap;

<span class="hljs-javadoc">/**
 * leetcode 3
 * https://leetcode.com/problems/longest-substring-without-repeating-characters/
 * Created by lcy on 2017/2/15.
 */</span>
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LongestSubstringNotRepeat</span> {</span>
    <span class="hljs-keyword">public</span> <span class="hljs-keyword">int</span> <span class="hljs-title">lengthOfLongestSubstring</span>(String s) {
        <span class="hljs-keyword">if</span> (s.length() &lt;= <span class="hljs-number">1</span>) {
            <span class="hljs-keyword">return</span> s.length();
        }
        HashMap&lt;Character, Integer&gt; charPos = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
        <span class="hljs-keyword">char</span>[] chars = s.toCharArray();
        <span class="hljs-keyword">int</span> len = <span class="hljs-number">0</span>;
        <span class="hljs-keyword">int</span> max = <span class="hljs-number">0</span>;
        <span class="hljs-keyword">int</span> start = <span class="hljs-number">0</span>;
        <span class="hljs-keyword">int</span> end = <span class="hljs-number">0</span>;
        <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i &lt; chars.length; i++) {
            <span class="hljs-keyword">if</span> (charPos.containsKey(chars[i])) {
                <span class="hljs-keyword">int</span> tempstart = charPos.get(chars[i]) + <span class="hljs-number">1</span>;
                <span class="hljs-keyword">if</span> (tempstart &gt; start) {
                    start = tempstart;
                }
                end++;
                len = end - start;
            } <span class="hljs-keyword">else</span> {
                len++;
                end++;
            }
            charPos.put(chars[i], i);
            <span class="hljs-keyword">if</span> (len &gt; max) {
                max = len;
            }
        }
        <span class="hljs-keyword">return</span> max;
    }

    <span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">main</span>(String args[]) {
        LongestSubstringNotRepeat l = <span class="hljs-keyword">new</span> LongestSubstringNotRepeat();
        System.out.println(l.lengthOfLongestSubstring(<span class="hljs-string">"abcabcbb"</span>));
        System.out.println(l.lengthOfLongestSubstring(<span class="hljs-string">"bbbbb"</span>));
        System.out.println(l.lengthOfLongestSubstring(<span class="hljs-string">"pwwkew"</span>));
        System.out.println(l.lengthOfLongestSubstring(<span class="hljs-string">"abba"</span>));
    }

主要还是通过一个例子加深一下对java多线程里wait,notify的理解,因此写了一个例子,三个线程分别输出A,B,C三个字母,控制这三个线程的执行顺序,从而实现ABCABCABC..这样的输出。

这个问题主要还是需要设计一下锁的策略,这里只是提供了一种方式:

每个线程占用两把锁,分别代表自己(self)和前一个线程(prev), 三个线程的持有锁情况如下表所示:

线程号prev锁self锁
A c a
B a b
C b c

A 首先启动,持有ac, 运行后先释放a, b可以执行。

线程run方法代码如下:

    <span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">run</span>() {
        <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
            <span class="hljs-keyword">synchronized</span> (prev) {
                <span class="hljs-keyword">synchronized</span> (self) {
                    System.out.print(name);
                    self.notify();
                }
                <span class="hljs-keyword">try</span> {
                    <span class="hljs-comment">//如果想控制输出速度, 可以将sleep加在此处</span>
                    <span class="hljs-comment">//如果加在sout之后,会导致c线程启动并占有b锁之后,a线程才会释放a锁,输出顺序会变成acbacb</span>
                    <span class="hljs-comment">//也可以加大三个线程启动的间隔时间解决这一问题</span>
                    <span class="hljs-keyword">try</span> {
                        Thread.sleep(<span class="hljs-number">1000</span>);
                    } <span class="hljs-keyword">catch</span> (InterruptedException e) {
                        e.printStackTrace();
                    }
                    prev.wait();
                } <span class="hljs-keyword">catch</span> (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

需要注意的是,如果想控制输出速度,需要考虑一下sleep的位置和时间,避免在A线程执行完并释放a锁之前,C线程已经启动并持有了B锁,导致B线程无法正常启动。

问题过程

我司需要接收很多外部数据,数据源的形式很多,ibmmq, activemq, redis pubsub, 等等都有。为了将这些数据接到内部amq/kafka,之前运行了一大批进程,管理起来十分复杂,因此最近用apache-camel对这些进程作了整合。

上线几个小时之后,kafka磁盘空间开始报警。初步断定是这次上线导致的。

排查流程

主要还是对kafka不熟悉,只是能用而已,因此排查过程走了不少弯路。

由于camel本身文档很不完善,一开始配置参数时也主要靠看源码+猜,所以首先怀疑配置的压缩参数compressionCodec=gzip是否无效。

因此进行了一次简单的测试(别问我为什么一开始不测)。建了一个单分区的测试topic, 然后分别使用gzip和none模式发送相同的数据,观察kafka的log file文件大小变化趋势。发现压缩确实是有效的。

因此又跟直接使用kafka api作了对比,发现差别非常大,即使是压缩之后,使用camel-kafka占用的空间也比使用api大数倍。然后去查了两边源码,发现最底层的代码是完全一致的,所以还是怀疑某些参数配的不对。

之后又注意到一个细节,使用camel时,log的大小跟消息数呈线性关系,比如一条占1字节,10条就占10字节。但使用api的话,1条也占1字节,连续发10条可能才占两字节。

这时候就怀疑kafka是不是有批量发送之类的机制,然后咨询了负责kafka的同事,果然是这个原因,而造成占用空间差别大的原因就是是否同步发的区别,同步发的话就不存在批量发送了。批量发送的话,这一批消息会被压缩在一起,而单条发时,就是每一条分别压缩。我们知道,在文件非常小的时候,使用gzip压缩的效果是很差的,甚至可能压完比源文件还大。然后又做了些测试,确定了是这个问题。这个参数被同事封装在了kafka的client接口里,因此导致我照着之前代码改参数时漏掉了这一个。

一些感悟

其实回想起来,这个问题挺low的,如果对kafka多些了解,是不会有这种问题的。

首先,对kafka没做过全面的了解,只是学会了怎么用,大概了解了一下它是什么。而很多基本的机制都没有概念。使用一个开源工具时,对它做一次全面的了解还是很重要的,虽然每个工具都深入研究底层代码不现实,但是系统性的了解一遍这些工具有什么机制,确实花不了多少时间。

再一个,公司内一般会封装一些访问各种组件的工具包,以提升效率,这些工具包最好也了解一下怎么实现的,否则可能不经意间就掉坑里了。

任重而道远啊..

Integer类实质上也是一个普通的java类,即使值相同,也是不同的对象。

例如

        Integer a = <span class="hljs-number">148</span>;
        Integer b = <span class="hljs-number">148</span>;
        System.out.println(a==b);`</pre>

这时输出为false. 很容易理解。

但是如果把值换成128以下的数,比如48.

<pre class="prettyprint">`        Integer a = <span class="hljs-number">48</span>;
        Integer b = <span class="hljs-number">48</span>;
        System.out.println(a==b);`</pre>

这时就会发现输出变成了true。原因是jdk对128以下的整数作了缓存,当声明两个值为48的Integer对象时,其实是指向同一位置。

当然也可以强制声明一个新的Integer对象。

<pre class="prettyprint">`        Integer a = <span class="hljs-number">48</span>;
        Integer b = <span class="hljs-keyword">new</span> Integer(<span class="hljs-number">48</span>);
        System.out.println(a==b); 

这时输出就变成false了

storm的周边生态非常丰富,与kafka,activemq,hdfs,hbase等的交互都有现成的工具包可以使用。大部分工具,包括今天介绍的这几个,在jstorm中也可以完全正常的使用。

storm-jms

实现了与activemq等jms实现的交互。

这里主要介绍JmsSpout。由于storm中发送队列数据与普通java程序没有任何区别,专门封装一个bolt显得有些多此一举。

https://github.com/ptgoetz/storm-jms

包中自带了使用spring方式加载队列配置。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
JmsProvider jmsQueueProvider = new SpringJmsProvider(
"jms-activemq.xml", "jmsConnectionFactory",
"TEST_QUEUE");
JmsTupleProducer producer = new JsonTupleProducer();

// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true);
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("jms", queueSpout, 30);

配置文件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<amq:queue id="detrUpdateQueue" physicalName="TEST_QUEUE" />

<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="failover:(tcp://xxx:61616)?jms.prefetchPolicy.queuePrefetch=10" />

</beans>

使用说明

新建一个xml配置,amq:queue 节点配置队列名。

amq:connectionFactory节点配置activemq的连接url。

代码中调用

1
2
3
JmsProvider jmsQueueProvider = new SpringJmsProvider(
"jms-activemq.xml", "jmsConnectionFactory",
"TEST_QUEUE");

三个参数依次为配置文件名, connectionFactory和queue节点的名称。

自定义输出

通过实现JmsTupleProducer可以实现个性化的输出。

以JsonTupleProducer 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class JsonTupleProducer implements JmsTupleProducer {

@Override
public Values toTuple(Message msg) throws JMSException {
if(msg instanceof TextMessage){
String json = ((TextMessage) msg).getText();
return new Values(json);
} else {
return null;
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}

}

即将整条消息作为一个字段emit出去。

declareOutputFields和new Values 即storm中的对应函数。

通过在toTuple中做相应处理,可以实现定制化的输出。

关闭ack

除了将ack数设为0外,conf.setNumAckers(0);

还需要将jms的确认模式修改为自动ack:

1
queueSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

自定义jms连接器

JmsProvider类是JmsSpout中用于建立到队列的连接所用的类。

默认的SpringJmsProvider是通过spring配置来获取jms连接。

如果有特殊需求,也可以自己实现JmsProvider中的两个接口,用于获取连接ConnectionFactory和队列Destination

storm-kafka

https://github.com/apache/storm/tree/master/external/storm-kafka

从kafka中获取数据。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
String topic = ""; //队列名
String zkRoot = "/kafkastorm"; //kafka在zookeeper中记录offset的根目录,无需改变
String id = ""; // consumer id
String kafkaZk = ""; //kafka的zookeeper地址
BrokerHosts brokerHosts = new ZkHosts(kafkaZk);
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, topic, zkRoot, id);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.ignoreZkOffsets = true; //false时使用kafka中存储的offset信息
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); //读取数据开始位置,只有ignoreZkOffsets设为true时startOffsetTime才能生效
kafkaConfig.zkServers = new ArrayList<String>(){ {
add("1"); //存储kafkaoffset的zookeeper节点
} };
kafkaConfig.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(topic, new KafkaSpout(kafkaConfig), 10); //并行度不能高于kafka分区数
builder.setBolt....

storm-hdfs

HdfsBolt支持向hdfs写入数据

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SyncPolicy syncPolicy = new CountSyncPolicy(100); //每100条hdfs同步一次磁盘
DailyRotationPolicy rotationPolicy = new DailyRotationPolicy(); //文件每天归档

FileFolderByDateNameFormat fileNameFormat = new FileFolderByDateNameFormat()
.withPath("/user/lcy/jdata"); //hdfs路径
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(","); //不同field之间的分隔符

UmeHdfsBolt hdfsbolt = new UmeHdfsBolt()
.withFsUrl("hdfs://:8020") //hdfs url
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

示例说明

按照示例编码,输出到hdfs形式如下:

bolt接收到的每条数据为hdfs中的一行;

输出到hdfs路径为 设定的路径/日期/默认文件名;

hdfsbolt的每个线程会输出到一个文件,topology重启会产生新文件;

文件每天归档

其他功能

FileSizeRotationPolicy 支持按固定文件大小归档;TimedRotationPolicy支持按固定时间归档;

如有其他归档方式需求,可以实现FileRotationPolicy接口,参考DailyRotationPolicy源码。

通过实现FileNameFormat接口自定义文件路径及文件名,参考FileFolderByDateNameFormat

FileFolderByDateNameFormat

FileFolderByDateNameFormat实现目前常用的存储路径。
使用示例

1
2
FileFolderByDateNameFormat fileNameFormat = new FileFolderByDateNameFormat()
.withPath("/user/test").withName("");

会自动在路径之后增加日期,文件名中也会增加日期。

源码参考

除了storm-hdfs本身源码以外,可以参考https://github.com/lcy362/StormTrooper 查看FileFolderByDateNameFormat和DailyRotationPolicy的实现。

UI说明

jstorm的UI相对于storm提供了更为丰富的监控项。UI本身是在tomcat中运行的一个war包,进行二次开发也相对容易。

cluster页

Cluster Summary, Cluster Stats, Topology Summary

cluster的整体信息, conf中是nimbus节点的配置。

Topology Summary

当前运行的所有topology列表及概要信息,conf中对应的是topology单独配置的配置项。

Supervisor Summary

所有supervisor列表, 可以查看supervisor的配置、日志等。

topology页

Topology Summary

主要关注Topology Graph 项,这是Component Metrics 页的概要信息,可以较直观的看出运行状况。

其中,节点大小和箭头粗细代表数据量,节点颜色用于区分spout和bolt。箭头颜色对应TupleLifeCycle属性,由绿到红越来越慢。箭头变红色时,可以适当增加下游节点的并行度。

Component Metrics

包含每个spout, bolt的统计信息。

各参数说明:

tasks : 并行度。对应代码中为每个spout/bolt设置的并行度。

emitted, acked, failed : 发射/收到ack/未收到ack的消息数。

sendTps, recvTps: 发送/接收数据tps.

需要注意,以上数据交互类的指标包含了与storm本身组件,topology_master,acker之间的交互,只能用来查看运行状况,不适合统计用

process: 对于bolt来说,是excute()函数的执行时间,对于spout来说,是整条消息(包含下游各步)被完整处理的时间,体现系统运行效率的主要参数。

TupleLifeCycle: 从上游消息被emit,到消息处理完所需时间。和process相比相差太大时,说明在bolt队列中等待了较长时间,可以适当增加并发。

deser, ser: 序列化/反序列化耗时,一般无需关注. storm中每一个emit出来的数据都需要做序列化和反序列化。

error: 鼠标悬浮在E上,可以查看具体的报错信息。主要寻找带异常堆栈的数据, 一些诸如is dead, no response之类的信息无需关注。

运行机制

topology里spout/bolt的整体结构不再细讲,主要说说storm/jstorm topology运行时与传统java程序可能存在的区别。其实区别非常少,主要也体现在初始化上,本文的目的在于帮助开发人员在无需了解storm内核原理的情况下,排查topology程序可能出现的问题。

1个topology会包含多个spout线程和bolt线程,分散运行在数个worker(进程)中。同一个worker中可能同时运行多个bolt/spout的数个线程。

与普通java程序的区别

main方法

main方法只在启动时运行在nimbus中,因此除了storm本身的配置项外,其他程序相关的配置,如spring配置等,配置在main方法中不会起作用

bolt

bolt 的主体结构包含prepare, excute, cleanup 三部分。

其中,prepare在初始化时执行一次,cleanup在退出前执行一次,excute每条消息执行。

一些配置,包括加密,spring加载等,建议都放到prepare中。多个bolt都需要加载spring时,建议使用同样的配置,避免一些诡异问题。

序列化

所有静态代码块中作了初始化的变量,emit的变量,由于都存在网络传输,需要能够被序列化。

storm默认使用kyro序列化,需要类有无参构造函数。如果无法增加无参构造函数,设置topology.fall.back.on.java.serialization: true使用java自带的序列化。

activemq的web console是基于jetty实现,其权限管理也是基于jetty. 根据需求,可以给不同的用户赋予不同的权限。jetty的权限管理还算灵活,虽然配起来比较麻烦,可以分别设定某个角色(role)下的用户是否有对某个页面的访问权限。
下面简要介绍一下配置方法,只需要修改/conf 下的 jetty.xml, jetty-realm.properties
1. jetty-realm.properties
  这里面配置了所有用户的用户名,密码和所属角色,按照如下格式:
username: password [,rolename ...]   
2\. jetty.xml
首先对每个角色配置一个Constraint 类,其中roles及对应 jetty-realm.properties中的rolename
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />

        <property name=”roles” value=”admin” />

        <property name=”authenticate” value=”true” />

    </bean>
然后配置securityConstraintMapping, 
    <bean id="securityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
        <property name="constraint" ref="securityConstraint" />
        <property name="pathSpec" value="/admin/send.jsp/" />
    </bean>
这表示securityConstraint类对应的角色可以访问/admin/send.jsp 页面。
可以使用/* 代表所有未单独配置的页面
假设我们需要新建一个只读用户,就可以配置两个角色admin和readonly,这两个角色都需要增加/*的ConstraintMapping 条目,然后在admin上额外配置所有涉及写操作的页面,包括/admin/deleteDestination.action/*, /admin/purgeDestination.action/* 等。
最后,在ConstraintSecurityHandler的constraintMappings属性里,把所有的ConstraintMapping都列出来。
<bean id="securityHandler" class="org.eclipse.jetty.security.ConstraintSecurityHandler">
    <property name="constraintMappings">

            <list>

                <ref bean="securityConstraintMapping" />
            </list>
        </property>
 
这样,就实现了activemq web console用户的权限配置。

 

0%