【Flink】使用水位线实现热门商品排行以及Flink如何处理迟到元素
创始人
2024-02-27 21:26:21
0

文章目录

  • 一 WaterMark
    • 1 水位线特点总结
    • 2 实时热门商品【重点】
      • (1)数据集
      • (2)实现思路
        • a 分流 - 开窗 - 聚合
          • 分流:
          • 开窗:
          • 聚合:
        • b 再分流 -- 统计
          • 再分流:
          • 统计:
      • (3)代码编写
  • 二 处理迟到元素
    • 1 什么是迟到元素
      • (1)代码编写
      • (2)测试
    • 2 处理策略
      • (1)抛弃迟到元素
      • (2)重定向迟到元素
        • a 将迟到数据发送到侧输出流中(不开窗口)
        • b 将迟到数据发送到侧输出流中(开窗口)

一 WaterMark

1 水位线特点总结

  • Flink 认为时间戳小于水位线的事件都已到达。

  • 水位线是一种逻辑时钟。

  • 水位线由程序员编程插入到数据流中。

  • 水位线是一种特殊的事件。

  • 在事件时间的世界里,水位线就是时间。

  • 水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒。

  • 水位线超过窗口结束时间,窗口闭合,默认情况下,迟到元素被抛弃。

  • Flink 会在流的最开始插入一个时间戳为负无穷大的水位线。

  • Flink 会在流的最末尾插入一个时间戳为正无穷大的水位线。

  • Watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退,(Watermark 就是当前数据流的逻辑时钟)。

    水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。

  • Watermark 与数据的时间戳相关。

  • 在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。

  • 如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。

  • 而如果 Watermark 到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题。

2 实时热门商品【重点】

求每个窗口中最热门的商品。

(1)数据集

数据集基本结构如下:用户id,商品id,所属品类id,数据类型(pv或buy),秒级时间戳。

在这里插入图片描述

(2)实现思路

a 分流 - 开窗 - 聚合

统计的结果是每一个窗口里面的每一个商品的pv次数。

  • 分流后变为键控流,键控流比DataStream多了一个泛型key,所以KeyedProcessFunction有三个泛型【key、输入、输出】。
  • 开窗后变为了WindowedStream,WindowedStream比键控流多个一个泛型window,所以ProcessWindowFunction中有四个泛型【输入、输出、key、窗口】。
  • 聚合后又变成了DateStream。

在这里插入图片描述

分流:

在这里插入图片描述

开窗:

在这里插入图片描述

聚合:

在这里插入图片描述

b 再分流 – 统计

再分流:

想统计每个窗口中的实时热门商品,再使用窗口结束时间(开始时间也可以)进行分流,效果是每一个时间窗口中不同商品id的浏览次数。

在这里插入图片描述

统计:

将ItemViewCount存放到列表状态变量中,使用ArrayList进行排序,最终输出。

在这里插入图片描述

(3)代码编写

