storm ui 中一些关键属性的含义
Storm UI对于排查storm使用过程中遇到的问题会很有帮助,但是有些属性的含义不是很明确,虽然都是很简单的概念,如果不知道的话也会很难受。
先说一点,鼠标只到UI上的标题栏时,是可以看到这一属性的具体属性的,几篇google rank很高的文章,其实就是把这个信息整理了下来。
其实大部分属性都是很直白的,看到名字就知道是什么意思,我在这儿之把一些可能造成困扰的属性列一下,方便大家查问题。
Storm UI对于排查storm使用过程中遇到的问题会很有帮助,但是有些属性的含义不是很明确,虽然都是很简单的概念,如果不知道的话也会很难受。
先说一点,鼠标只到UI上的标题栏时,是可以看到这一属性的具体属性的,几篇google rank很高的文章,其实就是把这个信息整理了下来。
其实大部分属性都是很直白的,看到名字就知道是什么意思,我在这儿之把一些可能造成困扰的属性列一下,方便大家查问题。
最近在使用activemq 的连接池时,发现它存在很严重的内存泄露问题。
通过jmap监控,可以看到java.util.concurrent.locks.ReentrantLock, org.apache.activemq.pool.PooledConnection这两个类占用的空间非常大,而且增长速度也很快。
网上查了一下,正好找到activemq的bug 报告.:https://issues.apache.org/jira/browse/AMQ-3997
这个bug 在5.7中已经修复,可以通过升级版本解决。
同时,也有另一种解决方式,就是使用spring带的连接池替换activemq自带的连接池,配置如下:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://205-amq-broker2?create=false&waitForStart=10000" /> </bean> <!-- <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean>--> <bean id="cachedConnectionFactory" class="<span style="color:#ff0000;">org.springframework.jms.connection.CachingConnectionFactory</span>"> <property name="targetConnectionFactory" ref="jmsConnectionFactory"></property> <property name="sessionCacheSize" value="10"></property> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="cachedConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/>
公司使用activemq和camel做消息的分发,之前数据量不是很大,所以一直没怎么考虑效率问题,对camel的工作原理研究也不深。单是最近随着业务量的增加,camel的效率逐渐成了瓶颈,所以根据日志大概了解了camel的工作原理。虽然camel是被嵌入到activemq中,但在工作过程中,camel和activemq其实还是相对独立的。我们在camel中会配置一个到activemq的连接.
http://camel.apache.org/activemq.html
关于vm这种传输方式,参考http://activemq.apache.org/vm-transport-reference.html
看了下日志,发现这种配置下camel会有一个很严重的问题: camel每次执行转发操作时,都会新建一个到activemq的连接,之后再将其关闭。这严重拖慢了转发效率,因为事实上每次转发都可以使用同一个连接。
因此查了一下camel文档,找到了 http://camel.apache.org/activemq.html 。 里边有关于线程池的配置:
<pre name="code" class="html"><bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="pooledConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="configuration" ref="jmsConfig"/> <!-- if we are using transacted then enable CACHE_CONSUMER (if not using XA) to run faster see more details at: http://camel.apache.org/jms <property name="transacted" value="true"/> <property name="cacheLevelName" value="CACHE_CONSUMER" /> --> </bean>
这个正好符合我们的需要。而且顺便把连接换成了多线程,可以进一步提升效率。
需要注意的是,如果使用的是activemq5.6, 这样做会导致内存泄露,我会在下一篇博客中详述。
转自http://leyew.blog.51cto.com/5043877/860255#559183-tsina-1-46862-ed0973a0c870156ed15f06a6573c8bf0
前几天开始学习lda,走了不少弯路,对lda仍然是一头雾水。看了这篇文档以后总算明白lda是干啥的了
最近在看LDA算法,经过了几天挣扎,总算大致了解了这个算法的整体框架和流程。
LDA要干的事情简单来说就是为一堆文档进行聚类(所以是非监督学习),一种topic就是一类,要聚成的topic数目是事先指定的。聚类的结果是一个概率,而不是布尔型的100%属于某个类。国外有个博客[1]上有一个清晰的例子,直接引用:
Suppose you have the following set of sentences:
- I like to eat broccoli and bananas.
- I ate a banana and spinach smoothie for breakfast.
- Chinchillas and kittens are cute.
- My sister adopted a kitten yesterday.
- Look at this cute hamster munching on a piece of broccoli.
What is latent Dirichlet allocation? It’s a way of automatically discovering topics that these sentences contain. For example, given these sentences and asked for 2 topics, LDA might produce something like
- Sentences 1 and 2: 100% Topic A
- Sentences 3 and 4: 100% Topic B
- Sentence 5: 60% Topic A, 40% Topic B
- Topic A: 30% broccoli, 15% bananas, 10% breakfast, 10% munching, … (at which point, you could interpret topic A to be about food)
- Topic B: 20% chinchillas, 20% kittens, 20% cute, 15% hamster, … (at which point, you could interpret topic B to be about cute animals)
上面关于sentence 5的结果,可以看出来是一个明显的概率类型的聚类结果(sentence 1和2正好都是100%的确定性结果)。
再看例子里的结果,除了为每句话得出了一个概率的聚类结果,而且对每个Topic,都有代表性的词以及一个比例。以Topic A为例,就是说所有对应到Topic A的词里面,有30%的词是broccoli。在LDA算法中,会把每一个文档中的每一个词对应到一个Topic,所以能算出上面这个比例。这些词为描述这个Topic起了一个很好的指导意义,我想这就是LDA区别于传统文本聚类的优势吧。
先定义一些字母的含义:
LDA以文档集合D作为输入(会有切词,去停用词,取词干等常见的预处理,略去不表),希望训练出的两个结果向量(设聚成k个Topic,VOC中共包含m个词):
LDA的核心公式如下:
*p(w|d) = p(w|t)p(t|d)
直观的看这个公式,就是以Topic作为中间层,可以通过当前的θd和φt给出了文档d中出现单词w的概率。其中p(t|d)利用θd计算得到,p(w|t)利用φt计算得到。
实际上,利用当前的θd和φt,我们可以为一个文档中的一个单词计算它对应任意一个Topic时的p(w|d),然后根据这些结果来更新这个词应该对应的topic。然后,如果这个更新改变了这个单词所对应的Topic,就会反过来影响θd和φt。
LDA算法开始时,先随机地给θd和φt赋值(对所有的d和t)。然后上述过程不断重复,最终收敛到的结果就是LDA的输出。
再详细说一下这个迭代的学习过程:
针对一个特定的文档ds中的第i单词wi,如果令该单词对应的topic为tj,可以把上述公式改写为:
pj(wi|ds) = p(wi|tj)*p(tj|ds)
先不管这个值怎么计算(可以先理解成直接从θds和φtj中取对应的项。实际没这么简单,但对理解整个LDA流程没什么影响,后文再说。)。现在我们可以枚举T中的topic,得到所有的pj(wi|ds),其中j取值1~k。然后可以根据这些概率值结果为ds中的第i个单词wi选择一个topic。最简单的想法是取令pj(wi|ds)最大的tj(注意,这个式子里只有j是变量),即
argmax[j]pj(wi|ds)
当然这只是一种方法(好像还不怎么常用),实际上这里怎么选择t在学术界有很多方法,我还没有好好去研究。
然后,如果ds中的第i个单词wi在这里选择了一个与原先不同的topic,就会对θd和φt有影响了(根据前面提到过的这两个向量的计算公式可以很容易知道)。它们的影响又会反过来影响对上面提到的p(w|d)的计算。对D中所有的d中的所有w进行一次p(w|d)的计算并重新选择topic看作一次迭代。这样进行n次循环迭代之后,就会收敛到LDA所需要的结果了。 【在这里突然想到了一个问题,就是对θd和φt这两个向量的更新究竟是在一次迭代对所有的d中的所有w更新之后统一更新(也就是在一次迭代中,θd和φt不变,统一在迭代结束时更新),还是每对一个d中的一个w更新topic之后,就马上对θd和φt进行更新呢?这个看来要去看一下那篇LDA最原始的论文了】
待续……
【1】Introduction to Latent Dirichlet Allocation:国外博客,很不错的入门文章
参考http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ //继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法
private final static IntWritable one = new IntWritable(1); //Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
private Text word = new Text();
public void map(Object key, Text value, Context context) //Called once for each key/value pair in the input split
throws IOException, InterruptedException { //value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量
StringTokenizer itr = new StringTokenizer(value.toString()); //拆分成单词
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one); //输出<word,1>
}
}
}
//系统自动对map结果进行排序等处理,reduce输入例 (asd,1-1-1)
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> { //Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException { //reducer输入为Map过程输出,<key,values>中key为单个单词,而values是对应单词的计数值
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println(“Usage: wordcount <in> <out>”);
System.exit(2);
}
Job job = new Job(conf, “word count”);
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //setMapperClass:设置Mapper,默认为IdentityMapper
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);//setReducerClass:设置Reducer,默认为IdentityReducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//FileInputFormat.addInputPath:设置输入文件的路径,可以是一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
//setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html wordcount运行过程详解
http://www.cnblogs.com/gpcuster/archive/2010/06/04/1751538.html hdfs命令介绍
http://hi.baidu.com/gkf8605/item/d6b8af09c3463512eafe38b1 hdfs命令
http://www.cnblogs.com/forfuture1978/archive/2010/11/14/1877086.html mapreduce入门
http://hadoop.apache.org/docs/r0.20.2/api/index.html hadoop0.20.2 api