Flink window解密

近几年流处理简直火的一塌糊涂,Flink也在阿里的推广下逐渐成为业界主流,正在逐步取代Storm和Spark Streaming。
Flink天生就是为了处理流,而且相对于其他流处理框架集成了很多实用功能,比如状态管理;Flink还丰富了window的功能,使window也成为了Flink的一个特色。
本篇主要对window从多种角度进行解密,主要包括:

  • 何为window
  • 如何使用
  • 如何实现
  • 各组件原理

在解密window之前,简单介绍下何为流处理?

流是指一种源源不断的形态,流处理是一种被设计来处理无界数据集的数据处理系统引擎,无界的数据集是指没有边界,数量上是无穷的一类数据集。
Flink将流处理总结为四个单词: what where when how

  • what – What results are calculated(计算的结果是什么)

  • where – Where in event time are results calculated(在事件时间中的哪个位置计算结果)

  • when – When in processing time are results materialized(在处理时间中的哪个时刻触发计算结果)

  • how – How do refinements of results relate(如何修正结果)

既然流处理有这么多的问题需要解决,那么为什么还要进行流计算?

  1. 在商业竞争中极度渴望更快的数据,而转换成流计算则是一个好的方法来降低延迟。
  2. 海量的、无穷数据集在现在的商业环境里变的越来越常见,而用专门设计来处理这样数据的系统来应对这些数据则更为容易。
  3. 在数据到达时就对他们进行处理能够更加平均地把负载进行均衡,取得更好的一致性和更可预测的计算资源消耗。

何为window

window回答了where这个问题,因为在流处理应用中,数据是连续不断的,我们不可能等到所有数据都到了才开始处理,况且我们也无法判断所有的数据何时能完整到来。而且在实际的环境中也需要做一些聚合计算,例如计算过去5分钟某个页面的UV,这就必须定义一个窗口,用来收集并计算这5分钟内的数据。所以window很好的回答了where这个问题,他决定了数据在哪里被计算。

window是无限流上一种核心机制,其按照时间边界对数据源进行切分,切分方式分为固定窗口、滑动窗口和会话窗口三种,同时,在可以在窗口内进行聚合,从而把源源不断产生的数据根据不同的条件划分成一段一段有边界的数据区间,使用户能够利用窗口功能实现很多复杂的统计分析需求。具体实现时,窗口又可以是严格按照时间来驱动,例如TimeWindow;也可以由数据驱动,例如CountWindow。切分方式如图
window切分方式

固定窗口(Fixed windows)

固定窗口按照固定的长度进行分片,这个固定的长度可以时间维度也可以是数据维度。如上图中红色的划分方式,这个固定的长度可以是每1分钟也可以是每100个元素。
固定窗口典型地会对所有的数据集进行等份划分,也叫对齐窗口。在某些情形下,可能会希望对不同的数据子集应用不同的相位偏移,从而能让分片的完整度更加的平均。这时就不再是对齐窗口,而是非对齐窗口。

滑动窗口(Sliding windows)

滑动窗口是固定窗口的一个更一般化的形式,滑动窗口一般会定义两个属性,即窗口大小(时间长短)和滑动时间。如果滑动时间比窗口要小,则窗口会重叠;如果相等,这就是固定窗口;如果滑动时间比窗口大,就产生了一种特殊的数据采样,也就是按时间只看数据集里的一部分子集的数据。类似于固定窗口,滑动窗口一般也是对齐的。出于性能考虑也会在某些情况下是非对齐的。需要注意的是,上图中蓝色部分为了能表明滑动的性质而没有把每个窗口对应到所有的键。实际情况里是都要对应到的。

会话窗口(Sessions)

会话是指在不活跃时间段之间的一连串事件,这个不活跃时间一般是设定的比超时的时间要长,会话窗口是一种动态窗口。会话单元一般用来做用户行为分析,即观察在一个会话单元里用户的一系列事件。会话单元的长度一般都没法提前确定,完全取决于实际数据的情况。会话单元也是非对齐窗口的一个经典案例,因为实际情况下,不同子集数据的会话单元长度几乎不可能一致地对齐。

