YARN源码分析之StateMachineFactory状态机

状态机由一组状态组成,这些状态大体分为三类,分别为初始状态、中间状态和最终状态。状态机首先由初始状态A开始运行,经过一系列的中间状态后到达最终状态,并在最终状态退出,从而形成一个有向无环图。其状态处理的逻辑是收到一个事件,触发状态A到状态B的转换,而转换操作是由事件对应的hook完成的

YARN中引入了事件机制和状态机机制,事件机制可以在这篇文章中了解下事件机制的调度器,本篇主要从源码的角度解析下状态机机制中的关键类StateMachineFactory,以ApplicationMaster的状态转换为例。

RMAppImpl中的StateMachineFactory

ApplicationMaster的实现在RMAPPImpl类中,其状态转换在RMAppImpl中声明,首先声明了一个静态final类型的属性stateMachineFactory,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
...// 若干状态转换的定义
.installTopology();

StateMachineFactory从名字可以看出其是一个工厂,负责产生状态机,则RMAPPImpl中必有一个属性来接收该工厂产生的状态机类,代码如下:

1
2
3
private final StateMachine<RMAppState, RMAppEventType, RMAppEvent> stateMachine;
// 从stateMachineFactory中得到一个stateMachine对象
this.stateMachine = stateMachineFactory.make(this);

下面从状态机的调用流程中解析下StateMachineFactory中的方法。

首先看下StateMachineFactory类的声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* State machine topology.
* This object is semantically immutable. If you have a
* StateMachineFactory there's no operation in the API that changes
* its semantic properties.
*
* @param <OPERAND> The object type on which this state machine operates.
* @param <STATE> The state of the entity.
* @param <EVENTTYPE> The external eventType to be handled.
* @param <EVENT> The event object.
*
*/
final public class StateMachineFactory
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {...}

从类的注释中可以得知StateMachineFactory是一组状态的拓扑图,OPERAND是这组状态机的操作者,STATE是这组状态机中的状态,EVENTTYPE是触发状态转移的事件类型,EVENT是触发状态转移的事件。

StateMachineFactory的状态拓扑图是通过多种addTransition让用户添加各种状态转移,最后通过installTopology完成一个状态机拓扑的搭建,其中初始状态是通过StateMachineFactory的构造函数指定的。下面看下构造函数:

1
2
3
4
5
6
7
8
public StateMachineFactory(STATE defaultInitialState) {
// 构成一个transition链
this.transitionsListNode = null;
this.defaultInitialState = defaultInitialState;
this.optimized = false;
// 主要用于存放preState、eventType和transition的映射关系
this.stateMachineTable = null;
}

RMAPPImpl中的初始状态是RMAppState.NEW,由其初始状态初始化一个StateMachineFactory实例,然后通过addTransition注册各种状态转移。

addTransition

StateMachineFactory中有五个addTransition方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, STATE postState, EVENTTYPE eventType) {
return addTransition(preState, postState, eventType, null);
}
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
STATE preState, STATE postState, Set<EVENTTYPE> eventTypes) {
return addTransition(preState, postState, eventTypes, null);
}
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
STATE preState, STATE postState, Set<EVENTTYPE> eventTypes,
SingleArcTransition<OPERAND, EVENT> hook) {
StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
for (EVENTTYPE event : eventTypes) {
if (factory == null) {
factory = addTransition(preState, postState, event, hook);
} else {
factory = factory.addTransition(preState, postState, event, hook);
}
}
return factory;
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, STATE postState,
EVENTTYPE eventType,
SingleArcTransition<OPERAND, EVENT> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new SingleInternalArc(postState, hook)));
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, Set<STATE> postStates,
EVENTTYPE eventType,
MultipleArcTransition<OPERAND, EVENT, STATE> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this,
new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new MultipleInternalArc(postStates, hook)));
}

由其上的addTransition方法可以看出定于了三种状态转换方式,分别是

  1. preState通过某个事件转换为postState,也就是状态机在preState时,接收到Event事件后,执行对应的hook,并在执行完成后将当前的状态转换为postState。
    addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook)
  2. preState通过多个事件转换为postState,也就是状态机在preState时,接收到某些Event事件后,执行对应的hook,并在执行完成后将当前的状态转换为postState。
    addTransition(STATE preState, STATE postState, Set<EVENTTYPE> eventTypes, SingleArcTransition<OPERAND, EVENT> hook)
  3. preState通过某个事件转换为多个postState,也就是状态机在preState时,接收到Event事件后,执行对应的hook,并在执行完成后将返回hook的返回值所表示的状态。
    addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook)

下面先看下1中的addTransition

1
2
3
4
5
6
7
8
9
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, STATE postState,
EVENTTYPE eventType,
SingleArcTransition<OPERAND, EVENT> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new SingleInternalArc(postState, hook)));
}

从其代码可以看出在addTransition中又new了一个新的StateMachineFactory。这里涉及到两个类ApplicableSingleOrMultipleTransitionSingleInternalArc

