0%

storm的周边生态非常丰富,与kafka,activemq,hdfs,hbase等的交互都有现成的工具包可以使用。大部分工具,包括今天介绍的这几个,在jstorm中也可以完全正常的使用。

storm-jms

实现了与activemq等jms实现的交互。

这里主要介绍JmsSpout。由于storm中发送队列数据与普通java程序没有任何区别,专门封装一个bolt显得有些多此一举。

https://github.com/ptgoetz/storm-jms

包中自带了使用spring方式加载队列配置。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
JmsProvider jmsQueueProvider = new SpringJmsProvider(
"jms-activemq.xml", "jmsConnectionFactory",
"TEST_QUEUE");
JmsTupleProducer producer = new JsonTupleProducer();

// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsQueueProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true);
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("jms", queueSpout, 30);

配置文件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<amq:queue id="detrUpdateQueue" physicalName="TEST_QUEUE" />

<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="failover:(tcp://xxx:61616)?jms.prefetchPolicy.queuePrefetch=10" />

</beans>

使用说明

新建一个xml配置,amq:queue 节点配置队列名。

amq:connectionFactory节点配置activemq的连接url。

代码中调用

1
2
3
JmsProvider jmsQueueProvider = new SpringJmsProvider(
"jms-activemq.xml", "jmsConnectionFactory",
"TEST_QUEUE");

三个参数依次为配置文件名, connectionFactory和queue节点的名称。

自定义输出

通过实现JmsTupleProducer可以实现个性化的输出。

以JsonTupleProducer 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class JsonTupleProducer implements JmsTupleProducer {

@Override
public Values toTuple(Message msg) throws JMSException {
if(msg instanceof TextMessage){
String json = ((TextMessage) msg).getText();
return new Values(json);
} else {
return null;
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}

}

即将整条消息作为一个字段emit出去。

declareOutputFields和new Values 即storm中的对应函数。

通过在toTuple中做相应处理,可以实现定制化的输出。

关闭ack

除了将ack数设为0外,conf.setNumAckers(0);

还需要将jms的确认模式修改为自动ack:

1
queueSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

自定义jms连接器

JmsProvider类是JmsSpout中用于建立到队列的连接所用的类。

默认的SpringJmsProvider是通过spring配置来获取jms连接。

如果有特殊需求,也可以自己实现JmsProvider中的两个接口,用于获取连接ConnectionFactory和队列Destination

storm-kafka

https://github.com/apache/storm/tree/master/external/storm-kafka

从kafka中获取数据。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
String topic = ""; //队列名
String zkRoot = "/kafkastorm"; //kafka在zookeeper中记录offset的根目录,无需改变
String id = ""; // consumer id
String kafkaZk = ""; //kafka的zookeeper地址
BrokerHosts brokerHosts = new ZkHosts(kafkaZk);
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, topic, zkRoot, id);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.ignoreZkOffsets = true; //false时使用kafka中存储的offset信息
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); //读取数据开始位置,只有ignoreZkOffsets设为true时startOffsetTime才能生效
kafkaConfig.zkServers = new ArrayList<String>(){ {
add("1"); //存储kafkaoffset的zookeeper节点
} };
kafkaConfig.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(topic, new KafkaSpout(kafkaConfig), 10); //并行度不能高于kafka分区数
builder.setBolt....

storm-hdfs

HdfsBolt支持向hdfs写入数据

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SyncPolicy syncPolicy = new CountSyncPolicy(100); //每100条hdfs同步一次磁盘
DailyRotationPolicy rotationPolicy = new DailyRotationPolicy(); //文件每天归档

FileFolderByDateNameFormat fileNameFormat = new FileFolderByDateNameFormat()
.withPath("/user/lcy/jdata"); //hdfs路径
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter(","); //不同field之间的分隔符

UmeHdfsBolt hdfsbolt = new UmeHdfsBolt()
.withFsUrl("hdfs://:8020") //hdfs url
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);

示例说明

按照示例编码,输出到hdfs形式如下:

bolt接收到的每条数据为hdfs中的一行;

输出到hdfs路径为 设定的路径/日期/默认文件名;

hdfsbolt的每个线程会输出到一个文件,topology重启会产生新文件;

文件每天归档

其他功能

FileSizeRotationPolicy 支持按固定文件大小归档;TimedRotationPolicy支持按固定时间归档;

如有其他归档方式需求,可以实现FileRotationPolicy接口,参考DailyRotationPolicy源码。

通过实现FileNameFormat接口自定义文件路径及文件名,参考FileFolderByDateNameFormat

FileFolderByDateNameFormat