public class Example7 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")// 数据流ETL.map(new MapFunction() {@Overridepublic UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}})// 获取pv数据.filter(r -> r.behavior.equals("pv"))// 分配水位线.assignTimestampsAndWatermarks(// 此数据集针对时间戳已经做了ETL,所以设置延迟时间为0WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.timestamp;}}))// 使用商品id对数据进行分流【分流】.keyBy(r -> r.itemId)// 每隔5分钟想查看过去一小时最热门的商品(滑动窗口)【开窗】.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))// 增量聚合与全窗口聚合结合使用【聚合】.aggregate(new CountAgg(),new WindowResult())// 按照ItemViewCount的windowEnd进行分流,获取同一个窗口的统计信息【分流】.keyBy(r -> r.windowEnd)// 取前3名.process(new TopN(3)).print();env.execute();}// 将每一个到来的ItemViewCount统计信息存储到列表状态中,设置定时器,进行排序public static class TopN extends KeyedProcessFunction{private ListState listState;private Integer n;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);listState = getRuntimeContext().getListState(new ListStateDescriptor("list-state", Types.POJO(ItemViewCount.class)));}@Overridepublic void processElement(ItemViewCount value, Context ctx, Collector out) throws Exception {// 每来一条数据直接存储到列表状态中listState.add(value);// 一条流上元素的windowEnd相同,定时器只会注册一次ctx.timerService().registerEventTimeTimer(value.windowEnd + 1L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {super.onTimer(timestamp, ctx, out);// 排序逻辑// 列表状态变量在底层是序列化过的,所以无法针对其直接排序ArrayList itemViewCountArrayList = new ArrayList<>();for(ItemViewCount ivc : listState.get()) itemViewCountArrayList.add(ivc);// 手动对列表状态变量进行GClistState.clear();itemViewCountArrayList.sort(new Comparator() {@Overridepublic int compare(ItemViewCount o1, ItemViewCount o2) {// 降序排列return o2.count.intValue() - o1.count.intValue();}});// 取出窗口结束时间StringBuilder result = new StringBuilder();result.append("======================================\n").append("窗口结束时间:" + new Timestamp(timestamp - 1L)).append("\n");// 取出前几名,对数据ETLfor(int i = 0; i < n; i++){ItemViewCount curr = itemViewCountArrayList.get(i);result.append("第" + (i + 1) + "名的商品id是:【" + curr.itemId).append("】,浏览次数是:【" + curr.count +"】").append("\n");}result.append("======================================\n");out.collect(result.toString());}}public static class WindowResult extends ProcessWindowFunction{@Overridepublic void process(String s, Context context, Iterable elements, Collector out) throws Exception {out.collect(new ItemViewCount(s,elements.iterator().next(),context.window().getStart(),context.window().getEnd()));}}public static class CountAgg implements AggregateFunction {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(UserBehavior value, Long accumulator) {return accumulator + 1L;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//聚合结果,每一个商品在每一个窗口中的浏览量public static class ItemViewCount{public String itemId;public Long count;public Long windowStart;public Long windowEnd;public ItemViewCount() {}public ItemViewCount(String itemId, Long count, Long windowStart, Long windowEnd) {this.itemId = itemId;this.count = count;this.windowStart = windowStart;this.windowEnd = windowEnd;}@Overridepublic String toString() {return "ItemViewCount{" +"itemId='" + itemId + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}public static class UserBehavior{public String userId;public String itemId;public String categoryId;public String behavior;public Long timestamp;public UserBehavior() {}public UserBehavior(String userId, String itemId, String categoryId, String behavior, Long timestamp) {this.userId = userId;this.itemId = itemId;this.categoryId = categoryId;this.behavior = behavior;this.timestamp = timestamp;}@Overridepublic String toString() {return "UserBehavior{" +"userId='" + userId + '\'' +", itemId='" + itemId + '\'' +", categoryId='" + categoryId + '\'' +", behavior='" + behavior + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}}
}

二 处理迟到元素

1 什么是迟到元素

(1)代码编写

迟到元素:到来的数据包含的时间戳小于当前水位线。

具体实例见以下代码:

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("localhost",9999).map(new MapFunction>() {@Overridepublic Tuple2 map(String value) throws Exception {String[] arr = value.split(" ");return Tuple2.of(arr[0],Long.parseLong(arr[1]) * 1000L);}}).assignTimestampsAndWatermarks(// 使用延迟时间为0可以使用另一种写法(forBoundedOutOfOrderness(Duration.ofSeconds(0)))WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}))// 使用ProcessFunction实现:流处理,且处理的是没有经过keyBy的流,所以只能使用processElement// 不能使用状态变量和定时器// 其中定时器只能在运行时才会报错,即使在编译期写出来编译器也不会报错.process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if(value.f1 < ctx.timerService().currentWatermark()){out.collect("【" + value + "】元素迟到了");}else{out.collect("【" + value + "】元素没有迟到");}}}).print();env.execute();
}

(2)测试

依次输入

水位线 = 观察到的最大时间戳 - 最大延迟时间 - 1 毫秒,显然单调递增。
a 1 【水位线:1 - 0 -1ms = 9999ms】  元素携带时间戳为1s,1s > 负无穷大(当前水位线),没有迟到
a 2 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为2s, 2s > 9999ms(当前水位线),没有迟到
a 1 【水位线:2 - 0 -1ms = 19999ms】 元素携带时间戳为1s, 1s < 19999ms(当前水位线),迟到

结果如下图:

在这里插入图片描述

由上可以看出,机器时间、处理时间是不存在迟到元素的,只有在事件时间中,才会存在迟到元素。对于事件时间来说,水位线就是它的逻辑时钟。

