YARN源码分析之Fair Scheduler part1

想看fair Scheduler的源码已经很久了,只是一直没有合适的时间,更没有找到合适的方法去读src,导致每次读一点就不想继续读了,最近实在是太无聊了,也不知道该读点什么,于是就又想起了这个搁置很久的任务。

这次读代码时我直接从fair Scheduler的test类中进行跟读,这样我感觉会比较清晰也容易debug。

环境初始化

要想测试Fair Scheduler,首先要虚拟出一个yarn环境,并加载fair相关的配置。
下面就来看下TestFairScheduler这个类是怎么对环境进行初始化的。

TestFairScheduler继承自FairSchedulerTestBase,一些公共常用方法被提到FairSchedulerTestBase中,yarn和相关配置环境的初始化在setUp中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void setUp() throws IOException {
// new 出一个fair Scheduler实例
scheduler = new FairScheduler();
// 在FairSchedulerTestBase中实现
conf = createConfiguration();
// new 出一个rm
resourceManager = new ResourceManager();
resourceManager.init(conf);
// TODO: This test should really be using MockRM. For now starting stuff
// 启动rm的异步调度器
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
resourceManager.getRMContext().getStateStore().start();
// to initialize the master key
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
scheduler.setRMContext(resourceManager.getRMContext());
}
// FairSchedulerTestBase
protected Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
return conf;
}

环境初始化之后,我们先来看个简单的例子,我们能从这个例子中知道每个队列的FairShare是如何算出来的。

FairShare计算逻辑

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public void testFairShareWithMaxResources() throws IOException {
// 设置fair-scheduler.xml的加载地址,由AllocationFileLoaderService进行加载文件内容
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
// set queueA and queueB maxResources,
// the sum of queueA and queueB maxResources is more than
// Integer.MAX_VALUE.
// 将fair的配置信息写入fair-sheduler.xml中
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<maxResources>1073741824 mb 1000 vcores</maxResources>");
out.println("<weight>.75</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 1 * 1024.
createSchedulingRequest(1 * 1024, "queueA", "user1");
// Queue B wants 6 * 1024
createSchedulingRequest(6 * 1024, "queueB", "user1");
// createSchedulingRequest(1 * 1024, "root.default", "user1");
scheduler.update();
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
// queueA's weight is 0.25, so its fair share should be 2 * 1024.
System.out.println( "queueA FairShare: " + queue.getFairShare().getMemory() +
", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
", steadyShare : " + queue.getSteadyFairShare() +
", demand : " + queue.getDemand() +
", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());
// queueB's weight is 0.75, so its fair share should be 6 * 1024.
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
System.out.println( "queueB FairShare: " + queue.getFairShare().getMemory() +
", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
", steadyShare : " + queue.getSteadyFairShare() +
", demand : " + queue.getDemand() +
", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());
queue = scheduler.getQueueManager().getLeafQueue(
"default", false);
System.out.println( "default FairShare: " + queue.getFairShare().getMemory() +
", weight : " + queue.getWeights().getWeight(ResourceType.MEMORY) +
", steadyShare : " + queue.getSteadyFairShare() +
", demand : " + queue.getDemand() +
", max/min : " + queue.getMaxShare() + "/" + queue.getMinShare());
}

代码的运行结果如下:

1
2
3
queueA FairShare: 2048, weight : 0.25, steadyShare : <memory:1024, vCores:0>, demand : <memory:1024, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
queueB FairShare: 6144, weight : 0.75, steadyShare : <memory:3072, vCores:0>, demand : <memory:6144, vCores:1>, max/min : <memory:1073741824, vCores:1000>/<memory:0, vCores:0>
default FairShare: 0, weight : 1.0, steadyShare : <memory:4096, vCores:0>, demand : <memory:0, vCores:0>, max/min : <memory:2147483647, vCores:2147483647>/<memory:0, vCores:0>

由上面的运行结果我们可以猜出Weight的默认值是1.0maxShare和minShare(即maxResources和minResources)的默认值分别为Integer.MAX_VALUE(2147483647)和0,但是FairShare和steadyFairShare是什么鬼?

我们先将FairShare和steadyFairShare放下,先介绍下与他们相关的WeightmaxResources/minResources

Weight

Weight 值的类型为大于等于0.0的数,默认为1.0。Weight是队列的一个属性,作为一个权值,使各个队列得到的资源成比例。

注意事项:

  • weigth的值决定了一个队列相对于其它兄弟队列而言应该得到的资源,假设:
    集群资源为1000GB、200个vcore,queueX的weight是0.25,其它队列的weight总和为0.75,
    则queueX的FairShare是250GB、50个vcore这里的FairShare是指steadyFairShare,原因在下面介绍

  • Weight不只用来计算steadyFairShare也用来计算FairShare。假如这个队列或者其子队列中至少有一个active Application,则FairShare就会被强制计算(这里的FairShare是指Instantaneous FairShare)。

  • weigth的值是和集群相关的,所以当集群的资源改变之后FairShare也会发生变化,

思考 Weight的值应该怎么设置?怎么设置最合理???

minResources和maxResources

minResources和maxResources是队列的资源限制。默认是0和Integer.MAX_VALUE

注意事项:

  • minResources是一个软性的限制。满足下面这两种情况时,minResources会强制这个队列的总资源大于或者等于minResources配置的大小。
    1) 资源是可用的或者可以从其它队列中抢占
    2) 所有队列的minResources的总和不超过集群的总资源
  • maxResources是一个硬性的限制。也就是说该队列的子队列或者后代队列正在使用资源的总和不会超过此值。
  • minResources和maxResources并不被推荐。这两个配置有一些缺点:
    1) minResources和maxResources的值都是静态的。因此集群资源发生变化的话此值需要更新。(那为什么不设置为百分比?是否可以自己优化下???)
    2) maxResources限制了集群资源的使用,队列超过maxResources之后不能再使用任何空闲的资源。(这个在不同的场景优缺点的性质也不一样)
    3) 如果一个队列的minResources超过了FairShare,它可能会对其他队列的FairShare造成影响。
    4) In the past, one specified minResources when using preemption to get a chunk of resources sooner. This is no longer necessary as FairShare-based preemption has been improved significantly. We will discuss this subject in more detail later.