FileFolderByDateNameFormat实现目前常用的存储路径。
使用示例

1
2
FileFolderByDateNameFormat fileNameFormat = new FileFolderByDateNameFormat()
.withPath("/user/test").withName("");

会自动在路径之后增加日期,文件名中也会增加日期。

源码参考

除了storm-hdfs本身源码以外,可以参考https://github.com/lcy362/StormTrooper 查看FileFolderByDateNameFormat和DailyRotationPolicy的实现。

UI说明

jstorm的UI相对于storm提供了更为丰富的监控项。UI本身是在tomcat中运行的一个war包,进行二次开发也相对容易。

cluster页

Cluster Summary, Cluster Stats, Topology Summary

cluster的整体信息, conf中是nimbus节点的配置。

Topology Summary

当前运行的所有topology列表及概要信息,conf中对应的是topology单独配置的配置项。

Supervisor Summary

所有supervisor列表, 可以查看supervisor的配置、日志等。

topology页

Topology Summary

主要关注Topology Graph 项,这是Component Metrics 页的概要信息,可以较直观的看出运行状况。

其中,节点大小和箭头粗细代表数据量,节点颜色用于区分spout和bolt。箭头颜色对应TupleLifeCycle属性,由绿到红越来越慢。箭头变红色时,可以适当增加下游节点的并行度。

Component Metrics

包含每个spout, bolt的统计信息。

各参数说明:

tasks : 并行度。对应代码中为每个spout/bolt设置的并行度。

emitted, acked, failed : 发射/收到ack/未收到ack的消息数。

sendTps, recvTps: 发送/接收数据tps.

需要注意,以上数据交互类的指标包含了与storm本身组件,topology_master,acker之间的交互,只能用来查看运行状况,不适合统计用

process: 对于bolt来说,是excute()函数的执行时间,对于spout来说,是整条消息(包含下游各步)被完整处理的时间,体现系统运行效率的主要参数。

TupleLifeCycle: 从上游消息被emit,到消息处理完所需时间。和process相比相差太大时,说明在bolt队列中等待了较长时间,可以适当增加并发。

deser, ser: 序列化/反序列化耗时,一般无需关注. storm中每一个emit出来的数据都需要做序列化和反序列化。

error: 鼠标悬浮在E上,可以查看具体的报错信息。主要寻找带异常堆栈的数据, 一些诸如is dead, no response之类的信息无需关注。

运行机制

topology里spout/bolt的整体结构不再细讲,主要说说storm/jstorm topology运行时与传统java程序可能存在的区别。其实区别非常少,主要也体现在初始化上,本文的目的在于帮助开发人员在无需了解storm内核原理的情况下,排查topology程序可能出现的问题。

1个topology会包含多个spout线程和bolt线程,分散运行在数个worker(进程)中。同一个worker中可能同时运行多个bolt/spout的数个线程。

与普通java程序的区别

main方法

main方法只在启动时运行在nimbus中,因此除了storm本身的配置项外,其他程序相关的配置,如spring配置等,配置在main方法中不会起作用

bolt

bolt 的主体结构包含prepare, excute, cleanup 三部分。

其中,prepare在初始化时执行一次,cleanup在退出前执行一次,excute每条消息执行。

一些配置,包括加密,spring加载等,建议都放到prepare中。多个bolt都需要加载spring时,建议使用同样的配置,避免一些诡异问题。

序列化

所有静态代码块中作了初始化的变量,emit的变量,由于都存在网络传输,需要能够被序列化。

storm默认使用kyro序列化,需要类有无参构造函数。如果无法增加无参构造函数,设置topology.fall.back.on.java.serialization: true使用java自带的序列化。

activemq的web console是基于jetty实现,其权限管理也是基于jetty. 根据需求,可以给不同的用户赋予不同的权限。jetty的权限管理还算灵活,虽然配起来比较麻烦,可以分别设定某个角色(role)下的用户是否有对某个页面的访问权限。
下面简要介绍一下配置方法,只需要修改/conf 下的 jetty.xml, jetty-realm.properties
1. jetty-realm.properties
  这里面配置了所有用户的用户名,密码和所属角色,按照如下格式:
username: password [,rolename ...]   
2\. jetty.xml
首先对每个角色配置一个Constraint 类,其中roles及对应 jetty-realm.properties中的rolename
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />

        <property name=”roles” value=”admin” />

        <property name=”authenticate” value=”true” />

    </bean>
然后配置securityConstraintMapping, 
    <bean id="securityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
        <property name="constraint" ref="securityConstraint" />
        <property name="pathSpec" value="/admin/send.jsp/" />
    </bean>
