jstorm源码解析之循环任务AsyncLoopThread

概述

AsyncLoopThread是jstorm里自定义的一个循环执行任务的工具,实现不复杂,本来是不值当的专门开一篇文章介绍。不过这个在jstorm里应用实在太广泛了,诸如uspervisor/nimbus心跳,获取新topology,更新worker状态等大量功能都是利用AsyncLoopThread实现的,所以还是介绍一下吧,也方便后续看其他部分代码。

使用

AsyncLoopThread其实完全可以拿出来用在我们自己的工程里,使用比较方便,可以参考 https://github.com/lcy362/Scenes/blob/4d8ec4ff166060cf5d491c33a96f5c86e8389333/src/main/java/com/mallow/jstormcode/AsyncLoop.java

只需要先定义一个RunnableCallback类,RunnableCallback是jstorm里封装的一个线程类,对Runable接口做了一些增强,可以回调,可以主动关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestThread extends RunnableCallback {
@Override
public void run() {
System.out.println("thread runs " + new Date());
}


@Override
public Object getResult() {
return 10;
}

}

这里我们只实现了两个方法,run和getResult, 其中,run方法就是线程执行方法,和一般的线程一样的,getResult返回的是任务执行间隔时间,单位是秒。

之后我们只要把这个TestThread作为一个参数传给AsyncLoopThread,就可以实现循环执行TestThread的run方法了。

1
2
3
4
5
6
7
8
9
10
public static void main(String args[]) {
AsyncLoopThread loop = new AsyncLoopThread(new TestThread());

try {
TimeUnit.MINUTES.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

设计

接下来,我们在具体看一下AsyncLoopThread的实现。

核心代码就是这个init方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void init(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) {
if (kill_fn == null) {
kill_fn = new AsyncLoopDefaultKill();
}

Runnable runnable = new AsyncLoopRunnable(afn, kill_fn);
thread = new Thread(runnable);
String threadName = afn.getThreadName();
if (threadName == null) {
threadName = afn.getClass().getSimpleName();
}
thread.setName(threadName);
thread.setDaemon(daemon);
thread.setPriority(priority);
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("UncaughtException", e);
JStormUtils.halt_process(1, "UncaughtException");
}
});

this.afn = afn;

if (start) {
thread.start();
}
}

这里主要就是新建了一个AsyncLoopRunnable类,更核心的代码在这里。此外,要注意的一个是kill_fn,在AsyncLoopRunnable里会用到,负责杀掉任务,另外就是会对线程做几个配置,优先级,异常处理之类的,此外这里的线程都被设置成了守护线程,所以上边的例子里,我们需要主动让main线程sleep才能看到运行效果。

接下来再看一下AsyncLoopRunnable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void run() {
if (fn == null) {
LOG.error("fn==null");
throw new RuntimeException("AsyncLoopRunnable no core function ");
}

fn.preRun();

try {
while (!shutdown.get()) {
fn.run();

if (shutdown.get()) {
shutdown();
return;
}

Exception e = fn.error();
if (e != null) {
throw e;
}
Object rtn = fn.getResult();
if (this.needQuit(rtn)) {
shutdown();
return;
}
}
} catch (Throwable e) {
if (shutdown.get()) {
shutdown();
} else {
LOG.error("Async loop died!!!" + e.getMessage(), e);
killFn.execute(e);
}
}
}

这个代码也很简单,就是循环的去执行RunnableCallback的run方法,期间会有shutdown,needQuit(),异常几种情况导致任务中断,或者直接杀掉进程。其中,needQuit()方法里会根据前边说的getResult控制任务执行速度。

原文地址:https://lcy362.github.io/posts/31761