Flink 的时间属性
在日常的流式数据处理中,很多操作都需要依赖时间属性,Flink根据时间产生位置的差异,将时间分成了三种类型分别是
- Event Time:事件在设备上发生的时间
- Processing Time:事件在系统中被处理的时间
- Ingestion Time:数据进入到流计算处理系统的时间
对于 Event Time 来说,它是一个不可变的属性,它在流计算系统中各个节点流转,始终不变,Processing Time 则依赖于处理该事件机器的本地时钟,在系统中流转,该属性是变化的,而 Ingestion Time 则是数据进入到流计算系统时的时间。
Window
窗口是流计算常用的计算方式之一,Flink用窗口将无限连续的数据流按照一定规则划分成为一个个有限的数据集,我们可以利用窗口对窗口内的数据进行聚合运算,从而得到一定范围内的一些数值统计结果
在Flink中Window只有一个属性,就是maxTimestamp,表示这个窗口的最大边界,它的实现类分别为:
- GlobalWindow:全局窗口,单例,最大的maxTimestamp为Long的最大值
- TimeWindow:表示一个时间间隔,拥有[start, end)的开始时间和结束时间,
Evictor
它剔除元素的时机是在触发器触发之后,在窗口被处理(apply windowFunction)之前被触发,它在Flink的实现类为:
- CountEvictor:保持一定数量的Evictor
- DeltaEvictor:待了解
- TimeEvictor:基于保留时间作为剔除规则,将大于保存时间的elements剔除
Windows Assigner
在流式计算的系统中,WindowAssigner负责分配0或者多个Window给一个element,我们可以根据WindowAssigner分配窗口的方式归纳为四类:
- Tumbling Windows(滚动窗口):根据固定时间或大小划分,窗口不重叠,适合按照固定大小周期统计某一个指标
- Sliding Windows(滑动窗口):在滚动窗口的基础上,增加滑动属性,允许窗口发生重叠,适合根据用户设定的统计频率计算指定窗口的大小统计指标,例如每隔30s统计最近10min的用户
- Session Windows(会话窗口):主要将某个时间段活跃较高的数据聚成一个窗口,触发条件是Session Gap,适合非连续数据处理或周期产生的数据
- Global Windows (全局窗口):将相同的Key分配到单个窗口中计算结果,没有开始时间和结束时间,需要借助Trigger触发,否则数据一直保存在内存中
Windows Trigger
数据进入到窗口之后,窗口是否触发WindowFunction计算,取决于窗口是否满足触发条件,每种类型的窗口都有对应的触发机制,常见的Trigger有:
- CountTrigger:通过判断接入的数据量是否超过设定的阈值决定是否触发窗口
- EventTimeTrigger:通过比较WaterMark和窗口的EndTime判断窗口是否触发
- ProcessingTimeTrigger:通过比较ProcessTime和窗口的EndTime判断窗口是否触发
- 其他
我们可以看下一般实现一个Trigger需要大约实现哪些方法:
- onElement:针对每一个进入窗口的element进行触发操作
- onProcessingTime:系统时间时间定时器触发
- onEventTime:事件时间定时器触发
- onMerge:对多个窗口和状态进行合并
- clear:清除窗口和状态
我们通过Trigger返回的TriggerResult的类型决定是否触发端口:
- CONTINUE:等待,不触发窗口
- FIRE_AND_PURGE:触发窗口计算同时清除元素
- FIRE:触发窗口计算,不清除元素
- PURGE:清除window中的元素
WindowFunction
说完了Trigger之后,我们先简单的了解下WindowFunction,它可以按照计算原理分成了两大类:
- 增量聚合函数:ReduceFuction, AggregateFunction, FlodFunction
- 全量窗口函数:ProcessWindowFuction
增量聚合函数基于中间状态计算结果,窗口只维护中间状态,不需要换成原数据,而全量窗口需要维护原始数据,如果接入的数据量或者时间较大,可能会导致程序的性能下降
WaterMark
因为基于 Event Time 的模式下,数据由于网络延时等原因,往往不能准时到达,所以导致数据进入到流计算系统时会出现乱序的情况, 所以需要一种手段去衡量,当前数据处理的进度,所以 WaterMark 就是解决这个问题而设计,它表示了小于 WaterMark 的时间的数据已经到达,如果 WaterMark 的时间超过了 Window 的EndTime,就会触发窗口的计算
WaterMark是如何生成的?
目前Flink支持两种方式指定 Timestamps 和生成 WaterMark:
- 在Source Function中定义,SourceContenxt中可以调用, ctx.emitWatermark 方法进行 Watermark 的传递
- 自定义Timestamp Assigner 和 Watermark Generator 生成
WaterMark的生成形式上分为两种类型:
- Periodic WaterMarks:根据事件间隔周期性的生成WaterMark,例如
DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
- Punctuated WaterMarks:依赖于数据流中的特殊元素生成WaterMark,例如
DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
对于AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks,最终都会生成一个TimestampsAndXXXWatermarksOperator加入到transform中,例如:
1 | public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( |
WindowOperator
因为关于窗口的操作基本都在WindowOperator中,所以,今天我们通过一个简单的例子:org.apache.flink.streaming.examples.join.WindowJoin 这个例子分析WindowOperator究竟做了什么,这里是主要的代码
1 | public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin( |
通过上面的例子,两个DataStream进行join操作产生JoinedStreams,而JoinedStreams底层的实现实际调用了DataStream1.coGroup(DataStream2)产生CoGroupedStreams,而最终将数据封装成TaggedUnion类型,将两个流进行union,然后转成KeyedStream,这样两个数据流所有的相同的key都会汇聚到相同的window,最后调用window方法,得到WindowedStream,然后传入WindowFunction,进行函数计算
1 | public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { |
在最后传入的CoGroupWindowFunction中,它最终会被封装为InternalIterableWindowFunction 和 WindowAssigner、trigger一起放入到WindowOperator里面
1 | ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents", |
我们可以先简单的看下WindowOperator有哪些必要的属性:
- WindowAssigner:分配element到对应的Window
- Trigger:触发window计算
- StateDescriptor(windowStateDescriptor):窗口状态的描述符(window-contents)
- InternalAppendingState(windowState)
- InternalTimerService:用于注册定时器
接下来我们简单的看下当一个element来临时,它的处理逻辑:
通过windowAssigner为element找到对应的窗口集合
将当前的数据加入到windowState(ListState)中
构建这个Key的triggerContext,调用onElement的方法,返回TriggerResult
如果TriggerResult是FIRE类型,则执行窗口计算emitWindowContents,否则将window注册到internalTimerService中,等待触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
...
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}… }
Operator中的internalTimerService,主要由InternalTimeServiceManager管理,InternalTimeServiceManager的主要职责有:
- 维护了一个timerServices的HashMap,例如这里的(window-timers,InternalTimerServiceImpl)
- 当Operator进行Snapshot时,将数据序列化到外界存储中
- 当Operator进行Restore时,恢复相关的数据结构
- 当WaterMark到来时,遍历执行InternalTimerServiceImpl的advanceWatermark方法,从而触发Triggerable实现类的onEventTime的实现逻辑
InternalTimerServiceImpl(internalTimerService)内部维护了两个优先级队列eventTimeTimersQueue,processingTimeTimersQueue 用来存储对应的Timer,对于Event time模式来说,当Watermark到来时,会找出队列中已经排好序的timer调用Triggerable.onEventTime(WindowOperator也实现了Triggerable接口),接下来通过Trigger.onEventTime去判断TriggerResult去决定触发窗口
1 | OneInputStreamTask.emitWatermark |
而对于ProcessingTime模式来说,不需要外界触发,通过内部的一个ScheduledThreadPoolExecutor,在registerTimer时,将定时任务的callback函数注册到定时器中,到时间则触发,最终也是调用Triggerable.onProcessingTime方法,触发窗口计算
总 结
这边文章是一直以来的一些笔记总结,从Flink的时间概念开始,到和时间相关的一些特性,例如窗口,和触发器,最终通过WindowJoin这个例子简单的了解了WindowOperator在不同的时间类型下是如何工作的,进一步的加深对Flink的理解