水位线可以用来平衡计算的完整性和延迟两方面。除非选择一种非常保守的水位线策略 (最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则总需要处理迟到的元素。

迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了 (也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。

2 处理策略

DataStream API 提供了三种策略来处理迟到元素

  • 直接抛弃迟到的元素。
  • 将迟到的元素发送到另一条流中去。
  • 可以更新窗口已经计算完的结果,并发出计算结果。

(1)抛弃迟到元素

抛弃迟到的元素是事件时间窗口操作符的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。

process function 可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。

(2)重定向迟到元素

迟到的元素也可以使用旁路输出 (side output) 特性(侧输出流)被重定向到另外的一(或n)条流中去。迟到元素所组成的旁路输出流可以继续处理或者 sink 到持久化设施中去。

a 将迟到数据发送到侧输出流中(不开窗口)

// 定义侧输出流的名字(标签)
private static OutputTag lateElement = new OutputTag("late-element"){};public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env// 自定义数据源.addSource(new SourceFunction>() {@Overridepublic void run(SourceContext> ctx) throws Exception {// 指定时间戳并发送数据,如果两个时间戳不同,以指定的时间戳为准ctx.collectWithTimestamp(Tuple2.of("hello word", 1000L), 1000L);// 发送水位线ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp(Tuple2.of("hello flink", 2000L), 2000L);ctx.emitWatermark(new Watermark(1999L));ctx.collectWithTimestamp(Tuple2.of("hello late", 1000L), 1000L);}@Overridepublic void cancel() {}}).process(new ProcessFunction, String>() {@Overridepublic void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {if (value.f1 < ctx.timerService().currentWatermark()) {// 发送到测输出流ctx.output(lateElement, "迟到元素【" + value + "】已发送到侧输出流:");} else {out.collect("元素【" + value + "】正常到达!");}}});result.print("正常到达的元素:");result.getSideOutput(lateElement).print("侧输出流中的迟到元素:");env.execute();
}

b 将迟到数据发送到侧输出流中(开窗口)

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator result = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) throws Exception {ctx.collectWithTimestamp("a", 1000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 2000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp("a", 4000L);// 关闭 0-5 秒的窗口:必须销毁窗口,迟到数据才会被发送到侧输出流ctx.emitWatermark(new Watermark(4999L));ctx.collectWithTimestamp("a late", 3000L);}@Overridepublic void cancel() {}}).keyBy(r -> 1)// 开窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 将迟到元素输出到测输出流.sideOutputLateData(new OutputTag("late") {}).process(new ProcessWindowFunction() {@Overridepublic void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception {out.collect("窗口中共有:" + elements.spliterator().getExactSizeIfKnown() + "个元素");}});// 获取正常到达数据result.print("正常元素:");// 获取迟到元素,根据字符串id识别侧输出标签,可以保证其实单例result.getSideOutput(new OutputTag("late"){}).print("迟到元素:");env.execute();
}

相关内容

热门资讯

牛市狂奔他按兵不动!公募老将遭... 本报(chinatimes.net.cn)记者栗鹏菲 叶青 北京报道 在A股市场持续走强的2025年...
美联储主席:限制性政策或将调整... 当地时间2025年8月22日,美国怀俄明州杰克逊霍尔,美联储主席鲍威尔在杰克逊霍尔附近的提顿国家公园...
和前妻婚姻存续期内,男子向现任... 婚前,男子沈某先后与高某、马某非婚生子,后又相继与高某、马某结婚。在与高某婚姻关系存续期间,他向马某...
原创 妻... 据法院披露,李先生(化姓)与张女士(化姓)经人介绍相识恋爱,一年半后,双方登记结婚,并于2007年生...
涉嫌严重违纪违法,王莉霞何琼妹... 中央纪委国家监委8月22日消息,内蒙古自治区党委副书记、自治区政府主席王莉霞涉嫌严重违纪违法,目前正...
这钱花得值!拜仁2.18亿从英... 直播吧08月23日讯 在凌晨进行的新赛季德甲揭幕战上,拜仁6-0大胜RB莱比锡取得开门红,凯恩、奥利...
海岛花式度夏 海南暑期旅游人气... 新华社海口8月22日电(记者 陈子薇)落地海南,一口清补凉入喉,暑气瞬间消了一半。今年暑期,“到海南...
纪念中国人民抗日战争暨世界反法... 人民网平壤8月23日电 (记者刘融)8月22日,旅朝华侨、留学生共同举办纪念中国人民抗日战争暨世界反...
美法官裁定特朗普政府不得因“庇... 当地时间8月22日晚,美法官裁定,特朗普政府不得因为波士顿、芝加哥、丹佛、洛杉矶及其他30个城市和县...