Flink 窗口
[TOC]
Apache Flink 02
Time 时间
时间类型
- 👉 EventTime
- 事件时间:指时间发生的时间,一旦确定之后再也不会改变。
- ProcessingTime
- 处理时间:指消息被计算引擎处理的时间,以各个计算节点的本地时间为准。
- IngestionTime
- 摄取时间:指事件进去流处理系统的时间,对于一个事件来说,使用其被读取的那一刻时间戳。
👉 事件时间和水位线
当涉及到支持事件时间时,Flink 的流处理运行时建立在悲观假设之上,即事件可能会乱序到达,即时间戳为 t 的事件可能会在时间戳为 t+1 的事件之后到达。由于这个原因,系统永远无法确定在未来是否会有时间戳小于给定时间戳 T 的元素到来。为了在实际可行的系统中减轻这种乱序对最终结果的影响,Flink 在流处理模式下使用了一种称为水印(Watermarks)的启发式方法。一个时间戳为 T 的水印表示没有时间戳小于 T 的元素会随后到达。
然而,在批处理模式下,由于输入数据集是预先已知的,因此不需要这样的启发式方法。至少,元素可以根据时间戳进行排序,以便按时间顺序进行处理。对于熟悉流处理的读者来说,在批处理中我们可以假设存在“完美水印”。
TimestampAssigner
👉 SerializableTimestampAssigner
👉 TimestampAssignerSupplierWatermarkGeneratorSupplier
ATTR : WatermarkGenerate
NoWatermarksGenerator
👉 BoundedOutOfOrdernessWatermarks
AscendingTimestampsWatermarksWatermarkStrategy
extends TimestampAssignerSupplier , WatermarkGeneratorSupplier
WatermarkStrategyWithTimestampAssigner
WatermarkStrategyWithIdleness
生成水位线方式:
生成式
1
2
3
4
5
// 共单分区使用
stream.assignAscendingTimestamps(extractor : scala.Function1[T, scala.Long])
// 供多分区使用
👉 stream.assignTimestampsAndWatermarks(strategy:WatermarkStrategy[T]);内置式
1
2
3
4
5
6
strategy:WatermarkStrategy[T]
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T]
)有序流:主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,就是直接拿当前最大的时间戳作为水位线就可以了。一般适用于单分区。
1
2
3
4
5
6
7
8
9
stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(e: Event, now: Long) = e.timestamp()
}
)
)👉 乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(FixedAmount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。一般适用于多分区。
1
2
3
4
5
6
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withIdleness(Duration.ofSeconds(60))
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
override def extractTimestamp(e: Event, now: Long) = e.timestamp();
})
Window 窗口
窗口分配器
WidowAssigner
1
keyedDataStream.window( assigner: WidowAssigner): KeyedStream[OUT, KEY]
窗口分配和计算逻辑
WindowAssigner用来决定某个元素被分配到哪个/哪些窗口中去。
SessionWindowAssigner比较特殊,因为Session Window无法事先确定窗口的范围,是动态改变的。![]()
窗口分配
![]()
窗口计算触发
Trigger 触发器决定了一个窗口何时能够被计算或清除。
每一个窗口都拥有一个属于自己的Trigger,Trigger上的定时器决定一个窗口何时能够被计算或清除。
每当有元素加入该窗口,或者之前注册的定时器超时时,Trigger都会被调用。Trigger触发的结果如下:
- Continue:继续,不做任务操作。
- Fire:触发计算,处理窗口数据。
- Purge:触发清理,移除窗口和窗口中的数据。
- Fire + Purge:触发计算+清理,处理数据并移除窗口和窗口中的数据。
![]()
默认触发规则和窗口类型有关
1
2
3
// 自定义触发规则:需要自定义类实现 Trigger 接口
windowStream.trigger(Trigger<? super T, ? super W> trigger)
:WindowedStream<T, K, W>
窗口过滤器
Evictor 可以理解为窗口数据的过滤器,Evictor可在Window Function执行前或后,从Window中过滤元素。Flink内置了3种窗口数据过滤器。
- CountEvictor:计数过滤器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
- DeltaEvictor:阈值过滤器。本质上来说是一个自定义规则,计算窗口中每个数据记录,然后与一个事先定义好的阈值做比较,丢弃超过阈值的数据记录。
- 👉 TimeEvictor:时间过滤器。保留Window中最近一段时间内的元素,并丢弃其余元素。
![]()
窗口类型
👉 TimeWindow
时间窗口
- Tumbling Window 时间对齐,窗口长度固定,没有重叠
- Sliding Window 时间对齐,窗口长度固定,有重叠
- Session Window 时间不对齐,窗口长度不固定,无重叠
CountWindow
固定长度,长度不够不计算
开窗方式
分散:keyBy有效
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
val stream:WindowedStream = keyedDataStream
// 数量窗口
stream.countWindow(size:Long[, slide:Long])
// 滚动窗口
stream.window(
// 滚动【处理时间】窗口
TumblingProcessingTimeWindows.of(
// 滚动【事件时间】窗口
👉 TumblingEventTimeWindows.of(
// 每个时间窗口的持续时间
size:Time
// 窗口开始的偏移量:提前或延迟开始时间
[,offset:Time
// 运行时生成交错偏移量:此策略在【并行运行时】决定每个窗口实例的具体交错偏移量
[,windowStagger:WindowStagger]]
)
)
// 滑动窗口
stream.window(
// 滑动【处理时间】窗口
SlidingProcessingTimeWindows.of(
// 滑动【事件时间】窗口
👉 SlidingEventTimeWindows.of(
size:Time // 窗口时间步长
[,slide:Time // 窗口时间步幅
[,offset:Time]] // 窗口开始偏移量
)
)
stream.window(
// 会话【处理时间】窗口
ProcessingTimeSessionWindows.withGap(
// 会话【事件时间】窗口
EventTimeSessionWindows.withGap(
// 会话间的时间间隙:
// 处理时间:新事件与现有会话窗口中的事件之间的【处理时间差】没有超过会话间隙,则进入会话
// 事件时间:通过水印 Watermark 机制来处理乱序事件,确保即使在事件乱序的情况下也能准确地 // 按照事件时间进行窗口划分,事件时间有序且相邻的事件之间时间差没有超过会话间隙,则进入
size:Time
)
// 会话【处理时间】窗口
ProcessingTimeSessionWindows.withDynamicGap(
// 会话【事件时间】窗口
EventTimeSessionWindows.withDynamicGap(
// 动态间隙
sessionWindowTimeGapExtractor:SessionWindowTimeGapExtractor<T>
)
)
/* 交错窗口 */
enum WindowStagger {
/** 默认模式:所有分区中的所有窗格同时触发。 */
ALIGNED {0},
/**
* 当分区操作符首次接收事件时,交错偏移量从均匀分布U(0, 窗口大小)中采样。
* 当分区操作符首次开始处理事件时,它会为每个分区随机生成一个介于0和窗口大小之间的交错偏移量,
* 这个偏移量将用于在分区之间产生一定的时间或顺序上的差异,
* 可能是为了平衡负载、减少资源争用或实现某种特定的时间交错效果。
*/
RANDOM {(long) (ThreadLocalRandom.current().nextDouble() * windowSize)},
/**
* 当窗口操作符接收到第一个事件时,计算窗口开始时间与当前处理时间之间的差异作为偏移量。
* 在并行处理环境中,不同的并行实例(或称为分区)可能会以不同的时间接收到数据流中的第一个事件。
* 有助于避免所有实例同时开始处理相同时间段的数据时可能出现的瓶颈。
*/
NATURAL {Math.max(0, currentProcessingTime - currentProcessingWindowStart)};
)
聚合:keyBy无效
1
2
3
4
5
val stream:AllWindowedStream = aggDataStream
// 全局数量窗口
.countWindowAll(size:Long, slide:Long)
// 全局事件窗口
.windowAll(assigner: WindowAssigner)
工具类封装
流元素时间特质
1 |
|
水位线和窗口
1 |
|