Mobility

聚沙成塔

Storm UI对于排查storm使用过程中遇到的问题会很有帮助,但是有些属性的含义不是很明确,虽然都是很简单的概念,如果不知道的话也会很难受。

先说一点,鼠标只到UI上的标题栏时,是可以看到这一属性的具体属性的,几篇google rank很高的文章,其实就是把这个信息整理了下来。

其实大部分属性都是很直白的,看到名字就知道是什么意思,我在这儿之把一些可能造成困扰的属性列一下,方便大家查问题。

 

emitted和transfered: emitted,就是发射出的数据条数,也就是调用OutputCollector的emit方法的次数。transferred则是实际tuple发送到下一个task的数目。乍一看是一样的对吗。其实一般情况下也确实是一样的。但是,比如,一个bolt 发射了数据,但是下游并没有其他bolt取这个数,这个bolt的transfer数就会是0\. 又比如,如果一个bolt A使用all group的方式(每一个bolt都要接收到)向bolt B发射tuple,那么transfered就会是emitted的数倍。
excute latency和process latency : excute latency 很直白,就是代码里excute()这个方法的执行时间, 而process latency则是excute方法执行,直到调用ack方法的时间,可以认为是业务代码执行所需的时间,正常情况下,excute latency是会大于process latency的,但是如果你一直不去ack,process latency会远远大于excute latency。

spout的complete latency: 这个可以参考storm的[ack机制](http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/) ,这个时间就是一个tuple从被发射到这个tuple被ack所需的时间,确切的说,是从spout调用emit方法到调用ack方法的时间差,其实也就是这个由这个tuple生成的tuple树被完全处理所需的时间。

 

最近在使用activemq 的连接池时,发现它存在很严重的内存泄露问题。

通过jmap监控,可以看到java.util.concurrent.locks.ReentrantLock, org.apache.activemq.pool.PooledConnection这两个类占用的空间非常大,而且增长速度也很快。

网上查了一下,正好找到activemq的bug 报告.:https://issues.apache.org/jira/browse/AMQ-3997

这个bug 在5.7中已经修复,可以通过升级版本解决。

同时,也有另一种解决方式,就是使用spring带的连接池替换activemq自带的连接池,配置如下:

  <bean id="jmsConnectionFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="vm://205-amq-broker2?create=false&waitForStart=10000" />
        </bean>

<!--        <bean id="pooledConnectionFactory"
                class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
                <property name="maxConnections" value="8" />
                <property name="connectionFactory" ref="jmsConnectionFactory" />
        </bean>-->
      <bean id="cachedConnectionFactory"
                class="<span style="color:#ff0000;">org.springframework.jms.connection.CachingConnectionFactory</span>">
                        <property name="targetConnectionFactory" ref="jmsConnectionFactory"></property>
                        <property name="sessionCacheSize" value="10"></property>
        </bean>
        <bean id="jmsConfig"
                class="org.apache.camel.component.jms.JmsConfiguration">
                <property name="connectionFactory" ref="cachedConnectionFactory"/>
                <property name="concurrentConsumers" value="10"/>
        </bean>

        <bean id="activemq"
                class="org.apache.activemq.camel.component.ActiveMQComponent">
                <property name="configuration" ref="jmsConfig"/>

 

公司使用activemq和camel做消息的分发,之前数据量不是很大,所以一直没怎么考虑效率问题,对camel的工作原理研究也不深。单是最近随着业务量的增加,camel的效率逐渐成了瓶颈,所以根据日志大概了解了camel的工作原理。虽然camel是被嵌入到activemq中,但在工作过程中,camel和activemq其实还是相对独立的。我们在camel中会配置一个到activemq的连接.

http://camel.apache.org/activemq.html

关于vm这种传输方式,参考http://activemq.apache.org/vm-transport-reference.html

看了下日志,发现这种配置下camel会有一个很严重的问题: camel每次执行转发操作时,都会新建一个到activemq的连接,之后再将其关闭。这严重拖慢了转发效率,因为事实上每次转发都可以使用同一个连接。

因此查了一下camel文档,找到了 http://camel.apache.org/activemq.html 。 里边有关于线程池的配置:

 

