【Flink】使用水位线实现热门商品排行以及Flink如何处理迟到元素
创始人
2024-02-27 21:26:21
0

文章目录

  • 一 WaterMark
    • 1 水位线特点总结
    • 2 实时热门商品【重点】
      • (1)数据集
      • (2)实现思路
        • a 分流 - 开窗 - 聚合
          • 分流:
          • 开窗:
          • 聚合:
        • b 再分流 -- 统计
          • 再分流:
          • 统计:
      • (3)代码编写
  • 二 处理迟到元素
    • 1 什么是迟到元素
      • (1)代码编写
      • (2)测试
    • 2 处理策略
      • (1)抛弃迟到元素
      • (2)重定向迟到元素
        • a 将迟到数据发送到侧输出流中(不开窗口)
        • b 将迟到数据发送到侧输出流中(开窗口)

一 WaterMark

1 水位线特点总结

  • Flink 认为时间戳小于水位线的事件都已到达。

  • 水位线是一种逻辑时钟。

  • 水位线由程序员编程插入到数据流中。

  • 水位线是一种特殊的事件。

  • 在事件时间的世界里,水位线就是时间。

  • 水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒。

  • 水位线超过窗口结束时间,窗口闭合,默认情况下,迟到元素被抛弃。

  • Flink 会在流的最开始插入一个时间戳为负无穷大的水位线。

  • Flink 会在流的最末尾插入一个时间戳为正无穷大的水位线。

  • Watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退,(Watermark 就是当前数据流的逻辑时钟)。

    水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。

  • Watermark 与数据的时间戳相关。

  • 在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。

  • 如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。

  • 而如果 Watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。

2 实时热门商品【重点】

求每个窗口中最热门的商品。

(1)数据集

数据集基本结构如下:用户id,商品id,所属品类id,数据类型(pv或buy),秒级时间戳。

在这里插入图片描述

(2)实现思路

a 分流 - 开窗 - 聚合

统计的结果是每一个窗口里面的每一个商品的pv次数。

  • 分流后变为键控流,键控流比DataStream多了一个泛型key,所以KeyedProcessFunction有三个泛型【key、输入、输出】。
  • 开窗后变为了WindowedStream,WindowedStream比键控流多个一个泛型window,所以ProcessWindowFunction中有四个泛型【输入、输出、key、窗口】。
  • 聚合后又变成了DateStream。

在这里插入图片描述

分流:

在这里插入图片描述

开窗:

在这里插入图片描述

聚合:

在这里插入图片描述

b 再分流 – 统计

再分流:

想统计每个窗口中的实时热门商品,再使用窗口结束时间(开始时间也可以)进行分流,效果是每一个时间窗口中不同商品id的浏览次数。

在这里插入图片描述

统计:

将ItemViewCount存放到列表状态变量中,使用ArrayList进行排序,最终输出。

在这里插入图片描述

(3)代码编写