先来看下SingleInternalArc类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private class SingleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
private STATE postState;
private SingleArcTransition<OPERAND, EVENT> hook; // transition hook
SingleInternalArc(STATE postState,
SingleArcTransition<OPERAND, EVENT> hook) {
this.postState = postState;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType) {
if (hook != null) {
hook.transition(operand, event);
}
return postState;
}
}

该类有两个属性,分别为表示STATE的postState属性和表示事件发生之后调用对应hook进行处理的SingleArcTransition属性。

该类只有一个doTransition方法,该方法中会调用hook.transition去处理发生该事件之后的状态变化,hook正常处理结束之后,返回postState状态。

该类是用来处理一个状态被一个事件触发之后转移到下一个状态的,还有一个对应的处理多个状态MultipleInternalArc类,其逻辑结构和SingleInternalArc类似,只是在hook处理结束之后,判断下postState是否在备选的状态集中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private class MultipleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{
// Fields
// 存储可供选择的postState状态
private Set<STATE> validPostStates;
// 事件对应的hook
private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook
MultipleInternalArc(Set<STATE> postStates,
MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
this.validPostStates = postStates;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType)
throws InvalidStateTransitonException {
STATE postState = hook.transition(operand, event);
// 校验postState是否在可选的状态集中
if (!validPostStates.contains(postState)) {
throw new InvalidStateTransitonException(oldState, eventType);
}
return postState;
}
}

接下来看下ApplicableSingleOrMultipleTransition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static private class ApplicableSingleOrMultipleTransition
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
final STATE preState;
final EVENTTYPE eventType;
final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
ApplicableSingleOrMultipleTransition
(STATE preState, EVENTTYPE eventType,
Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
this.preState = preState;
this.eventType = eventType;
this.transition = transition;
}
@Override
public void apply
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= subject.stateMachineTable.get(preState);
if (transitionMap == null) {
// I use HashMap here because I would expect most EVENTTYPE's to not
// apply out of a particular state, so FSM sizes would be
// quadratic if I use EnumMap's here as I do at the top level.
transitionMap = new HashMap<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
subject.stateMachineTable.put(preState, transitionMap);
}
transitionMap.put(eventType, transition);
}
}

ApplicableSingleOrMultipleTransition主要是将preState、eventType和transition的映射关系放入stateMachineTable属性中。

addTransition调用的构造方法为:

1
2
3
4
5
6
7
8
9
private StateMachineFactory
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
this.defaultInitialState = that.defaultInitialState;
this.transitionsListNode
= new TransitionsListNode(t, that.transitionsListNode);
this.optimized = false;
this.stateMachineTable = null;
}

这个构造函数主要是对属性transitionsListNode进行实例化,transitionsListNode的主要作用的把状态机中的transition按照状态转移的顺利逆序的链成一个链表。下面看下transitionsListNode是怎么实现这个链表的,

1
2
3
4
5
6
7
8
9
10
11
private class TransitionsListNode {
final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
final TransitionsListNode next;
TransitionsListNode
(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
TransitionsListNode next) {
this.transition = transition;
this.next = next;
}
}

有两个属性,分别为ApplicableTransition(一个接口,ApplicableSingleOrMultipleTransition实现了该接口)的transition和TransitionsListNode的next属性。从构造函数中可以看出transition是当前状态转移对应的处理类,next指向的是下一个TransitionsListNode,此时的下一个TransitionsListNode其实是上一个StateMachineFactory中的TransitionListNode

跳过2中的addTransition,先看下3中的addTransition,

1
2
3
4
5
6
7
8
9
10
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, Set<STATE> postStates,
EVENTTYPE eventType,
MultipleArcTransition<OPERAND, EVENT, STATE> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this,
new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new MultipleInternalArc(postStates, hook)));
}

和1中的addTransition类似,只是将SingleArcTransition换成了MultipleInternalArc,这两个类的区别在上面已经介绍过,其余的代码逻辑是一样的。

现在看下2中的addTransition

1
2
3
4
5
6
7
8
9
10
11
12
13
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
STATE preState, STATE postState, Set<EVENTTYPE> eventTypes,
SingleArcTransition<OPERAND, EVENT> hook) {
StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> factory = null;
for (EVENTTYPE event : eventTypes) {
if (factory == null) {
factory = addTransition(preState, postState, event, hook);
} else {
factory = factory.addTransition(preState, postState, event, hook);
}
}
return factory;
}

2中只是循环所有的事件类型对每一个事件类型调用1中的addTransition。

通过对所有addTransition的分析发现每个addTransition都会new一个新的StateMachineFactory,并将上一个StateMachineFactory的一些属性值赋值到当前StateMachineFactory中。

installTopology

addTransition把状态都添加到StateMachineFactory中之后,通过调用installTopology进行状态链的初始化。

1
2
3
4
5
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
installTopology() {
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}

这里也实例化了一个新的StateMachineFactory,不同的是将属性optimized设置为true。

