Flink系列之Flink 流式编程模式总结
创始人
2024-03-12 15:59:41
0

title: Flink系列


一、Flink 流式编程模式总结

1.1 基础总结

官网: https://flink.apache.org/

Apache Flink® — Stateful Computations over Data Streams

在这里插入图片描述

三个任意:

	任意的数据源 Source任意的计算类型 Transformation任务的数据目的地 Sink

其中关于 Flink 的编程:

在这里插入图片描述

所有的计算,都是这样的大流程:

  • ​ 从哪里读取数据
  • ​ 读取到了数据执行什么样的计算
  • ​ 计算得到结果之后,输出到哪里

Flume + DataX(从哪里收集?内部的处理?数据输出到哪里?)
总结一下:

在这里插入图片描述

通过 Flink WordCount 总结出来的编程模式:

01、获得一个执行环境:(Execution Environment)
02、加载/创建初始数据:(Source)
03、指定转换这些数据:(Transformation)
04、指定放置计算结果的位置:(Sink)
05、触发程序执行:(Action)

1.2 提交到集群上面运行流程

1.2.1 通过页面方式提交

1、测试程序如下

package com.aa.flinkscalaimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._/*** @Author AA* @Project bigdatapre* @Package com.aa.flinkscala* 动态参数形式测试*/
object WordCountScalaStreamWithParameter {def main(args: Array[String]): Unit = {//1、获取执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//修改成动态参数val parameterTool = ParameterTool.fromArgs(args)val hostname = parameterTool.get("hostname")val port = parameterTool.getInt("port")//2、获取数据源val textStream: DataStream[String] = env.socketTextStream(hostname, port)//3、数据计算处理逻辑val wordCountStreamRes: DataStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(line => line._1)//.keyBy(0) //过期的方法,将来可能不支持啦.sum(1)//4、打印输出结果wordCountStreamRes.print()//5、启动应用程序env.execute("WordCountScalaStreamWithParameter")}
}

2、打成jar

直接通过idea右侧的maven方式打包即可

在这里插入图片描述

打包完了之后如下:

在这里插入图片描述

3、启动集群进行测试即可

在hadoop10上面启动:

[root@hadoop10 bin]# cd /software/flink/bin/
[root@hadoop10 bin]# start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop10.
Starting taskexecutor daemon on host hadoop11.
Starting taskexecutor daemon on host hadoop12.
[root@hadoop10 bin]# 

页面如下:

在这里插入图片描述

4、在页面中提交

(1)点击Submit New Job

在这里插入图片描述

(2)点击Add New

在这里插入图片描述

(3)选择上传之后如下:

在这里插入图片描述

(4)单击jar包的名字,填写参数
在这里插入图片描述

(4)填写完参数如下:

在这里插入图片描述

(5)记得先启动 hadoop12 上面的 9999 端口 。 否则报错连接不上。

[root@hadoop12 software]# nc -lk 9999

(6)点击Submit提交运行

然后去运行中查看对应程序

在这里插入图片描述

(7)到hadoop12 的 9999 端口中输入数据进行测试

[root@hadoop12 software]# nc -lk 9999hello world hello hadoop hello flink
hello

(8)去页面中查看

在这里插入图片描述

(9)取消程序

点击页面中的左上角的Cancel Job即可

在这里插入图片描述

或者使用命令取消:

[root@hadoop10 bin]# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
23.02.2022 16:11:02 : 9aab4c22c946dc0bfeb1546741fca390 : WordCountScalaStreamWithParameter (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@hadoop10 bin]# flink cancel 9aab4c22c946dc0bfeb1546741fca390
Cancelling job 9aab4c22c946dc0bfeb1546741fca390.
Cancelled job 9aab4c22c946dc0bfeb1546741fca390.
[root@hadoop10 bin]# flink list
Waiting for response...
No running jobs.
No scheduled jobs.
[root@hadoop10 bin]# 

截图如下:

在这里插入图片描述

4、通过命令显示所有历史任务

[root@hadoop10 bin]# flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.
---------------------- Terminated Jobs -----------------------
23.02.2022 16:04:45 : 550fe2157ca70d796aeca4cf8b4dc622 : WordCountScalaStreamWithParameter (FAILED)
23.02.2022 16:11:02 : 9aab4c22c946dc0bfeb1546741fca390 : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:24:33 : b2fdf9e8b8cb122a235d2948a307b244 : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:30:26 : 67e18591ba85782bde8dd3a49e1488cb : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:31:24 : 52d672e2f628badf42a2fee396ffaf18 : WordCountScalaStreamWithParameter (FAILED)
--------------------------------------------------------------
[root@hadoop10 bin]# 

1.2.2 通过代码方式提交

1、基础理论

去到flink的bin目录下面:

flink run -c com.aa.flinkscala.WordCountScalaStreamWithParameter /home/data/flink-1.0-SNAPSHOT.jar --hostname hadoop12 --port 9999

或者:

flink run \
-c com.aa.flinkscala.WordCountScalaStreamWithParameter \
/home/data/flink-1.0-SNAPSHOT.jar \
--hostname hadoop12 --port 9999

2、实践

[root@hadoop10 bin]# flink run -c com.aa.flinkscala.WordCountScalaStreamWithParameter /home/data/flink-1.0-SNAPSHOT.jar --hostname hadoop12 --port 9999
Job has been submitted with JobID b2fdf9e8b8cb122a235d2948a307b244

去页面中查看

正在运行的如下:

在这里插入图片描述

同理输入测试就一样了。

3、小总结

提交 Flink 应用程序到 Flink Cluster 中运行:

​ 搭建 Flink 集群!
​ 编写 wordcount,打成 jar 包,提交到 flink 集群运行

flink run \
--c 全类路径名 \
jar包绝对路径 \
--hostname hadoop10 \
--port 6789

当你需要提交一个 jar 包到 flink standalone 集群或者 YARN 中运行的时候,其实是通过 flink run … 搞定的 ==>
这个 shell 命令的底层就是: java CliFrontend

提交完程序之后,在另外一个节目里面通过jps,可以看到这个CliFrontend 。

在这里插入图片描述

在源码中的全路径名为:

flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java

在源码中的位置如下:

在这里插入图片描述



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

相关内容

热门资讯

公安部:立案查处金融领域“黑灰... 北京商报讯(记者 岳品瑜 董晗萱)12月25日,公安部召开新闻发布会,通报公安部和国家金融监督管理总...
感知山东| 胶州市开展“法律护... 为不断深化“陪伴成长”全环境立德树人品牌建设,近日,胶州市司法局李哥庄司法所联合镇宣传办,邀请市“蓝...
天山脚下:检察公益诉讼保卫“地... 每年8月,当天山北坡的阳光变得灼热而明亮,新疆维吾尔自治区昌吉回族自治州吉木萨尔县的田间地头开始弥漫...
围绕关键问题聚焦制度完善建言献... 在十四届全国人大常委会第十九次会议上 本报记者 朱宁宁 十四届全国人大常委会第十九次会议12月23日...
最高法院:名誉侵权、商业诋毁,... 最高法院:名誉侵权、商业诋毁,构成重复起诉? 前后两诉当事人、诉讼标的和诉讼请求不完全一致的,不构成...
柯汶利执导犯罪悬疑片《匿杀》曝... 搜狐娱乐讯 犯罪悬疑片《匿杀》发布终极预告及海报。十五年前,一位自称“小梅”的女孩在火车上惨遭虐杀并...
海南万宁市公安局发布通告 举报... 万宁市公安局关于对举报涉枪涉爆违法犯罪线索予以奖励的通告 为切实有效打击涉枪涉爆违法犯罪活动,提高人...
科技强省需要怎样的金融体系?广... 科技自立自强是国家发展的战略支撑,也是中国式现代化的关键变量。对广东而言,建设科技强省,既是扛起经济...
央行:发挥增量政策和存量政策集... 人民网北京12月25日电 (记者罗知之)据中国人民银行网站消息,中国人民银行货币政策委员会2025年...