YARN源码分析之AsyncDispatcher事件调度器

YARN采用了事件驱动机制,其核心服务实际上都是一个异步调度器,包括Resourcemanager、Nodemanager、MRAppMaster等。本篇以MRAppMaster为例,其内部包含一个异步调度器AsyncDispatcher,AsyncDispatcher在yarn中的主要作用是对发生的一系列事件找到各个事件对应的handle进行处理,从其功能上可以看出其内部应该有一个队列,队列主要用来存放等待调度的事件,还应该有一个事件与handle的映射表,用来处理各个事件。

通过查看代码首先可以发现AsyncDispatcher是一个服务,继承了AbstractService,其次是通过阻塞队列存放事件,然后单独起一个线程从阻塞队列中消费事件,通过事先定义好的事件和处理器的映射表找到各自的处理器进行处理。

下面看下代码内容,首先看下属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 阻塞队列,用于存放发生的事件
private final BlockingQueue<Event> eventQueue;
// AsyncDispatcher event handler线程是否停止的标识
private volatile boolean stopped = false;
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
// 当停止AsyncDispatcher服务时,是否等待eventQueue中的事件被处理完
private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
// 在停止AsyncDispatcher服务时,标识所有剩余的事件被处理完
private volatile boolean drained = true;
// 对象锁
private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the
// queue while stopping.
// 在停止AsyncDispatcher服务时,
// 如果drainEventsOnStop为true,则阻塞新的事件进入queue
private volatile boolean blockNewEvents = false;
private EventHandler handlerInstance = null;
// 消费queue中事件的线程
private Thread eventHandlingThread;
// 存放事件和事件处理器的映射
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
// 当调度器发生异常时,rm是否退出
private boolean exitOnDispatchException;

类在使用之前需要进行实例化,一般都是通过构造函数进行实例化,下面就看下该类的构造函数:

1
2
3
4
5
6
7
8
9
public AsyncDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}

该类有两个构造函数,一个是默认的构造函数,一个是带一个BlockingQueue参数的构造函数,而默认的构造函数在其内部又调用了带BlockingQueue参数的构造函数。

构造函数中对其关键属性eventQueueeventDispatchers进行了赋值,其中如果eventQueue不指定的话就实例化一个LinkedBlockingQueue对象。

AsyncDispatcher是在Resourcemanager中被当做一个服务而启动的,看下代码:

1
2
3
4
5
6
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);

将AsyncDispatcher实例赋值给rmDispatcher,然后将rmDispatcher作为一个服务启动。服务启动时都会先进行init然后start,init和start代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void serviceInit(Configuration conf) throws Exception {
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}

服务启动时,在serviceStart方法中起一个AsyncDispatcher event handler线程,从eventQueue中取出event进行调度。
看下eventHandlingThread线程的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
// 标识eventQueue是否为空
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
// 从eventQueue中移除第一个event
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
// event不为null则调度事件,让事件处理器处理
if (event != null) {
dispatch(event);
}
}
}
};
}

在run中,首先判断该服务是否处于正在stop过程中,如果正在stop并且drainEventsOnStop为true,则进入if (blockNewEvents)语句中(blockNewEvents在serviceStop中,当drainEventsOnStop为true时,blockNewEvents被赋值为true),通过drained判断eventQueue中是否还有剩余的event,如果没有剩余event则通知在serviceStop中阻塞的线程继续进行stop操作。如果有剩余则和正常逻辑(正常逻辑指在没有进行stop操作时的逻辑)一样,从eventQueue中取出一个event,通过dispatch进行调度。dispatch代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected void dispatch(Event event) {
//all events go thru this loop
...
// 得到event的class名
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
// 从map集合eventDispatchers中得到该事件注册的处理器
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
// 事件处理器的handle方法进行处理event
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}

这里涉及到一个属性eventDispatchers,这个map中存放的是事件类型和对应的事件处理器,通过key事件类型得到事件处理器之后,用该事件处理器的handle方法进行处理。