1
2
3
4
5
6
7
8
9
10
11
12
private StateMachineFactory
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
boolean optimized) {
this.defaultInitialState = that.defaultInitialState;
this.transitionsListNode = that.transitionsListNode;
this.optimized = optimized;
if (optimized) {
makeStateMachineTable();
} else {
stateMachineTable = null;
}
}

当optimized为true是,会在构造函数中调用makeStateMachineTablestateMachineTable进行赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void makeStateMachineTable() {
// 声明一个stack数据结构,用来存储TransitionsListNode链中的TransitionsListNode
// 之所以用stack,是因为TransitionsListNode链是按照状态转移的逆序排列的
// 也就是说TransitionsListNode链中的第一个是状态转移中最后一个状态对应的ApplicableSingleOrMultipleTransition
Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();
Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();
prototype.put(defaultInitialState, null);
// I use EnumMap here because it'll be faster and denser. I would
// expect most of the states to have at least one transition.
stateMachineTable
= new EnumMap<STATE, Map<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);
// 将TransitionsListNode链中的ApplicableSingleOrMultipleTransition入stack
for (TransitionsListNode cursor = transitionsListNode;
cursor != null;
cursor = cursor.next) {
stack.push(cursor.transition);
}
// 将ApplicableSingleOrMultipleTransition出stack,
// 调用apply对stateMachineTable进行赋值
while (!stack.isEmpty()) {
stack.pop().apply(this);
}
}

下面看下ApplicableSingleOrMultipleTransition.apply方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void apply
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
// 从stateMachineTable中拿到preState对应的transitionMap
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= subject.stateMachineTable.get(preState);
// transitionMap为null则new一个transitionMap的HashMap
if (transitionMap == null) {
// I use HashMap here because I would expect most EVENTTYPE's to not
// apply out of a particular state, so FSM sizes would be
// quadratic if I use EnumMap's here as I do at the top level.
transitionMap = new HashMap<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
subject.stateMachineTable.put(preState, transitionMap);
}
// 将eventType对应的Transition放入transitionMap中
transitionMap.put(eventType, transition);
}

触发状态转移

RMAPPImpl中状态转移的触发是在RMAPPImpl.handle中触发的,在handle中调用this.stateMachine.doTransition(event.getType(), event),其中stateMachine是在RMAPPImpl的构造方法中调用stateMachineFactory.make(this)进行实例化的。

看下make方法

1
2
3
public StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand) {
return new InternalStateMachine(operand, defaultInitialState);
}

在make中new了一个InternalStateMachine对象,则stateMachine其实是InternalStateMachine类的对象,stateMachine.doTransition的调用其实就是stateMachine.doTransition的doTransition方法,然而在stateMachine.doTransition方法中又调用了StateMachineFactory的doTransition方法,StateMachineFactory.doTransition代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// StateMachineFactory.doTransition
private STATE doTransition
(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitonException {
// We can assume that stateMachineTable is non-null because we call
// maybeMakeStateMachineTable() when we build an InnerStateMachine ,
// and this code only gets called from inside a working InnerStateMachine .
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= stateMachineTable.get(oldState);
if (transitionMap != null) {
Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
= transitionMap.get(eventType);
if (transition != null) {
return transition.doTransition(operand, oldState, event, eventType);
}
}
throw new InvalidStateTransitonException(oldState, eventType);
}

StateMachineFactory.doTransition只能通过内部类InnerStateMachine的doTransition方法调用,调用StateMachineFactory.doTransition时假设stateMachineTable不为null,因为在InnerStateMachine的构造方法中可能会调用maybeMakeStateMachineTable方法。

从stateMachineTable中得到当前state和eventType对应的transition,然后调用transition.doTransition方法,调用eventType对应的hook去执行相关逻辑。

transition是一个接口,实现该接口的有两个类SingleInternalArc和MultipleInternalArc,相应的hook处理完之后,doTransition返回结束时的状态。

至此一次状态转移就完成了。

状态转移代码示例

下面以RMAppEventType.START事件类型被触发之后,发送的状态转移。

首先该事件在RMAppImpl.handle中被处理,调用this.stateMachine.doTransition(event.getType(), event)方法,跟踪到InternalStateMachine.doTransition,在其内又继续调用StateMachineFactory.this.doTransition,然后根据RMAppEventType.START事件被触发时的状态RMAppState.NEW,从stateMachineTable中EventType和Transition的映射关系transitionMap,接着从transitionMap中拿到RMAppEventType.START对应的transition(SingleInternalArc),接着调用transition的doTransition方法,这里才调用到该状态转移涉及到的hook,也就是在addTransition中注册的hook类RMAppNewlySavingTransition

代码流程为:
stateMachine[InternalStateMachine].doTransition -> StateMachineFactory.this.doTransition -> transition[SingleInternalArc].doTransition -> hook[RMAppNewlySavingTransition].transition

此流程结束之后RMAppMaster的状态由RMAppState.NEW转移为RMAppState.NEW_SAVING,这套流程是在addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition())中规定的。

最后列一个RMApp的状态转移流程图
RMApp状态机
图中的顶点是状态,边是触发状态转移的事件类型

您的肯定,是我装逼的最大的动力!