水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间小一些。
窗口是否关闭按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。
-
水印并不会影响原有Eventtime
-
当数据流添加水印后,会按照水印时间来触发窗口计算
-
一般会设置水印时间,比Eventtime小一些(一般几秒钟)
-
当接收到的水印时间 >= 窗口的endTime且窗口内有数据,则触发计算
水印(水印时间)的计算: 事件时间 – 设置的最大允许延迟时间 = 水印时间
比如,事件时间是10分30秒, 最大延迟时间是2秒,那么水印时间就是10分28秒
举例:
窗口5秒,延迟(水印)3秒,按照事件时间计算
-
数据事件时间3, 落入窗口0-5.水印时间0
-
来一条数据事件时间7, 落入窗口5-10,水印时间4
-
来一条数据事件时间4,落入窗口0-5,水印时间1
-
来一条数据事件时间8,落入窗口5-10,水印时间5
这一条数据水印时间大于等于 窗口0-5的窗口结束时间。
满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算
可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。
但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算
这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据。
在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)
那么,触发条件以6个水印中最小的那个为准
比如, 有个窗口是0-5
其中5个并行度的水印都超过了5
但有一个并行度的水印是3
那么,不管另外5个并行度中的水印达到了多大,都不会触发
因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件
一个程序中有多少个水印和并行度有关,和keyby无关
也就是
比如有单词hadoop spark
按照keyby,会分成hadoop组 和spark组
但是这两个组是共用1个水印的
hadoop来的数据满足了触发条件,会将spark组的数据也触发