FairShare和SteadyFairShare

Fair scheduler包含两种FairShare,分别是Instantaneous FairShareSteady FairShare,其中Instantaneous FairShare一般被简称为FairShare(本文中的FairShare是指Instantaneous FairShare的简称)。

Steady FairShare是一个队列的理想值,这个值是由集群资源和weigths计算出来的,并不会被经常计算,只是在集群启动或者集群的总资源发生增减的时候才会被重新计算。每个队列的Steady FairShare的计算公式是`totalResources(集群总资源)/weights(所有队列的weight总和)weight(某个队列的weight)`*

Instantaneous FairShare计算集群中每个active队列的FairShare。Steady FairShare是计算集群中所有的队列。这两者的区别为:

  • 空队列也就是没有分配任何资源的队列不会计算Instantaneous FairShare,其值为0。
  • 如果集群中所有的队列都是active,则Steady FairShare和Instantaneous FairShare相等
    每个队列的Instantaneous FairShare的计算公式是`totalResources(集群总资源)/activeWeights(active队列的weight总和)activeWeight(active队列的weight)`*

上面的两个计算公式都不太严谨,仅供参考,代码中真正的计算逻辑比较复杂,先看下计算Steady FairShare代码的流程。

FairShare
每一个队列/作业会有一个FairShare值,表示该队列/作业当前应得的资源。调度器后台会有一个线程不停的根据当前集群整体情况,动态的更新每个作业的FairShare值。通常当某个作业实际获取到资源不满足其FairShare时,该作业会更有可能获取到新的资源,同样当作业实际获取到资源超出其FairShare,则很难再获取到新的资源,甚至现有的资源也会被抢占。
FairShare的计算也是一个自顶向下资源分配的过程,通常情况下同一个队列内的各个作业会尽量均分队列资源。同时具体某个作业的FairShare会受到其他一些参数的修正。