public class Example7 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")// 数据流ETL.map(new MapFunction() {@Overridepublic UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}})// 获取pv数据.filter(r -> r.behavior.equals("pv"))// 分配水位线.assignTimestampsAndWatermarks(// 此数据集针对时间戳已经做了ETL,所以设置延迟时间为0WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.timestamp;}}))// 使用商品id对数据进行分流【分流】.keyBy(r -> r.itemId)// 每隔5分钟想查看过去一小时最热门的商品(滑动窗口)【开窗】.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))// 增量聚合与全窗口聚合结合使用【聚合】.aggregate(new CountAgg(),new WindowResult())// 按照ItemViewCount的windowEnd进行分流,获取同一个窗口的统计信息【分流】.keyBy(r -> r.windowEnd)// 取前3名.process(new TopN(3)).print();env.execute();}// 将每一个到来的ItemViewCount统计信息存储到列表状态中,设置定时器,进行排序public static class TopN extends KeyedProcessFunction{private ListState listState;private Integer n;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);listState = getRuntimeContext().getListState(new ListStateDescriptor("list-state", Types.POJO(ItemViewCount.class)));}@Overridepublic void processElement(ItemViewCount value, Context ctx, Collector out) throws Exception {// 每来一条数据直接存储到列表状态中listState.add(value);// 一条流上元素的windowEnd相同,定时器只会注册一次ctx.timerService().registerEventTimeTimer(value.windowEnd + 1L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {super.onTimer(timestamp, ctx, out);// 排序逻辑// 列表状态变量在底层是序列化过的,所以无法针对其直接排序ArrayList itemViewCountArrayList = new ArrayList<>();for(ItemViewCount ivc : listState.get()) itemViewCountArrayList.add(ivc);// 手动对列表状态变量进行GClistState.clear();itemViewCountArrayList.sort(new Comparator() {@Overridepublic int compare(ItemViewCount o1, ItemViewCount o2) {// 降序排列return o2.count.intValue() - o1.count.intValue();}});// 取出窗口结束时间StringBuilder result = new StringBuilder();result.append("======================================\n").append("窗口结束时间:" + new Timestamp(timestamp - 1L)).append("\n");// 取出前几名,对数据ETLfor(int i = 0; i < n; i++){ItemViewCount curr = itemViewCountArrayList.get(i);result.append("第" + (i + 1) + "名的商品id是:【" + curr.itemId).append("】,浏览次数是:【" + curr.count +"】").append("\n");}result.append("======================================\n");out.collect(result.toString());}}public static class WindowResult extends ProcessWindowFunction{@Overridepublic void process(String s, Context context, Iterable elements, Collector out) throws Exception {out.collect(new ItemViewCount(s,elements.iterator().next(),context.window().getStart(),context.window().getEnd()));}}public static class CountAgg implements AggregateFunction {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(UserBehavior value, Long accumulator) {return accumulator + 1L;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//聚合结果,每一个商品在每一个窗口中的浏览量public static class ItemViewCount{public String itemId;public Long count;public Long windowStart;public Long windowEnd;public ItemViewCount() {}public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {this.itemId = itemId;this.count = count;this.windowStart = windowStart;this.windowEnd = windowEnd;}@Overridepublic String toString() {return "ItemViewCount{" +"itemId='" + itemId + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}public static class UserBehavior{public String userId;public String itemId;public String categoryId;public String behavior;public Long timestamp;public UserBehavior() {}public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}@Overridepublic String toString() {return "UserBehavior{" +"userId='" + userId + '\'' +", itemId='" + itemId + '\'' +", categoryId='" + categoryId + '\'' +", behavior='" + behavior + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}}
}

二 处理迟到元素

1 什么是迟到元素

(1)代码编写

迟到元素:到来的数据包含的时间戳小于当前水位线。

具体实例见以下代码:

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("localhost",9999).map(new MapFunction>() {@Overridepublic Tuple2 map(String value) throws Exception {String[] arr = value.split(" ");return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 使用延迟时间为0可以使用另一种写法(forBoundedOutOfOrderness(Duration.ofSeconds(0)))WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}))// 使用ProcessFunction实现:流处理,且处理的是没有经过keyBy的流,所以只能使用processElement// 不能使用状态变量和定时器// 其中定时器只能在运行时才会报错,即使在编译期写出来编译器也不会报错.process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if(value.f1 < ctx.timerService().currentWatermark()){out.collect("【" + value + "】元素迟到了");}else{out.collect("【" + value + "】元素没有迟到");}}}).print();env.execute();
}

(2)测试

依次输入

水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
a 1 【水位线:1 - 0 -1ms = 9999ms】  元素携带时间戳为1s,1s > 负无穷大(当前水位线),没有迟到
a 2 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为2s, 2s > 9999ms(当前水位线),没有迟到
a 1 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为1s, 1s < 19999ms(当前水位线),迟到

结果如下图:

在这里插入图片描述

由上可以看出,机器时间、处理时间是不存在迟到元素的,只有在事件时间中,才会存在迟到元素。对于事件时间来说,水位线就是它的逻辑时钟。