<pre name="code" class="html"><bean id="jmsConnectionFactory" 
   class="org.apache.activemq.ActiveMQConnectionFactory">
   <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="pooledConnectionFactory" 
   class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
   <property name="maxConnections" value="8" />
   <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfig" 
   class="org.apache.camel.component.jms.JmsConfiguration">
   <property name="connectionFactory" ref="pooledConnectionFactory"/>
   <property name="concurrentConsumers" value="10"/>
</bean>

<bean id="activemq" 
    class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>

    <!-- if we are using transacted then enable CACHE_CONSUMER (if not using XA) to run faster
         see more details at: http://camel.apache.org/jms
    <property name="transacted" value="true"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER" />
    -->
</bean>

这个正好符合我们的需要。而且顺便把连接换成了多线程,可以进一步提升效率。

 

需要注意的是,如果使用的是activemq5.6, 这样做会导致内存泄露,我会在下一篇博客中详述。



 

转自http://leyew.blog.51cto.com/5043877/860255#559183-tsina-1-46862-ed0973a0c870156ed15f06a6573c8bf0

前几天开始学习lda,走了不少弯路,对lda仍然是一头雾水。看了这篇文档以后总算明白lda是干啥的了

 

LDA(Latent Dirichlet Allocation)学习笔记

最近在看LDA算法,经过了几天挣扎,总算大致了解了这个算法的整体框架和流程。

示例

LDA要干的事情简单来说就是为一堆文档进行聚类(所以是非监督学习),一种topic就是一类,要聚成的topic数目是事先指定的。聚类的结果是一个概率,而不是布尔型的100%属于某个类。国外有个博客[1]上有一个清晰的例子,直接引用:

Suppose you have the following set of sentences:

  • I like to eat broccoli and bananas.
  • I ate a banana and spinach smoothie for breakfast.
  • Chinchillas and kittens are cute.
  • My sister adopted a kitten yesterday.
  • Look at this cute hamster munching on a piece of broccoli.

What is latent Dirichlet allocation? It’s a way of automatically discovering topics that these sentences contain. For example, given these sentences and asked for 2 topics, LDA might produce something like

  • Sentences 1 and 2: 100% Topic A
  • Sentences 3 and 4: 100% Topic B
  • Sentence 5: 60% Topic A, 40% Topic B
  • Topic A: 30% broccoli, 15% bananas, 10% breakfast, 10% munching, … (at which point, you could interpret topic A to be about food)
  • Topic B: 20% chinchillas, 20% kittens, 20% cute, 15% hamster, … (at which point, you could interpret topic B to be about cute animals)

上面关于sentence 5的结果,可以看出来是一个明显的概率类型的聚类结果(sentence 1和2正好都是100%的确定性结果)。

再看例子里的结果,除了为每句话得出了一个概率的聚类结果,而且对每个Topic,都有代表性的词以及一个比例。以Topic A为例,就是说所有对应到Topic A的词里面,有30%的词是broccoli。在LDA算法中,会把每一个文档中的每一个词对应到一个Topic,所以能算出上面这个比例。这些词为描述这个Topic起了一个很好的指导意义,我想这就是LDA区别于传统文本聚类的优势吧。

LDA整体流程