这表示securityConstraint类对应的角色可以访问/admin/send.jsp 页面。
可以使用/* 代表所有未单独配置的页面
假设我们需要新建一个只读用户,就可以配置两个角色admin和readonly,这两个角色都需要增加/*的ConstraintMapping 条目,然后在admin上额外配置所有涉及写操作的页面,包括/admin/deleteDestination.action/*, /admin/purgeDestination.action/* 等。
最后,在ConstraintSecurityHandler的constraintMappings属性里,把所有的ConstraintMapping都列出来。
<bean id="securityHandler" class="org.eclipse.jetty.security.ConstraintSecurityHandler">
    <property name="constraintMappings">

            <list>

                <ref bean="securityConstraintMapping" />
            </list>
        </property>
 
这样,就实现了activemq web console用户的权限配置。

 

Storm UI对于排查storm使用过程中遇到的问题会很有帮助,但是有些属性的含义不是很明确,虽然都是很简单的概念,如果不知道的话也会很难受。

先说一点,鼠标只到UI上的标题栏时,是可以看到这一属性的具体属性的,几篇google rank很高的文章,其实就是把这个信息整理了下来。

其实大部分属性都是很直白的,看到名字就知道是什么意思,我在这儿之把一些可能造成困扰的属性列一下,方便大家查问题。

 

emitted和transfered: emitted,就是发射出的数据条数,也就是调用OutputCollector的emit方法的次数。transferred则是实际tuple发送到下一个task的数目。乍一看是一样的对吗。其实一般情况下也确实是一样的。但是,比如,一个bolt 发射了数据,但是下游并没有其他bolt取这个数,这个bolt的transfer数就会是0\. 又比如,如果一个bolt A使用all group的方式(每一个bolt都要接收到)向bolt B发射tuple,那么transfered就会是emitted的数倍。
excute latency和process latency : excute latency 很直白,就是代码里excute()这个方法的执行时间, 而process latency则是excute方法执行,直到调用ack方法的时间,可以认为是业务代码执行所需的时间,正常情况下,excute latency是会大于process latency的,但是如果你一直不去ack,process latency会远远大于excute latency。

spout的complete latency: 这个可以参考storm的[ack机制](http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/) ,这个时间就是一个tuple从被发射到这个tuple被ack所需的时间,确切的说,是从spout调用emit方法到调用ack方法的时间差,其实也就是这个由这个tuple生成的tuple树被完全处理所需的时间。

 

最近在使用activemq 的连接池时,发现它存在很严重的内存泄露问题。

通过jmap监控,可以看到java.util.concurrent.locks.ReentrantLock, org.apache.activemq.pool.PooledConnection这两个类占用的空间非常大,而且增长速度也很快。

网上查了一下,正好找到activemq的bug 报告.:https://issues.apache.org/jira/browse/AMQ-3997

这个bug 在5.7中已经修复,可以通过升级版本解决。

同时,也有另一种解决方式,就是使用spring带的连接池替换activemq自带的连接池,配置如下:

  <bean id="jmsConnectionFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://205-amq-broker2?create=false&waitForStart=10000" />
        </bean>

<!--        <bean id="pooledConnectionFactory"
                class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
                <property name="maxConnections" value="8" />
                <property name="connectionFactory" ref="jmsConnectionFactory" />
        </bean>-->
      <bean id="cachedConnectionFactory"
                class="<span style="color:#ff0000;">org.springframework.jms.connection.CachingConnectionFactory</span>">
                        <property name="targetConnectionFactory" ref="jmsConnectionFactory"></property>
                        <property name="sessionCacheSize" value="10"></property>
        </bean>
        <bean id="jmsConfig"
                class="org.apache.camel.component.jms.JmsConfiguration">
                <property name="connectionFactory" ref="cachedConnectionFactory"/>
                <property name="concurrentConsumers" value="10"/>
        </bean>

        <bean id="activemq"
                class="org.apache.activemq.camel.component.ActiveMQComponent">
                <property name="configuration" ref="jmsConfig"/>

 

公司使用activemq和camel做消息的分发,之前数据量不是很大,所以一直没怎么考虑效率问题,对camel的工作原理研究也不深。单是最近随着业务量的增加,camel的效率逐渐成了瓶颈,所以根据日志大概了解了camel的工作原理。虽然camel是被嵌入到activemq中,但在工作过程中,camel和activemq其实还是相对独立的。我们在camel中会配置一个到activemq的连接.

http://camel.apache.org/activemq.html

关于vm这种传输方式,参考http://activemq.apache.org/vm-transport-reference.html

看了下日志,发现这种配置下camel会有一个很严重的问题: camel每次执行转发操作时,都会新建一个到activemq的连接,之后再将其关闭。这严重拖慢了转发效率,因为事实上每次转发都可以使用同一个连接。

因此查了一下camel文档,找到了 http://camel.apache.org/activemq.html 。 里边有关于线程池的配置:

 

<pre name="code" class="html"><bean id="jmsConnectionFactory" 
   class="org.apache.activemq.ActiveMQConnectionFactory">
   <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="pooledConnectionFactory" 
   class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
   <property name="maxConnections" value="8" />
   <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfig" 
   class="org.apache.camel.component.jms.JmsConfiguration">
   <property name="connectionFactory" ref="pooledConnectionFactory"/>
   <property name="concurrentConsumers" value="10"/>
</bean>

<bean id="activemq" 
    class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>

    <!-- if we are using transacted then enable CACHE_CONSUMER (if not using XA) to run faster
         see more details at: http://camel.apache.org/jms
    <property name="transacted" value="true"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER" />
    -->
</bean>

这个正好符合我们的需要。而且顺便把连接换成了多线程,可以进一步提升效率。

 

需要注意的是,如果使用的是activemq5.6, 这样做会导致内存泄露,我会在下一篇博客中详述。



 

转自http://leyew.blog.51cto.com/5043877/860255#559183-tsina-1-46862-ed0973a0c870156ed15f06a6573c8bf0

前几天开始学习lda,走了不少弯路,对lda仍然是一头雾水。看了这篇文档以后总算明白lda是干啥的了

 

LDA(Latent Dirichlet Allocation)学习笔记

最近在看LDA算法,经过了几天挣扎,总算大致了解了这个算法的整体框架和流程。

示例

LDA要干的事情简单来说就是为一堆文档进行聚类(所以是非监督学习),一种topic就是一类,要聚成的topic数目是事先指定的。聚类的结果是一个概率,而不是布尔型的100%属于某个类。国外有个博客[1]上有一个清晰的例子,直接引用:

Suppose you have the following set of sentences:

  • I like to eat broccoli and bananas.
  • I ate a banana and spinach smoothie for breakfast.
  • Chinchillas and kittens are cute.
  • My sister adopted a kitten yesterday.
  • Look at this cute hamster munching on a piece of broccoli.

What is latent Dirichlet allocation? It’s a way of automatically discovering topics that these sentences contain. For example, given these sentences and asked for 2 topics, LDA might produce something like

  • Sentences 1 and 2: 100% Topic A
  • Sentences 3 and 4: 100% Topic B
  • Sentence 5: 60% Topic A, 40% Topic B
  • Topic A: 30% broccoli, 15% bananas, 10% breakfast, 10% munching, … (at which point, you could interpret topic A to be about food)
  • Topic B: 20% chinchillas, 20% kittens, 20% cute, 15% hamster, … (at which point, you could interpret topic B to be about cute animals)

上面关于sentence 5的结果,可以看出来是一个明显的概率类型的聚类结果(sentence 1和2正好都是100%的确定性结果)。

再看例子里的结果,除了为每句话得出了一个概率的聚类结果,而且对每个Topic,都有代表性的词以及一个比例。以Topic A为例,就是说所有对应到Topic A的词里面,有30%的词是broccoli。在LDA算法中,会把每一个文档中的每一个词对应到一个Topic,所以能算出上面这个比例。这些词为描述这个Topic起了一个很好的指导意义,我想这就是LDA区别于传统文本聚类的优势吧。

LDA整体流程

先定义一些字母的含义:

  • 文档集合D,topic集合T
  • D中每个文档d看作一个单词序列**< w1,w2,…,wn >**,wi表示第i个单词,设d有n个单词。(LDA里面称之为_word bag_,实际上每个单词的出现位置对LDA算法无影响)
  • D中涉及的所有不同单词组成一个大集合VOCABULARY(简称VOC

LDA以文档集合D作为输入(会有切词,去停用词,取词干等常见的预处理,略去不表),希望训练出的两个结果向量(设聚成k个Topic,VOC中共包含m个词):

  • **对每个D中的文档d,对应到不同topic的概率θd < pt1,…, ptk >**,其中,pti表示d对应T中第i个topic的概率。计算方法是直观的,pti=nti/n,其中nti表示d中对应第i个topic的词的数目,n是d中所有词的总数。
  • **对每个T中的topic t,生成不同单词的概率φt < pw1,…, pwm >**,其中,pwi表示t生成VOC中第i个单词的概率。计算方法同样很直观,pwi=Nwi/N,其中Nwi表示对应到topic t的VOC中第i个单词的数目,N表示所有对应到topic t的单词总数。

LDA的核心公式如下:

*p(w|d) = p(w|t)p(t|d)

直观的看这个公式,就是以Topic作为中间层,可以通过当前的θd和φt给出了文档d中出现单词w的概率。其中p(t|d)利用θd计算得到,p(w|t)利用φt计算得到。

实际上,利用当前的θd和φt,我们可以为一个文档中的一个单词计算它对应任意一个Topic时的p(w|d),然后根据这些结果来更新这个词应该对应的topic。然后,如果这个更新改变了这个单词所对应的Topic,就会反过来影响θd和φt

LDA算法开始时,先随机地给θd和φt赋值(对所有的d和t)。然后上述过程不断重复,最终收敛到的结果就是LDA的输出。

再详细说一下这个迭代的学习过程:

针对一个特定的文档ds中的第i单词wi,如果令该单词对应的topic为tj,可以把上述公式改写为:

pj(wi|ds) = p(wi|tj)*p(tj|ds)

先不管这个值怎么计算(可以先理解成直接从θds和φtj中取对应的项。实际没这么简单,但对理解整个LDA流程没什么影响,后文再说。)。现在我们可以枚举T中的topic,得到所有的pj(wi|ds),其中j取值1~k。然后可以根据这些概率值结果为ds中的第i个单词wi选择一个topic。最简单的想法是取令pj(wi|ds)最大的tj(注意,这个式子里只有j是变量),即

argmax[j]pj(wi|ds)

当然这只是一种方法(好像还不怎么常用),实际上这里怎么选择t在学术界有很多方法,我还没有好好去研究。

然后,如果ds中的第i个单词wi在这里选择了一个与原先不同的topic,就会对θd和φt有影响了(根据前面提到过的这两个向量的计算公式可以很容易知道)。它们的影响又会反过来影响对上面提到的p(w|d)的计算。对D中所有的d中的所有w进行一次p(w|d)的计算并重新选择topic看作一次迭代。这样进行n次循环迭代之后,就会收敛到LDA所需要的结果了。 【在这里突然想到了一个问题,就是对θd和φt这两个向量的更新究竟是在一次迭代对所有的d中的所有w更新之后统一更新(也就是在一次迭代中,θd和φt不变,统一在迭代结束时更新),还是每对一个d中的一个w更新topic之后,就马上对θd和φt进行更新呢?这个看来要去看一下那篇LDA最原始的论文了】

怎样计算p(w|t)和p(t|d)

待续……

 

参考资料

【1】Introduction to Latent Dirichlet Allocation:国外博客,很不错的入门文章

参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html

 

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper

      extends Mapper<Object, Text, Text, IntWritable>{  //继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法

      private final static IntWritable one = new IntWritable(1);   //Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

      private Text word = new Text();

 

      public void map(Object key, Text value, Context context)  //Called once for each key/value pair in the input split

        throws IOException, InterruptedException {  //value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量

        StringTokenizer itr = new StringTokenizer(value.toString());    //拆分成单词

        while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);  //输出<word,1>

      }

    }

  }

//系统自动对map结果进行排序等处理,reduce输入例 (asd,1-1-1)

  public static class IntSumReducer

      extends Reducer<Text,IntWritable,Text,IntWritable> {  //Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,Context context)

           throws IOException, InterruptedException {   //reducer输入为Map过程输出,<key,values>中key为单个单词,而values是对应单词的计数值

        int sum = 0;

        for (IntWritable val : values) {

           sum += val.get();

        }

      result.set(sum);

      context.write(key, result);

    }

  }

 

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println(“Usage: wordcount <in> <out>”);

      System.exit(2);

    }

    Job job = new Job(conf, “word count”);

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class); //setMapperClass:设置Mapper,默认为IdentityMapper

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);//setReducerClass:设置Reducer,默认为IdentityReducer

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//FileInputFormat.addInputPath:设置输入文件的路径,可以是一个文件,一个路径,一个通配符。可以被调用多次添加多个路径

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

//setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text

setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数

 

setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数

setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式

setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式

setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数

 

setOutputFormat:设置任务的输出格式,默认为TextOutputFormat

 

http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html wordcount运行过程详解

http://www.cnblogs.com/gpcuster/archive/2010/06/04/1751538.html hdfs命令介绍

http://hi.baidu.com/gkf8605/item/d6b8af09c3463512eafe38b1 hdfs命令

http://www.cnblogs.com/forfuture1978/archive/2010/11/14/1877086.html mapreduce入门

 

http://hadoop.apache.org/docs/r0.20.2/api/index.html hadoop0.20.2 api