Flink 认为时间戳小于水位线的事件都已到达。
水位线是一种逻辑时钟。
水位线由程序员编程插入到数据流中。
水位线是一种特殊的事件。
在事件时间的世界里,水位线就是时间。
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒。
水位线超过窗口结束时间,窗口闭合,默认情况下,迟到元素被抛弃。
Flink 会在流的最开始插入一个时间戳为负无穷大的水位线。
Flink 会在流的最末尾插入一个时间戳为正无穷大的水位线。
Watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退,(Watermark 就是当前数据流的逻辑时钟)。
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
Watermark 与数据的时间戳相关。
在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
而如果 Watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。
求每个窗口中最热门的商品。
数据集基本结构如下:用户id,商品id,所属品类id,数据类型(pv或buy),秒级时间戳。
统计的结果是每一个窗口里面的每一个商品的pv次数。
想统计每个窗口中的实时热门商品,再使用窗口结束时间(开始时间也可以)进行分流,效果是每一个时间窗口中不同商品id的浏览次数。
将ItemViewCount存放到列表状态变量中,使用ArrayList进行排序,最终输出。
public class Example7 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")// 数据流ETL.map(new MapFunction() {@Overridepublic UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}})// 获取pv数据.filter(r -> r.behavior.equals("pv"))// 分配水位线.assignTimestampsAndWatermarks(// 此数据集针对时间戳已经做了ETL,所以设置延迟时间为0WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.timestamp;}}))// 使用商品id对数据进行分流【分流】.keyBy(r -> r.itemId)// 每隔5分钟想查看过去一小时最热门的商品(滑动窗口)【开窗】.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))// 增量聚合与全窗口聚合结合使用【聚合】.aggregate(new CountAgg(),new WindowResult())// 按照ItemViewCount的windowEnd进行分流,获取同一个窗口的统计信息【分流】.keyBy(r -> r.windowEnd)// 取前3名.process(new TopN(3)).print();env.execute();}// 将每一个到来的ItemViewCount统计信息存储到列表状态中,设置定时器,进行排序public static class TopN extends KeyedProcessFunction{private ListState listState;private Integer n;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);listState = getRuntimeContext().getListState(new ListStateDescriptor("list-state", Types.POJO(ItemViewCount.class)));}@Overridepublic void processElement(ItemViewCount value, Context ctx, Collector out) throws Exception {// 每来一条数据直接存储到列表状态中listState.add(value);// 一条流上元素的windowEnd相同,定时器只会注册一次ctx.timerService().registerEventTimeTimer(value.windowEnd + 1L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {super.onTimer(timestamp, ctx, out);// 排序逻辑// 列表状态变量在底层是序列化过的,所以无法针对其直接排序ArrayList itemViewCountArrayList = new ArrayList<>();for(ItemViewCount ivc : listState.get()) itemViewCountArrayList.add(ivc);// 手动对列表状态变量进行GClistState.clear();itemViewCountArrayList.sort(new Comparator() {@Overridepublic int compare(ItemViewCount o1, ItemViewCount o2) {// 降序排列return o2.count.intValue() - o1.count.intValue();}});// 取出窗口结束时间StringBuilder result = new StringBuilder();result.append("======================================\n").append("窗口结束时间:" + new Timestamp(timestamp - 1L)).append("\n");// 取出前几名,对数据ETLfor(int i = 0; i < n; i++){ItemViewCount curr = itemViewCountArrayList.get(i);result.append("第" + (i + 1) + "名的商品id是:【" + curr.itemId).append("】,浏览次数是:【" + curr.count +"】").append("\n");}result.append("======================================\n");out.collect(result.toString());}}public static class WindowResult extends ProcessWindowFunction{@Overridepublic void process(String s, Context context, Iterable elements, Collector out) throws Exception {out.collect(new ItemViewCount(s,elements.iterator().next(),context.window().getStart(),context.window().getEnd()));}}public static class CountAgg implements AggregateFunction {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(UserBehavior value, Long accumulator) {return accumulator + 1L;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//聚合结果,每一个商品在每一个窗口中的浏览量public static class ItemViewCount{public String itemId;public Long count;public Long windowStart;public Long windowEnd;public ItemViewCount() {}public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {this.itemId = itemId;this.count = count;this.windowStart = windowStart;this.windowEnd = windowEnd;}@Overridepublic String toString() {return "ItemViewCount{" +"itemId='" + itemId + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}public static class UserBehavior{public String userId;public String itemId;public String categoryId;public String behavior;public Long timestamp;public UserBehavior() {}public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}@Overridepublic String toString() {return "UserBehavior{" +"userId='" + userId + '\'' +", itemId='" + itemId + '\'' +", categoryId='" + categoryId + '\'' +", behavior='" + behavior + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}}
}
迟到元素:到来的数据包含的时间戳小于当前水位线。
具体实例见以下代码:
public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("localhost",9999).map(new MapFunction>() {@Overridepublic Tuple2 map(String value) throws Exception {String[] arr = value.split(" ");return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 使用延迟时间为0可以使用另一种写法(forBoundedOutOfOrderness(Duration.ofSeconds(0)))WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}))// 使用ProcessFunction实现:流处理,且处理的是没有经过keyBy的流,所以只能使用processElement// 不能使用状态变量和定时器// 其中定时器只能在运行时才会报错,即使在编译期写出来编译器也不会报错.process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if(value.f1 < ctx.timerService().currentWatermark()){out.collect("【" + value + "】元素迟到了");}else{out.collect("【" + value + "】元素没有迟到");}}}).print();env.execute();
}
依次输入
水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
a 1 【水位线:1 - 0 -1ms = 9999ms】 元素携带时间戳为1s,1s > 负无穷大(当前水位线),没有迟到
a 2 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为2s, 2s > 9999ms(当前水位线),没有迟到
a 1 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为1s, 1s < 19999ms(当前水位线),迟到
结果如下图:
由上可以看出,机器时间、处理时间是不存在迟到元素的,只有在事件时间中,才会存在迟到元素。对于事件时间来说,水位线就是它的逻辑时钟。
水位线可以用来平衡计算的完整性和延迟两方面。除非选择一种非常保守的水位线策略 (最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则总需要处理迟到的元素。
迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了 (也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。
DataStream API 提供了三种策略来处理迟到元素
抛弃迟到的元素是事件时间窗口操作符的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。
process function 可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。
迟到的元素也可以使用旁路输出 (side output) 特性(侧输出流)被重定向到另外的一(或n)条流中去。迟到元素所组成的旁路输出流可以继续处理或者 sink 到持久化设施中去。
// 定义侧输出流的名字(标签)
private static OutputTag lateElement = new OutputTag("late-element"){};public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env// 自定义数据源.addSource(new SourceFunction>() {@Overridepublic void run(SourceContext> ctx) throws Exception {// 指定时间戳并发送数据,如果两个时间戳不同,以指定的时间戳为准ctx.collectWithTimestamp(Tuple2.of("hello word", 1000L), 1000L);// 发送水位线ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp(Tuple2.of("hello flink", 2000L), 2000L);ctx.emitWatermark(new Watermark(1999L));ctx.collectWithTimestamp(Tuple2.of("hello late", 1000L), 1000L);}@Overridepublic void cancel() {}}).process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if (value.f1 < ctx.timerService().currentWatermark()) {// 发送到测输出流ctx.output(lateElement, "迟到元素【" + value + "】已发送到侧输出流:");} else {out.collect("元素【" + value + "】正常到达!");}}});result.print("正常到达的元素:");result.getSideOutput(lateElement).print("侧输出流中的迟到元素:");env.execute();
}
public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) throws Exception {ctx.collectWithTimestamp("a", 1000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 2000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 4000L);// 关闭 0-5 秒的窗口:必须销毁窗口,迟到数据才会被发送到侧输出流ctx.emitWatermark(new Watermark(4999L));ctx.collectWithTimestamp("a late", 3000L);}@Overridepublic void cancel() {}}).keyBy(r -> 1)// 开窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 将迟到元素输出到测输出流.sideOutputLateData(new OutputTag("late") {}).process(new ProcessWindowFunction() {@Overridepublic void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception {out.collect("窗口中共有:" + elements.spliterator().getExactSizeIfKnown() + "个元素");}});// 获取正常到达数据result.print("正常元素:");// 获取迟到元素,根据字符串id识别侧输出标签,可以保证其实单例result.getSideOutput(new OutputTag("late"){}).print("迟到元素:");env.execute();
}