一般而言,window是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。下面看下Flink内置的窗口。

Flink提供的内置窗口使用及其实现

Flink将窗口的切分规则分为3种,3种切分规则Flink都有对应内置窗口实现,内置窗口包括滚动时间窗口、滑动时间窗口、滚动计数窗口、滑动计数窗口和会话窗口。这里为了方便将内置窗口分为3类进行介绍,依次为计数窗口countWindow、时间窗口timeWindow和会话窗口sessionWindow。

  • 计数窗口
    计数窗口是根据元素个数对数据流进行分组的,分为滚动计数窗口和滑动计数窗口。使用计数窗口时先将DataStream转换为KeyedStream,然后通过调用countWindow函数进行使用。

例如需要统计每100个订单中的总数,则使用滚动计数窗口,每100个元素滚动一次,代码实现为:

1
2
3
4
5
6
7
8
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// 滚动计数窗口,每100个元素滚动一次
.countWindow(100)
.sum(1)

与滚动计数窗口相对的是滑动计数窗口,如果要计算每10个元素计算一次最近100个元素的总和,使用滑动窗口,窗口大小为100,每10个元素滑动一次,代码如下:

1
2
3
4
5
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// 滑动计数窗口,窗口大小为100,每10个滑动一次
.countWindow(100, 10)
.sum(1)

使用方式很简单,这里看下countWindow的具体是如何实现的,两个countWindow函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
// 滚动计数窗口
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}

// 滑动计数窗口
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

由countWindow的实现可以看出window丰富的功能是由一些关键组件组合实现的,这些组件包括evictor和trigger。两者使用的窗口分配器都是GlobalWindows,两者的区别是trigger,因为GlobalWindows的默认trigger是NeverTrigger,意思是不触发窗口计算。滚动窗口的trigger是将CountTrigger封装成PurgingTrigger,根据窗口分配的元素个数进行触发计算,并且在计算之后清除窗口中的数据,从而达到滚动窗口的效果。滑动窗口的trigger只是一个单纯的CountTrigger,根据滑动元素的个数进行计算,由于countWindow使用的是GlobalWindows,只有一共全局窗口,trigger触发计算时又只是单纯的计算结果,并未像滚动窗口那样清除数据,所以滑动窗口中的数据会越来越多,不仅性能上会受影响,而且数据也可能会被重复计算,这里增加了evictor对数据进行过滤。

  • 时间窗口
    时间窗口是根据时间对数据流进行分组对,也分为滚动时间窗口和滑动时间窗口。时间窗口按照时间划分,这个时间在Flink中有三种概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间)。注:Flink 中窗口机制和时间类型是完全解耦的,也就是说当需要改变时间类型时不需要更改窗口逻辑相关的代码。

例如我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分汇总,这种切分被称为滚动时间窗口,代码如下:

1
2
3
4
5
6
7
8
// Stream of (userId, buyCnt)
val buyCnts: DataStream[(Int, Int)] = ...
// 将DataStream转换为KeyedStream,然后再使用窗口函数
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// 窗口长度为1分钟的滚动时间窗口
.timeWindow(Time.minutes(1))
.sum(1)

窗口计算时,可以选择窗口切分时使用的时间方式,默认使用的是processtime,使用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)对时间方式进行更改。

时间窗口的另一种方式是滑动时间窗口,主要用于平滑地进行窗口聚合计算。如果,需要每分钟统计下最近5分钟用户购买的商品总数,就可以使用滑动时间窗口进行计算,代码如下:

1
2
3
4
5
val slidingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// 窗口长度为5分钟,每1分钟滑动一次
.timeWindow(Time.minutes(5), Time.minutes(1))
.sum(1)