水位线可以用来平衡计算的完整性和延迟两方面。除非选择一种非常保守的水位线策略 (最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则总需要处理迟到的元素。

迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了 (也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。

2 处理策略

DataStream API 提供了三种策略来处理迟到元素

  • 直接抛弃迟到的元素。
  • 将迟到的元素发送到另一条流中去。
  • 可以更新窗口已经计算完的结果,并发出计算结果。

(1)抛弃迟到元素

抛弃迟到的元素是事件时间窗口操作符的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。

process function 可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。

(2)重定向迟到元素

迟到的元素也可以使用旁路输出 (side output) 特性(侧输出流)被重定向到另外的一(或n)条流中去。迟到元素所组成的旁路输出流可以继续处理或者 sink 到持久化设施中去。

a 将迟到数据发送到侧输出流中(不开窗口)

// 定义侧输出流的名字(标签)
private static OutputTag lateElement = new OutputTag("late-element"){};public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env// 自定义数据源.addSource(new SourceFunction>() {@Overridepublic void run(SourceContext> ctx) throws Exception {// 指定时间戳并发送数据,如果两个时间戳不同,以指定的时间戳为准ctx.collectWithTimestamp(Tuple2.of("hello word", 1000L), 1000L);// 发送水位线ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp(Tuple2.of("hello flink", 2000L), 2000L);ctx.emitWatermark(new Watermark(1999L));ctx.collectWithTimestamp(Tuple2.of("hello late", 1000L), 1000L);}@Overridepublic void cancel() {}}).process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if (value.f1 < ctx.timerService().currentWatermark()) {// 发送到测输出流ctx.output(lateElement, "迟到元素【" + value + "】已发送到侧输出流:");} else {out.collect("元素【" + value + "】正常到达!");}}});result.print("正常到达的元素:");result.getSideOutput(lateElement).print("侧输出流中的迟到元素:");env.execute();
}

b 将迟到数据发送到侧输出流中(开窗口)

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) throws Exception {ctx.collectWithTimestamp("a", 1000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 2000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 4000L);// 关闭 0-5 秒的窗口:必须销毁窗口,迟到数据才会被发送到侧输出流ctx.emitWatermark(new Watermark(4999L));ctx.collectWithTimestamp("a late", 3000L);}@Overridepublic void cancel() {}}).keyBy(r -> 1)// 开窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 将迟到元素输出到测输出流.sideOutputLateData(new OutputTag("late") {}).process(new ProcessWindowFunction() {@Overridepublic void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception {out.collect("窗口中共有:" + elements.spliterator().getExactSizeIfKnown() + "个元素");}});// 获取正常到达数据result.print("正常元素:");// 获取迟到元素,根据字符串id识别侧输出标签,可以保证其实单例result.getSideOutput(new OutputTag("late"){}).print("迟到元素:");env.execute();
}

相关内容

热门资讯

公牛集团回应宣传语争议:起诉家... 日前,公牛集团就“10户中国家庭,7户用公牛”宣传语引发的争议作出回应。近日,广东中山市家的电器有限...
一次性信用修复政策来了!细则详... 今天(22日),中国人民银行发布通知,实施一次性信用修复政策,支持信用受损但积极还款的个人高效便捷重...
《中国人民银行关于实施一次性信... 1.符合哪些条件的逾期信息可以适用一次性信用修复政策,作不予展示处理? 一次性信用修复政策主要有四个...
【光明论坛】实施更加积极有为的... 【光明论坛】 2025年中央经济工作会议定调明年经济工作政策取向。会议强调,“实施更加积极有为的宏观...
央行:实施一次性信用修复政策 人民财讯12月22日电,为积极应对新冠疫情后续影响,支持信用受损但积极还款的个人高效便捷重塑信用,助...
中国人民银行发布关于实施一次性... 中国人民银行发布关于实施一次性信用修复政策有关安排的通知,其中提到,对于2020年1月1日至2025...
支持个人信用重塑!央行发布一次... 本文转自【新华社】; 为支持信用受损但积极还款的个人高效便捷重塑信用,12月22日中国人民银行对外...