上面提到Steady FairShare只是在集群总资源发生变化时才会被计算,则其入口函数是scheduler.handle(nodeEvent1);,scheduler捕获到NODE_ADDED事件,调用addNode,然后继续调用queueMgr.getRootQueue().setSteadyFairShare(clusterResource);queueMgr.getRootQueue().recomputeSteadyShares()先设置整个集群的steadyFairShare,然后计算每个队列的steadyFairShare,这里从recomputeSteadyShares开始屡下代码的思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void recomputeSteadyShares() {
// 根据不同的调度策略计算steadyFairShare
// fair scheduler中可以为每个队列设置调度策略,这里以fair为例
policy.computeSteadyShares(childQueues, getSteadyFairShare());
// 将上面计算的steadyFairShare赋值给每个队列
// 如果队列是父队列,则继续steadyFairShare
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
if (childQueue instanceof FSParentQueue) {
((FSParentQueue) childQueue).recomputeSteadyShares();
}
}
}
// FairSharePolicy.java
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
ComputeFairShares.computeSteadyShares(queues, totalResources,
ResourceType.MEMORY);
}

这里计算steadyFairShare用的是fair策略,则调用FairSharePolicy.computeSteadyShares,调用ComputeFairShares.computeSteadyShares来计算steadyFairShare,ComputeFairShares是计算FairShare的主要逻辑实现。先来看下computeSteadyShares方法:

1
2
3
4
5
6
public static void computeSteadyShares(
Collection<? extends FSQueue> queues, Resource totalResources,
ResourceType type) {
// 计算steadyFairShare和FairShare,最后一个参数是标识位,true为计算steadyFairShare
computeSharesInternal(queues, totalResources, type, true);
}

由于FairShare和steadyFairShare都是在ComputeFairShares类中,而且其计算逻辑类似,所以放在同一个方法中计算,提出一个标识位来标识计算的是FairShare还是steadyFairShare,这个方法就是computeSharesInternal

computeSteadyShares计算的是steadyFairShare,则调用computeSharesInternal时,将标识位设为true,computeSharesInternal的计算逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private static void computeSharesInternal(
Collection<? extends Schedulable> allSchedulables,
Resource totalResources, ResourceType type, boolean isSteadyShare) {
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
// 得到不用计算FairShare或者SteadyFairShare的队列和这些队列的返回的资源
int takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) {
return;
}
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
// 对需要计算FairShare的队列的maxShare进行sum
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
int totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
// 可用资源的总和
totalResource = Math.min(totalMaxShare, totalResource);
// 找到R的上限
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
// 二分查找,有迭代上限,COMPUTE_FAIR_SHARES_ITERATIONS是25
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
}
}
// Set the fair shares based on the value of R we've converged to
// 对每个队列设置FairShare或者SteadyFairShare,
// right值就是所要求的FairShare或者SteadyFairShare值
for (Schedulable sched : schedulables) {
if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
}
}

computeSharesInternal可以用来计算FairShare和SteadyFairShare,但是这两个计算的队列集合是不一样的,那这个队列集合是在哪进行筛选的呢?从上面的代码中可以看到有个handleFixedFairShares方法,看下其具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static int handleFixedFairShares(
Collection<? extends Schedulable> schedulables,
Collection<Schedulable> nonFixedSchedulables,
boolean isSteadyShare, ResourceType type) {
// 如果队列是fixed,则返回其队列 fixedShare 的总和
int totalResource = 0;
for (Schedulable sched : schedulables) {
// 判断队列是否是fixed
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
setResourceValue(fixedShare,
isSteadyShare
? ((FSQueue)sched).getSteadyFairShare()
: sched.getFairShare(),
type);
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
Integer.MAX_VALUE);
}
}
return totalResource;
}

handleFixedFairShares用来对队列进行处理,将fixed队列的fixed fairshare进行求和返回,并将非fixed的队列放入nonFixedSchedulables中。判断某个队列是否fixed是由方法getFairShareIfFixed判断的,判断标准如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static int getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
// 当maxResources小于等于0时,该队列是fixed
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
return 0;
}
// isSteadyShare是false,用来求FairShare,则判断队列是否为active
// 该队列不是active时,返回0,代表该队列是fixed
// For instantaneous fairshares, check if queue is active
if (!isSteadyShare &&
(sched instanceof FSQueue) && !((FSQueue)sched).isActive()) {
return 0;
}
// 当队列的weight小于等于0时,该队列是fixed,返回0或者minShare
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
// 当队列是非fixed时,返回-1
return -1;
}