时间窗口使用时依然很简单方便,将DataStream流转换为KeyedStream流,调用timeWindow进行使用,传入一个参数代表滚动时间窗口,传入两个参数时代表滑动时间窗口,其中第一个参数代表窗口的长度,第二个参数代表滑动的长度。其具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 滚动时间窗口
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}

// 滑动时间窗口
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}

时间窗口的实现比计数窗口较复杂,每种窗口类型根据不同的时间设置都有专门的实现,滚动窗口的实现是TumblingProcessingTimeWindowsTumblingEventTimeWindows,而滑动窗口的实现是SlidingProcessingTimeWindowsSlidingEventTimeWindowsTumblingProcessingTimeWindowsTumblingEventTimeWindows的实现逻辑一样,都是根据元素的时间指定一个时间窗口TimeWindow,每个元素只能属于一个窗口,不同点只是触发计算的trigger,根据设置的时间策略指定相应的trigger,其中TumblingProcessingTimeWindows的trigger是ProcessingTimeTriggerTumblingEventTimeWindows的trigger是EventTimeTriggerSlidingProcessingTimeWindowsSlidingEventTimeWindows划分窗口的逻辑跟TumblingProcessingTimeWindowsTumblingEventTimeWindows一样,只是滑动窗口中一个元素可以归属多个窗口,所以返回的是一个窗口集合,至于触发器trigger和滚动窗口中的一样。

  • 会话窗口
    此处的会话类似浏览页面时所指的会话,会话窗口并没有固定时间长度,而是根据事件之间的时间间隔来决定的,如果两个事件之间的时间间隔超过阈值,则被划分到不同的窗口中。例如,需要计算每个用户在活跃期间总共购买的商品数量,由于每个用户的活跃时长不固定,不能统一设置窗口时长,所以此时应该使用会话窗口,代码如下:
1
2
3
4
5
6
7
8
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// 事件之间超过30秒则划分新窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1)

使用会话窗口调用KeydeStream.window方法,同样此方法也在countWindowtimeWindow中被调用,并没有为会话窗口封装单独的函数,使用时传入对应时间的会话窗口分配器即可。

窗口进阶

内置窗口使用比较简单,WindowedStream也提供了一些简单的计算函数,但是如果自定义一些计算逻辑时就用到了窗口函数。
窗口函数分为ReduceFunction、AggregateFunction、FoldFunction和ProcessWindowFunction。前两个是增量聚合函数,性能比较高效;ProcessWindowFunction中会包含一个迭代器,这个迭代器中记录了窗口中的所有元素,除此之外还包含一些窗口的元信息。接下来看下每个窗口函数具体怎么用。

  • ReduceFunction
    将输入的两个元素进行计算,得到一个相同类型的元素做为输出。此函数是增量的对窗口中的元素进行计算,类似于MR中Reduce。具体使用代码如下:
1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
  • AggregateFunction
    AggregateFunction也是一个增量聚合窗口函数,比ReduceFunction更加通用,包含三个参数,第一个是窗口流中输入的元素,第二个是累加器,第三个元素是计算之后的输出,累加器和输出结果的类型都可以自定义。使用样例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}

@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}

@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}

@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}

DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
  • FoldFunction
    窗口中的元素与累加器进行计算,此时累加器有初始值,并且输出的结果类型和累加器相同。此窗口函数不能用在合并窗口中,比如会话窗口。使用样例代码如下:
1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
  • ProcessWindowFunction
    ProcessWindowFunction最灵活,包含的信息也较多,可以对整个窗口中的元素进行迭代计算,还可以访问窗口的一些元信息。但是单纯使用ProcessWindowFunction时,由于要缓存窗口中的所有元素,所以会消耗一些资源。使用样例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}

ProcessWindowFunction中记录的窗口信息在抽象类Context中,ProcessWindowFunction抽象类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;

/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();

/** Returns the current processing time. */
public abstract long currentProcessingTime();

/** Returns the current event-time watermark. */
public abstract long currentWatermark();

/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();

/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}

}

