title: Flink系列
Flink 的批处理 和 Spark 的批处理,都支持两个非常好的特性: 广播变量 + 累加器
广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks,广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
用法:
// 1:初始化数据
DataSet toBroadcast = env.fromElements(1, 2, 3)
// 2:广播数据
withBroadcastSet(toBroadcast, "broadcastSetName");
// 3:获取数据
Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意:
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
package com.aa.flinkjava.broadcast;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;/*** @Author AA* @Date 2022/2/24 19:37* @Project bigdatapre* @Package com.aa.flinkjava.broadcast* Flink BroadCast 测试* 在这里做一个join的连接实现*/
public class FlinkBroadCastDemo {public static void main(String[] args) throws Exception {//1、获取运行环境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();//2、造数据ArrayList> list = new ArrayList<>();list.add(new Tuple2<>("zhangsan",20));list.add(new Tuple2<>("lisi",21));list.add(new Tuple2<>("wangwu",22));//3、读取造的数据DataSource> dataSource = executionEnvironment.fromCollection(list);dataSource.print("dataSource : ");//4、帮tuple2转化为hashmap 。 map中的key是用户姓名,value是用户年龄// DataSet>的数据类型可以直接修饰强制转换。DataSet> toBroadcast = dataSource.map(new MapFunction, HashMap>() {@Overridepublic HashMap map(Tuple2 tuple2) throws Exception {HashMap hashMap = new HashMap<>();hashMap.put(tuple2.f0,tuple2.f1);return hashMap;}});//5、再造一份 join 使用的数据DataSource data2 = executionEnvironment.fromElements("zhangsan", "lisi", "wangwu");data2.print("data2 : ");//6、执行广播数据的一些操作// 下面这个DataSet类型也是强制转换的的DataSet result = data2.map(new RichMapFunction() {List> broadCastMap = new ArrayList>();HashMap allMap = new HashMap();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.broadCastMap = getRuntimeContext().getBroadcastVariable("bdMapName");for (HashMap map : broadCastMap) {allMap.putAll(map);}}/*** @param s s是data2中间的一个一个的元素,其实就是"zhangsan", "lisi", "wangwu" 这些值* 根据 name("zhangsan", "lisi", "wangwu") 去广播变量中匹配获取相应的年龄* @return* @throws Exception*/@Overridepublic String map(String s) throws Exception {Integer age = allMap.get(s);return s + "," + age; //输出拼接的结果}}).withBroadcastSet(toBroadcast, "bdMapName");//7、打印输出result.print();}
}
Accumulator 即累加器,与 Mapreduce Counter 的应用场景差不多,都能很好地观察 Task 在运行期间的数据变化。可以在 Flink job 任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter 是一个具体的累加器 (Accumulator) 实现:IntCounter, LongCounter 和 DoubleCounter
用法:
// 1、创建累加器
private IntCounter numlines = new IntCounter();
// 2、注册累加器
getRuntimeContext().addAccumulator("num", this.numLines);
// 3、使用累加器
this.numlines.add(1);
// 4、获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num")
package com.aa.flinkjava.counter;import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.ArrayList;/*** @Author AA* @Date 2022/2/25 14:27* @Project bigdatapre* @Package com.aa.flinkjava.counter* Flink 累加器 示例* 统计输入数据源的流入数据的次数。*/
public class FlinkCounterDemo {public static void main(String[] args) throws Exception {//1、获取运行环境ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(3);//2、读取造的数据DataSource dataSource = executionEnvironment.fromElements("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");//3、定义一点逻辑,给累加器放进去MapOperator result = dataSource.map(new RichMapFunction() {//3-1 创建累加器对象private IntCounter numlines = new IntCounter();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//3-2 需要注册累加器/*在逻辑上来说,相当于在这个 application应用内部定义了一个变量 num 用来做统计。但是,物理上,其实这个 num 变量是由分散在所有 Task 内部的 numlines 组成的。一个 num 包含了很多个 numlines。其实最终拿到的结果,就是把所有 Task 中的 numlines 加起来,就是 num 的值。*/this.getRuntimeContext().addAccumulator("num", this.numlines);}@Overridepublic String map(String s) throws Exception {//另外注意,可能有小伙伴觉得可以在这里定义普通变量统计也行,// 注意:若并行度为1,使用普通的累加求和也可以,但是设置多个并行度,则普通的累加求和结果就不准啦。//每运行一次就 向累加器中 添加1this.numlines.add(1);return s; //这里没有做什么逻辑,就是给来的数据原样输出了。但是上面统计了累加次数了。}});//4、给结果输出出去result.writeAsText("D:\\flinkcount3");//5、执行JobExecutionResult jobExecutionResult = executionEnvironment.execute();//6、看看累加器的结果Integer num = jobExecutionResult.getAccumulatorResult("num");System.out.println("累加器的输出的结果是: " + num);}
}
声明:
文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。
B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接