Flink中,如何使用水印解决网络延迟问题?

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间小一些。

窗口是否关闭按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。

  • 水印并不会影响原有Eventtime

  • 当数据流添加水印后,会按照水印时间来触发窗口计算

  • 一般会设置水印时间,比Eventtime小一些(一般几秒钟)

  • 当接收到的水印时间 >= 窗口的endTime且窗口内有数据,则触发计算

水印(水印时间)的计算: 事件时间 – 设置的最大允许延迟时间 = 水印时间

比如,事件时间是10分30秒, 最大延迟时间是2秒,那么水印时间就是10分28秒

举例:

窗口5秒,延迟(水印)3秒,按照事件时间计算

  • 数据事件时间3, 落入窗口0-5.水印时间0

  • 来一条数据事件时间7, 落入窗口5-10,水印时间4

  • 来一条数据事件时间4,落入窗口0-5,水印时间1

  • 来一条数据事件时间8,落入窗口5-10,水印时间5

这一条数据水印时间大于等于 窗口0-5的窗口结束时间。

满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算

可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。

但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算

这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据。

在多并行度下,每个并行有一个水印

比如并行度是6,那么程序中就有6个watermark

分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5

其中5个并行度的水印都超过了5

但有一个并行度的水印是3

那么,不管另外5个并行度中的水印达到了多大,都不会触发

因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

一个程序中有多少个水印和并行度有关,和keyby无关

也就是

比如有单词hadoop spark

按照keyby,会分成hadoop组 和spark组

但是这两个组是共用1个水印的

hadoop来的数据满足了触发条件,会将spark组的数据也触发

资源下载: