storm的周边生态非常丰富,与kafka,activemq,hdfs,hbase等的交互都有现成的工具包可以使用。大部分工具,包括今天介绍的这几个,在jstorm中也可以完全正常的使用。
storm-jms
实现了与activemq等jms实现的交互。
这里主要介绍JmsSpout。由于storm中发送队列数据与普通java程序没有任何区别,专门封装一个bolt显得有些多此一举。
https://github.com/ptgoetz/storm-jms
包中自带了使用spring方式加载队列配置。
使用示例
1 | JmsProvider jmsQueueProvider = new SpringJmsProvider( |
配置文件示例
1 | <?xml version="1.0"?> |
使用说明
新建一个xml配置,
代码中调用1
2
3JmsProvider jmsQueueProvider = new SpringJmsProvider(
"jms-activemq.xml", "jmsConnectionFactory",
"TEST_QUEUE");
三个参数依次为配置文件名, connectionFactory和queue节点的名称。
自定义输出
通过实现JmsTupleProducer可以实现个性化的输出。
以JsonTupleProducer 为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class JsonTupleProducer implements JmsTupleProducer {
public Values toTuple(Message msg) throws JMSException {
if(msg instanceof TextMessage){
String json = ((TextMessage) msg).getText();
return new Values(json);
} else {
return null;
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}
}
即将整条消息作为一个字段emit出去。
declareOutputFields和new Values 即storm中的对应函数。
通过在toTuple中做相应处理,可以实现定制化的输出。
关闭ack
除了将ack数设为0外,conf.setNumAckers(0);
还需要将jms的确认模式修改为自动ack:
1 | queueSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); |
自定义jms连接器
JmsProvider类是JmsSpout中用于建立到队列的连接所用的类。
默认的SpringJmsProvider是通过spring配置来获取jms连接。
如果有特殊需求,也可以自己实现JmsProvider中的两个接口,用于获取连接ConnectionFactory和队列Destination
storm-kafka
https://github.com/apache/storm/tree/master/external/storm-kafka
从kafka中获取数据。
使用示例
1 | String topic = ""; //队列名 |
storm-hdfs
HdfsBolt支持向hdfs写入数据
使用示例
1 | SyncPolicy syncPolicy = new CountSyncPolicy(100); //每100条hdfs同步一次磁盘 |
示例说明
按照示例编码,输出到hdfs形式如下:
bolt接收到的每条数据为hdfs中的一行;
输出到hdfs路径为 设定的路径/日期/默认文件名;
hdfsbolt的每个线程会输出到一个文件,topology重启会产生新文件;
文件每天归档
其他功能
FileSizeRotationPolicy 支持按固定文件大小归档;TimedRotationPolicy支持按固定时间归档;
如有其他归档方式需求,可以实现FileRotationPolicy接口,参考DailyRotationPolicy源码。
通过实现FileNameFormat接口自定义文件路径及文件名,参考FileFolderByDateNameFormat
FileFolderByDateNameFormat
FileFolderByDateNameFormat实现目前常用的存储路径。
使用示例1
2FileFolderByDateNameFormat fileNameFormat = new FileFolderByDateNameFormat()
.withPath("/user/test").withName("");
会自动在路径之后增加日期,文件名中也会增加日期。
源码参考
除了storm-hdfs本身源码以外,可以参考https://github.com/lcy362/StormTrooper 查看FileFolderByDateNameFormat和DailyRotationPolicy的实现。