先定义一些字母的含义:

  • 文档集合D,topic集合T
  • D中每个文档d看作一个单词序列**< w1,w2,…,wn >**,wi表示第i个单词,设d有n个单词。(LDA里面称之为_word bag_,实际上每个单词的出现位置对LDA算法无影响)
  • D中涉及的所有不同单词组成一个大集合VOCABULARY(简称VOC

LDA以文档集合D作为输入(会有切词,去停用词,取词干等常见的预处理,略去不表),希望训练出的两个结果向量(设聚成k个Topic,VOC中共包含m个词):

  • **对每个D中的文档d,对应到不同topic的概率θd < pt1,…, ptk >**,其中,pti表示d对应T中第i个topic的概率。计算方法是直观的,pti=nti/n,其中nti表示d中对应第i个topic的词的数目,n是d中所有词的总数。
  • **对每个T中的topic t,生成不同单词的概率φt < pw1,…, pwm >**,其中,pwi表示t生成VOC中第i个单词的概率。计算方法同样很直观,pwi=Nwi/N,其中Nwi表示对应到topic t的VOC中第i个单词的数目,N表示所有对应到topic t的单词总数。

LDA的核心公式如下:

*p(w|d) = p(w|t)p(t|d)

直观的看这个公式,就是以Topic作为中间层,可以通过当前的θd和φt给出了文档d中出现单词w的概率。其中p(t|d)利用θd计算得到,p(w|t)利用φt计算得到。

实际上,利用当前的θd和φt,我们可以为一个文档中的一个单词计算它对应任意一个Topic时的p(w|d),然后根据这些结果来更新这个词应该对应的topic。然后,如果这个更新改变了这个单词所对应的Topic,就会反过来影响θd和φt

LDA算法开始时,先随机地给θd和φt赋值(对所有的d和t)。然后上述过程不断重复,最终收敛到的结果就是LDA的输出。

再详细说一下这个迭代的学习过程:

针对一个特定的文档ds中的第i单词wi,如果令该单词对应的topic为tj,可以把上述公式改写为:

pj(wi|ds) = p(wi|tj)*p(tj|ds)

先不管这个值怎么计算(可以先理解成直接从θds和φtj中取对应的项。实际没这么简单,但对理解整个LDA流程没什么影响,后文再说。)。现在我们可以枚举T中的topic,得到所有的pj(wi|ds),其中j取值1~k。然后可以根据这些概率值结果为ds中的第i个单词wi选择一个topic。最简单的想法是取令pj(wi|ds)最大的tj(注意,这个式子里只有j是变量),即

argmax[j]pj(wi|ds)

当然这只是一种方法(好像还不怎么常用),实际上这里怎么选择t在学术界有很多方法,我还没有好好去研究。

然后,如果ds中的第i个单词wi在这里选择了一个与原先不同的topic,就会对θd和φt有影响了(根据前面提到过的这两个向量的计算公式可以很容易知道)。它们的影响又会反过来影响对上面提到的p(w|d)的计算。对D中所有的d中的所有w进行一次p(w|d)的计算并重新选择topic看作一次迭代。这样进行n次循环迭代之后,就会收敛到LDA所需要的结果了。 【在这里突然想到了一个问题,就是对θd和φt这两个向量的更新究竟是在一次迭代对所有的d中的所有w更新之后统一更新(也就是在一次迭代中,θd和φt不变,统一在迭代结束时更新),还是每对一个d中的一个w更新topic之后,就马上对θd和φt进行更新呢?这个看来要去看一下那篇LDA最原始的论文了】

怎样计算p(w|t)和p(t|d)

待续……

 

参考资料

【1】Introduction to Latent Dirichlet Allocation:国外博客,很不错的入门文章

参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html

 

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper

      extends Mapper<Object, Text, Text, IntWritable>{  //继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法

      private final static IntWritable one = new IntWritable(1);   //Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

      private Text word = new Text();

 

      public void map(Object key, Text value, Context context)  //Called once for each key/value pair in the input split

        throws IOException, InterruptedException {  //value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量

        StringTokenizer itr = new StringTokenizer(value.toString());    //拆分成单词

        while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);  //输出<word,1>

      }

    }

  }

//系统自动对map结果进行排序等处理,reduce输入例 (asd,1-1-1)

  public static class IntSumReducer

      extends Reducer<Text,IntWritable,Text,IntWritable> {  //Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,Context context)

           throws IOException, InterruptedException {   //reducer输入为Map过程输出,<key,values>中key为单个单词,而values是对应单词的计数值

        int sum = 0;

        for (IntWritable val : values) {

           sum += val.get();

        }

      result.set(sum);

      context.write(key, result);

    }

  }

 

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println(“Usage: wordcount <in> <out>”);

      System.exit(2);

    }

    Job job = new Job(conf, “word count”);

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class); //setMapperClass:设置Mapper,默认为IdentityMapper

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);//setReducerClass:设置Reducer,默认为IdentityReducer

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//FileInputFormat.addInputPath:设置输入文件的路径,可以是一个文件,一个路径,一个通配符。可以被调用多次添加多个路径

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

//setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text

setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数

 

setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数

setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式

setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式

setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数

 

setOutputFormat:设置任务的输出格式,默认为TextOutputFormat

 

0%