判断一个队列是fixed的条件是:

  • maxResources小于等于0
  • weight小于等于0
  • 当求FairShare时,如果队列是非active时

computeSharesInternal通过handleFixedFairShares将nonFixedSchedulables放入schedulables中,schedulables集合中存放的就是需要计算FairShare或者SteadyFairShare的队列。

求出所需计算的队列集合之后,则对这些队列应得的资源进行计算,要计算应得资源则需求出这些队列的总资源(这里的总资源并不是集群的总资源,而是所需计算队列的总资源)。所求队列的总资源就是这些队列所配置的maxResources之和,其计算方式为:

1
2
3
4
5
6
7
8
9
10
// 这部分代码在computeSharesInternal中
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}

这些队列的总和有可能超过Integer.MAX_VALUE,则需要对其总资源进行校验,代码如下:

1
2
3
4
5
// 这部分代码在computeSharesInternal中
// totalResources 集群总资源,takenResources为不需要计算队列(即fixed 队列)的资源总和
int totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);

求出这些队列的总资源,则下面就是将这些资源根据各个队列的weight进行分配。

在考虑minResources和maxResources时尽可能的进行公平分配资源,fair scheduler规定FairShare需在minResources和maxResources之间

fair scheduler定义一个比值R,则各个队列应分配的资源分为如下三种情况:

  • 当队列S的minShare > R*weight,则分配minShare
  • 当队列S的manShare < R*weight,则分配manShare
  • 其它情况直接分配R*weight
    这些分配的总和等于totalSlots。这个比值R也称为weight-to-slots,因为其值将队列的weight转换为实际的资源个数。

那么R的取值是关键,R过小则导致分配给队列的slots总和小于totalSlots,R过大则导致分配的slots总和多余totalSlots。

R的查找可以使用二分查找,则另一个关键点就是R上限的确定,由上面对R的定义,得知R的上限就是尽可能的让分配的slots接近totalSlots。R上限的代码如下:

1
2
3
4
5
6
7
// 这部分代码在computeSharesInternal中
double rMax = 1.0;
// rMax不满足上限的标准则使其乘以2,继续循环
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}

resourceUsedWithWeightToResourceRatio返回的值是当R为rMax时,各个队列分配的slots总和。而各个队列应该分配的slots是通过computeShare计算的。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, ResourceType type) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
resourcesTaken += share;
}
return resourcesTaken;
}
private static int computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}

找到R的最大值之后,就是进行二分查找,找到最公平的分配方案,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 这部分代码在computeSharesInternal中
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
mid, schedulables, type);
if (plannedResourceUsed == totalResource) {
right = mid;
break;
} else if (plannedResourceUsed < totalResource) {
left = mid;
} else {
right = mid;
}
}

最后根据isSteadyShare标识设置每个队列的FairShare或者SteadyFairShare的值。

总结

本篇主要解析了下队列FairShare和SteadyFairShare的计算过程,其中FairShare和SteadyFairShare的区别为FairShare分为Instantaneous FairShare和SteadyFairShare,但是Instantaneous FairShare一般简称为FairShare,则FairShare和SteadyFairShare的区别为前者计算时不包括未active的队列,后者计算时包括所有的队列。

FairShare和SteadyFairShare的计算思想为:
先找到一个比值R,则各个队列应分配的资源分为如下三种情况:

  • 当队列S的minShare > R*weight,则分配minShare
  • 当队列S的maxShare < R*weight,则分配maxShare
  • 其它情况直接分配R*weight
    这些分配的总和等于totalSlots。这个比值R也称为weight-to-slots,因为其值将队列的weight转换为实际的资源个数。

其中比值R的查找比较麻烦,平时如果想简单的评估下各个队列应分配的资源可以使用如下公式:
Steady FairShare的计算公式是totalResources(集群总资源)/weights(所有队列的weight总和)*weight(某个队列的weight)
FairShare的计算公式是totalResources(集群总资源)/activeWeights(active队列的weight总和)*activeWeight(active队列的weight)

这两个公式成立的前提是所求出的SteadyFairShare和FairShare在minResources和maxResources之间。如果小于minResources(大于maxResources)则steadyFairShare或者FairShare应该为minResources(maxResources),这种情况分配给队列的slots总和会超出totalSlots。

您的肯定,是我装逼的最大的动力!