虽然ProcessWindowFunction比较耗资源,但是在有些场景下他可以和之前介绍的增量聚合窗口函数一起使用,这样不仅可以将元素进行增量聚合减少资源消耗,也可以访问ProcessWindowFunction中记录的窗口元信息。下面的例子是ReduceFunction和ProcessWindowFunction结合使用的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...;

input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}

private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}

窗口底层实现

窗口的实现包含三个组件,分别是窗口分配器(WindowAssigner)、触发器(Trigger)和驱逐器(Evictor)。其中窗口分配器决定元素该分发到哪个窗口;触发器决定窗口中的元素何时计算或者清除该窗口中的元素,每个窗口都有自己的trigger;驱逐器类似过滤器,根据一定规则将窗口中部分数据清除掉。将这三个组件的不同实现组合在一起,就能实现各种窗口计算。

  • 窗口分配器(WindowAssigner)

先看下核心函数
assignWindows: 将对应的元素分配到指定的窗口中
getDefaultTrigger: 获取该窗口分配器默认指定的Trigger
isEventTime: 是否基于event time对元素进行分配

countWindow中用的是GlobalWindow,所以该分配器只是返回当前窗口即可。
重点看下timeWindow的assignWindows方法,这里只关注下SlidingEventTimeWindowsassignWindows,因为滚动窗口的分配逻辑和滑动窗口类似。SlidingEventTimeWindows.assignWindows代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// 计算窗口可以有多少个滑动窗口
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
// 当前元素时间所在的最近窗口的起始位置
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
// 生成该元素的窗口
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
// 窗口的区间是前闭后开
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}

核心代码逻辑在TimeWindow.getWindowStartWithOffset中,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*
* 滑动窗口和滚动窗口都是调用这个函数获取窗口的起始位置
* 滑动窗口传入的windowSize是滑动的大小,对于滑动窗口按照slide滑动,其实可以理解成长度为slide的滚动窗口
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
// offset 可以理解成最开始到那个窗口的起始位置
// timestamp - offset 为 到最开始那个窗口的距离
// (timestamp - offset + windowSize) % windowSize 离最近窗口的距离,相当于timestamp - offset - windowSize - windowSize ...
// timestamp - (timestamp - offset + windowSize) % windowSize 则为最近窗口的起始位置
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
  • 触发器(Trigger)

先看下核心函数
onElement: 处理每个添加到窗口中的元素
onEventTime: TriggerContext中注册的event-time timer被触发时调用
onProcessingTime: TriggerContext中注册的processing-time timer被触发时调用
onMerge: window被merge时触发

在WindowAssigner中,会调用getDefaultTrigger得到该窗口的默认触发器。这里看下SlidingEventTimeWindows的默认trigger–EventTimeTrigger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// element 所在的 window,maxTimestamp为end-1
// wartermark是定期生成的,maxTimestamp正常应该是大于wartermark的
// window的maxTimestamp是不变的,wartermark是周期性变化的
// 则onElement一般不会触发FIRE
// 迟到时会触发
// 假如一直没有迟到的数据到达是不是就不会在这里触发,
// 假如迟到的数据到达了,但是该窗口已经由Time触发了FIRE怎么办??
// 迟到和乱序的理解:迟到会再次触发window计算,乱序是window等待一段时间进行计算,只计算一次。
// 迟到会触发多次,只要在规定的最大迟到时间内都会触发计算
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}

// 最后一秒中的第一条数据触发窗口计算,那这之后的数据怎么计算?
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
  • 驱逐器(Evictor)

先看下核心函数
evictBefore: 在触发窗口函数计算之前对窗口中的元素进行过滤
evictAfter: 在触发窗口函数计算之后对窗口中的元素进行过滤
evict: 过滤窗口中元素的具体实现,在evictBefore或者evictAfter内调用

完整的窗口计算流程,经历了 Window Assigner -> Trigger -> Evictor -> Evaluation Function 的过程,最终获得结果。

您的肯定,是我装逼的最大的动力!