Flink 提供了 8 个 Process Function:
Flink sql中存在这样一个语法SELECT * FROM A INNER JOIN B WHERE A.id = B.id,A和B各为一条数据流,对于批处理来说,处理的A和B两张静态表笛卡尔积的过滤(先将A和B中相同id的数据shuffle到同一分区,然后再做笛卡尔积)。对Flink来说,需要处理的是两条流的等值内连接,对流A中和流B中一样的key做笛卡尔积。
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction 提供了操作每一个输入流的方法: processElement1() 和 processElement2()。类似于 ProcessFunction,这两种方法都通过 Context 对象来调用。这个 Context 对象可以访问事件数据,定时器时间戳,TimerService,以及 side outputs。CoProcessFunction 也提供了 onTimer() 回调函数。
使用CoProcessFunction完成底层内连接
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource> stream1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("b", 2),Tuple2.of("a",3));DataStreamSource> stream2 = env.fromElements(Tuple2.of("a", "a"),Tuple2.of("b", "b"),Tuple2.of("a","aa"));// 需要将以往的数据都存储下来stream1.keyBy(r -> r.f0).connect(stream2.keyBy(r -> r.f0))// 泛型依次为:流一、流二、输出.process(new CoProcessFunction, Tuple2, String>() {private ListState> listState1;private ListState> listState2;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);listState1 = getRuntimeContext().getListState(new ListStateDescriptor>("list1", Types.TUPLE(Types.STRING,Types.INT)));listState2 = getRuntimeContext().getListState(new ListStateDescriptor>("list2",Types.TUPLE(Types.STRING,Types.STRING)));}@Overridepublic void processElement1(Tuple2 value, Context ctx, Collector out) throws Exception {// 流一元素到来,存储到状态变量1中listState1.add(value);// 遍历流二中的所有元素,与下面遍历流一中的元素,共同组成了笛卡尔积for(Tuple2 e : listState2.get()){out.collect(value + " =>> " + e);}}@Overridepublic void processElement2(Tuple2 value, Context ctx, Collector out) throws Exception {listState2.add(value);for (Tuple2 e : listState1.get()){out.collect(e + " =>> " + value);}}}).print();env.execute();
}
需求分析:如网上购物,创建订单,拉起第三方支付,支付成功后,第三方支付的服务器会发送一条回调通知,告诉购物app,已经支付成功,然后app后端会将订单状态由未支付改为已支付。现由于某种原因,app没有收到这条回调通知,导致钱被扣了,app中的订单状态仍然为未支付状态。
现在需要完成的任务是:下订单的操作和第三方发回的回调通知,这两条流对下账,保证订单状态及时修改。
代码如下,一事件到达后5s内另一事件没到达即认定对账失败:
public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator orderStream = env.fromElements(Event.of("order-1", "order", 1000L),Event.of("order-2", "order", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));SingleOutputStreamOperator weChatStream = env.fromElements(Event.of("order-1", "weChat", 3000L),Event.of("order-3", "weChat", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 完成对账orderStream.keyBy(r -> r.orderId).connect(weChatStream.keyBy(r -> r.orderId)).process(new MatchFunction()).print();env.execute();
}public static class MatchFunction extends CoProcessFunction{private ValueState orderState;private ValueState weChatState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);orderState = getRuntimeContext().getState(new ValueStateDescriptor("order", Types.POJO(Event.class)));weChatState = getRuntimeContext().getState(new ValueStateDescriptor("weChat",Types.POJO(Event.class)));}@Overridepublic void processElement1(Event value, Context ctx, Collector out) throws Exception {if(weChatState.value() == null){// 下订单order事件先到达orderState.update(value);ctx.timerService().registerEventTimeTimer(value.timestamp + 5000L);}else{out.collect("ID为【" + value.orderId + "】的订单对账成功,微信事件先到达");weChatState.clear();}}@Overridepublic void processElement2(Event value, Context ctx, Collector out) throws Exception {if(orderState.value() == null){// 微信支付事件先到达weChatState.update(value);ctx.timerService().registerEventTimeTimer(value.timestamp + 5000L);}else {out.collect("ID为【" + value.orderId + "】的订单对账成功,订单事件先到达");orderState.clear();}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {super.onTimer(timestamp, ctx, out);if(orderState.value() != null){// 订单事件现先到达,但5s内微信支付事件没达到out.collect("id为【" + orderState.value().orderId + "】的订单对账失败,微信事件5s内未到达");orderState.clear();}if(weChatState.value() != null){out.collect("id为【" + weChatState.value().orderId + "】的订单对账失败,订单事件5s内未到达");}}
}public static class Event{public String orderId;public String eventType;public Long timestamp;public Event() {}public Event(String orderId, String eventType, Long timestamp) {this.orderId = orderId;this.eventType = eventType;this.timestamp = timestamp;}public static Event of(String orderId,String eventType,Long timestamp){return new Event(orderId,eventType,timestamp);}@Overridepublic String toString() {return "Event{" +"orderId='" + orderId + '\'' +", eventType='" + eventType + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}
}
如果修改下述代码,理论上是对账不成功的,但输出结果为对账成功。
Event.of("order-1", "weChat", 30000L)
因为对于离线数据集,只会在数据集的开始插入一个负无穷大,数据集的末尾插入一个正无穷大,当30s微信事件到达后,由于水位线分流原则,此时的水位线仍然为负无穷大,在订单事件和微信事件之间并不存在触发6s水位线的定时事件。
只需要切换数据源即可,此段程序的逻辑并无问题,只不过其处理的是有限的数据集。
将数据源更改为:
SingleOutputStreamOperator orderStream = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) throws Exception {ctx.collectWithTimestamp(Event.of("order-1","order",1000L),1000L);ctx.emitWatermark(new Watermark(999L));ctx.collectWithTimestamp(Event.of("order-2","order",3000L),3000L);ctx.emitWatermark(new Watermark(8001L));}@Overridepublic void cancel() {}});SingleOutputStreamOperator weChatStream = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) throws Exception {ctx.collectWithTimestamp(Event.of("order-1","weChat",4000L),4000L);ctx.emitWatermark(new Watermark(3999L));ctx.emitWatermark(new Watermark(8001L));ctx.collectWithTimestamp(Event.of("order-2","weChat",9000L),9000L);}@Overridepublic void cancel() {}});
数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或 Join。Flink DataStream API 中内置有两个可以根据时间条件对数据流进行 Join 的算子:基于间隔的 Join 和基于窗口的 Join。
如果 Flink 内置的 Join 算子无法表达所需的 Join 语义,那么可以通过 CoProcessFunction、BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 实现自定义的 Join逻辑。
注意,要设计的 Join 算子需要具备高效的状态访问模式及有效的状态清理策略。
基于间隔的 Join 会对两条流中拥有相同键值以及彼此之间时间戳不超过某一指定间隔的事件进行 Join。
下图展示了两条流(A 和 B)上基于间隔的 Join,如果 B 中事件的时间戳相较于 A中事件的时间戳不早于 1 小时且不晚于 15 分钟,则会将两个事件 Join 起来。Join 间隔具有对称性,因此上面的条件也可以表示为 A 中事件的时间戳相较 B 中事件的时间戳不早于 15 分钟且不晚于 1 小时。