这里只是从eventDispatchers中取出key对应的value,那么eventDispatchers中的key和value是怎么put进去的呢?在ResourceManager.java中不难发现eventDispatchers中的值是通过register方法注册进去的,下面看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
// 检查是否已经注册了该事件类型的处理器
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
// 如果没有则直接put
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
// 如果已经注册了该事件类型的处理器,
// 并且registeredHandler不是MultiListenerHandler类型
// 则将已存在的registeredHandler和新来的handler都add到MultiListenerHandler中
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
// 如果registeredHandler已经是MultiListenerHandler类型
// 则直接加入MultiListenerHandler中
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}

从register方法中可以得出可以对同一个事件类型注册多个事件处理器,多个事件处理器用MultiListenerHandler存储。

对某一个事件注册事件处理器时,先判断eventDispatchers是否存在该事件的处理器,没有则直接注册,如果存在并且不是MultiListenerHandler类型,则构建一个MultiListenerHandler的实例,将已经存在的处理器和新添加的处理器都添加到MultiListenerHandler中,如果存在并且是MultiListenerHandler类型,则直接向其追加处理器handler。下面看下MultiListenerHandler是个什么鬼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static class MultiListenerHandler implements EventHandler<Event> {
List<EventHandler<Event>> listofHandlers;
public MultiListenerHandler() {
listofHandlers = new ArrayList<EventHandler<Event>>();
}
@Override
public void handle(Event event) {
for (EventHandler<Event> handler: listofHandlers) {
handler.handle(event);
}
}
void addHandler(EventHandler<Event> handler) {
listofHandlers.add(handler);
}
}

MultiListenerHandler是AsyncDispatcher的一个内部静态类,有一个list属性listofHandlers来存放多个事件处理器。该类也实现了EventHandler接口,通过调用handle方法,将事件依次通过列表中的事件处理器handler来处理。

细心的朋友可能已经在想eventQueue中的event是在哪put进去的呢?

在AsyncDispatcher中查看eventQueue的put方法都在哪里调用了。其实在AsyncDispatcher中只有一处调用了eventQueue.put方法,那就是在GenericEventHandler类的handle中。GenericEventHandler是AsyncDispatcher的一个内部类,在AsyncDispatcher.getEventHandler()方法中实例化,代码如下:

1
2
3
4
5
6
public EventHandler getEventHandler() {
if (handlerInstance == null) {
handlerInstance = new GenericEventHandler();
}
return handlerInstance;
}

可以看出一个调度器里实例化一个GenericEventHandler对象,下次获取就可以直接返回handlerInstance了。

向eventQueue中put数据时,先通过AsyncDispatcher.getEventHandler()方法,得到GenericEventHandler实例handlerInstance,然后调用GenericEventHandler.handle(Event)方法put进去的,下面看下GenericEventHandler.handle方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void handle(Event event) {
// 当drainEventsOnStop为true并且当前调度器正在stop时,
// 禁止新添加event,直接return
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
// eventQueue size不为0且是1000的整数倍则打印一条log
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of event-queue is " + qSize);
}
// 得到剩余的空间
int remCapacity = eventQueue.remainingCapacity();
// 剩余的空间小于1000时打印一个warn log
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
// put进eventQueue中
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
throw new YarnRuntimeException(e);
}
}

向eventQueue中put数据时,先统计下当前eventQueue是的使用情况,如果剩余空间小于1000,则打印一条warn日志。eventQueue是LinkedBlockingQueue类型的对象,调用LinkedBlockingQueue默认构造函数在实例化时会将Integer.MAX_VALUE作为空间大小。

至此AsyncDispatcher类从start、put、take都已介绍完,最后介绍下AsyncDispatcher的stop。看下serviceStop方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected void serviceStop() throws Exception {
// 这里首先校验drainEventsOnStop,默认为false
// 如果为true则等待eventQueue中的event处理完之后再stop调度器
if (drainEventsOnStop) {
// blockNewEvents在此设为true,在eventHandlingThread线程中检查是否已处理完
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
// 等待eventHandlingThread线程的notify
synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
// stop all the components
super.serviceStop();
}

调度器stop时,主要是校验下drainEventsOnStop的值,为false则直接stop掉调度器,为true则阻塞新eventput进eventQueue,等待eventQueue中的event被处理完再stop。

事件驱动设计思想的引入,使得YARN具有低耦合,高内聚的特点,各个模块只需完成各自的功能,而模块之间则采用事件联系起来,系统设计简单且维护方便。

您的肯定,是我装逼的最大的动力!