基于间隔的 Join 目前只支持事件时间以及 INNER JOIN 语义(无法发出未匹配成功的事件)。
Join 成功的事件对会发送给 ProcessJoinFunction。下界和上界分别由负时间间隔和正时间间隔来定义,例如 between(Time.hour(-1), Time.minute(15))。在满足下界值小于上界值的前提下,你可以任意对它们赋值。例如,允许出现 B 中事件的时间戳相较 A 中事件的时间戳早 1~2 小时这样的条件。
基于间隔的 Join 需要同时对双流的记录进行缓冲。对第一个输入而言,所有时间戳大于当前水位线减去间隔上界的数据都会被缓冲起来;对第二个输入而言,所有时间戳大于当前水位线加上间隔下界的数据都会被缓冲起来。注意,两侧边界值都有可能为负。
上图中的 Join 需要存储数据流 A 中所有时间戳大于当前水位线减去 15 分钟的记录,以及数据流 B 中所有时间戳大于当前水位线减去 1 小时的记录。不难想象,如果两条流的事件时间不同步,那么 Join 所需的存储就会显著增加,因为水位线总是由“较慢”的那条流来决定。
下面的例子定义了一个基于间隔的 Join,解决恶意刷单行为,具体做法:将用户下订单事件与之前10分钟的pv事件进行join。
public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator orderStream = env.fromElements(Example3.Event.of("user-1", "order", 20 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Example3.Event element, long recordTimestamp) {return element.timestamp;}}));SingleOutputStreamOperator pvStream = env.fromElements(Example3.Event.of("user-1", "pv", 5 * 60 * 1000L),Example3.Event.of("user-1", "pv", 10 * 60 * 1000L),Example3.Event.of("user-1", "pv", 12 * 60 * 1000L),Example3.Event.of("user-1", "pv", 21 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Example3.Event element, long recordTimestamp) {return element.timestamp;}}));orderStream.keyBy(r -> r.orderId).intervalJoin(pvStream.keyBy(r -> r.orderId)).between(Time.minutes(-10),Time.minutes(0)).process(new ProcessJoinFunction() {@Overridepublic void processElement(Example3.Event left, Example3.Event right, Context ctx, Collector out) throws Exception {out.collect(left + " => " + right);}}).print("orderStream JOIN pvStream:");env.execute();
}
将两条流调换顺序join
pvStream.keyBy(r -> r.orderId).intervalJoin(orderStream.keyBy(r -> r.orderId)).between(Time.minutes(0),Time.minutes(10)).process(new ProcessJoinFunction() {@Overridepublic void processElement(Example3.Event left, Example3.Event right, Context ctx, Collector out) throws Exception {out.collect(right + " => " + left);}}).print("pvStream JOIN orderStream:");
顾名思义,基于窗口的 Join 需要用到 Flink 中的窗口机制。其原理是将两条输入流中的元素分配到公共窗口中并在窗口完成时进行 Join(或 Cogroup)。
下图展示了 DataStream API 中基于窗口的 Join 是如何工作的,此种方式限制了两条流的开窗大小必须相同。
下面的例子展示了如何定义基于窗口的 Join,实际上是一个基于窗口的笛卡尔积。
public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator> stream1 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 1)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}));SingleOutputStreamOperator> stream2 = env.fromElements(Tuple2.of("a", 2), Tuple2.of("a", 3), Tuple2.of("b", 2), Tuple2.of("b", 3)).assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner>() {@Overridepublic long extractTimestamp(Tuple2 element, long recordTimestamp) {return element.f1;}}));stream1.join(stream2)// 指定第一条的key.where(r -> r.f0)// 指定第二条的key.equalTo(r -> r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction, Tuple2, String>() {@Overridepublic String join(Tuple2 first, Tuple2 second) throws Exception {return first + " => " + second;}}).print();env.execute();
}