实时

您的位置:首页>资讯 >

大数据Flink进阶(六):Flink入门案例

Flink入门案例

需求:读取本地数据文件,统计文件中每个单词出现的次数。

一、IDEA Project创建及配置

本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJ IDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下:

1、打开IDEA,创建空项目

2、在IntelliJ IDEA 中安装Scala插件

使用IntelliJ IDEA开发Flink,如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件,如果已经安装可以忽略此步骤,下图为以安装Scala插件。


(相关资料图)

3、打开Structure,创建项目新模块

创建Java模块:

继续点击"+",创建Scala模块:

创建好"FlinkScalaCode"模块后,右键该模块添加Scala框架支持,并修改该模块中的"java"src源为"scala":

在"FlinkScalaCode"模块Maven pom.xml中引入Scala依赖包,这里使用的Scala版本为2.12.10。

  org.scala-lang  scala-library  2.12.10  org.scala-lang  scala-compiler  2.12.10  org.scala-lang  scala-reflect  2.12.10

4、Log4j日志配置

为了方便查看项目运行过程中的日志,需要在两个项目模块中配置log4j.properties配置文件,并放在各自项目src/main/resources资源目录下,没有resources资源目录需要手动创建并设置成资源目录。log4j.properties配置文件内容如下:

log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

复制

并在两个项目中的Maven pom.xml中添加对应的log4j需要的依赖包,使代码运行时能正常打印结果:

  org.slf4j  slf4j-log4j12  1.7.36  org.apache.logging.log4j  log4j-to-slf4j  2.17.2

5、分别在两个项目模块中导入Flink Maven依赖

"FlinkJavaCode"模块导入Flink Maven依赖如下:

  UTF-8  1.8  1.8  1.16.0  1.7.36  2.17.2        org.apache.flink    flink-clients    ${flink.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

"FlinkScalaCode"模块导入Flink Maven依赖如下:

  UTF-8  1.8  1.8  1.16.0  1.7.31  2.17.1  2.12.10  2.12        org.apache.flink    flink-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-streaming-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-clients    ${flink.version}          org.scala-lang    scala-library    ${scala.version}        org.scala-lang    scala-compiler    ${scala.version}        org.scala-lang    scala-reflect    ${scala.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

注意:在后续实现WordCount需求时,Flink Java Api只需要在Maven中导入"flink-clients"依赖包即可,而Flink Scala Api 需要导入以下三个依赖包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因为在Flink1.15版本后,Flink添加对opting-out(排除)Scala的支持,如果你只使用Flink的Java api,导入包不必包含scala后缀,如果使用Flink的Scala api,需要选择匹配的Scala版本。

二、案例数据准备

在项目"MyFlinkCode"中创建"data"目录,在目录中创建"words.txt"文件,向文件中写入以下内容,方便后续使用Flink编写WordCount实现代码。

hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink

三、案例实现

数据源分为有界和无界之分,有界数据源可以编写批处理程序,无界数据源可以编写流式程序。DataSet API用于批处理,DataStream API用于流式处理。

批处理使用ExecutionEnvironment和DataSet,流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类,DataSet处理的数据是有界的,DataStream处理的数据是无界的,这两个类都是不可变的,一旦创建出来就无法添加或者删除数据元。

1、Flink 批数据处理案例

Java版本WordCount

使用Flink Java Dataset api实现WordCount具体代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.读取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分单词FlatMapOperator wordsDS =        linesDS.flatMap((String lines, Collector collector) -> {    String[] arr = lines.split(" ");    for (String word : arr) {        collector.collect(word);    }}).returns(Types.STRING);//3.将单词转换成Tuple2 KV 类型MapOperator> kvWordsDS =        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 进行分组处理得到最后结果并打印kvWordsDS.groupBy(0).sum(1).print();

Scala版本WordCount

使用Flink Scala Dataset api实现WordCount具体代码如下:

//1.准备环境,注意是Scala中对应的Flink环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//3.读取数据文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.进行 WordCount 统计并打印linesDS.flatMap(line => {  line.split(" ")})  .map((_, 1))  .groupBy(0)  .sum(1)  .print()

以上无论是Java api 或者是Scala api 输出结果如下,显示的最终结果是统计好的单词个数。

(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)

2、Flink流式数据处理案例

Java版本WordCount

使用Flink Java DataStream api实现WordCount具体代码如下:

//1.创建流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.读取文件数据DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分单词,设置KV格式数据SingleOutputStreamOperator> kvWordsDS =        lines.flatMap((String line, Collector> collector) -> {    String[] words = line.split(" ");    for (String word : words) {        collector.collect(Tuple2.of(word, 1L));    }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分组统计获取 WordCount 结果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式计算中需要最后执行execute方法env.execute();
Scala版本WordCount

使用Flink Scala DataStream api实现WordCount具体代码如下:

//1.创建环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._//3.读取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.进行wordCount统计ds.flatMap(line=>{line.split(" ")})  .map((_,1))  .keyBy(_._1)  .sum(1)  .print()//5.最后使用execute 方法触发执行env.execute()

以上输出结果开头展示的是处理当前数据的线程,一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。

3、DataStream BATCH模式

下面使用Java代码使用DataStream API 的Batch 模式来处理批WordCount代码,方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置批运行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() {    @Override    public void flatMap(String lines, Collector> out) throws Exception {        String[] words = lines.split(" ");        for (String word : words) {            out.collect(new Tuple2<>(word, 1L));        }    }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();

以上代码运行完成之后结果如下,可以看到结果与批处理结果类似,只是多了对应的处理线程号。

3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)

此外,Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:

//BATCH 设置批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 设置流处理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar

关键词:

推荐阅读
本案例编写Flink代码选择语言为Java和Scala,所以这里我们通过IntelliJIDEA创建一个目录,其中包括Java项目模块和Scala项目模块,将

2023-03-21 08:09:21

1、盛仲孙妻刘氏墓志年代是北宋元符1年(1098)11月28日。2、出土地点是江蘇省江都市出土。

2023-03-21 04:43:46

倪萍同框毛阿敏两个人完全就像是两代人呢!虽然倪萍曾经减肥成功了,但是现在的她看着更加的臃肿,就像是一位普通老大妈毛阿敏却完全不一样。

2023-03-20 23:59:13

说起粗粮相信大家都非常熟悉,以前粗粮是人们饮食中的主要供给,它含有粗纤维以及氨基酸,不过随着生活条件的改善,很多人几乎不怎么吃粗粮,

2023-03-20 20:58:33

1、好医保系列是蚂蚁保险平台上售卖的保险产品,它的医疗险比较多,比如好医保长期医疗险,好医保住院医疗险,其中好医保长期医疗险是最为热门

2023-03-20 17:59:09

据中国报告大厅对2023年3月20日上海市十八醇价格最新走势监测显示:2023年3月20日上海市十八醇(泰国科宁)报价1

2023-03-20 16:19:56

为进一步探讨新能源汽车大数据共享共用机制、全力构建数字跨界应用新生态,3月19日至20日,“中国新能源汽车大数据2023年产业大会”在位于沈阳

2023-03-20 14:23:01

解答:1、配料表。先看看配料表。国家规定配料表里的配料要按添加量排列,前面的添加量多。如果第一名是奶酪,奶酪或者牛奶,牛

2023-03-20 11:55:24

今天小编肥嘟来为大家解答以上的问题。丝印油墨配方比例,丝印油墨配方相信很多小伙伴还不知道,现在让我们一起来看看吧!1、PVC-醋酸乙烯共聚

2023-03-20 10:03:53

故意还是不小心?男子住宅被邻居摄像头“全包围”,咋回事

2023-03-20 08:19:21

1、还没有完结呢。2、顾云黛赵元璟小说:宠妃难为:皇上,娘娘今晚不侍寝。以上就是【宠妃难为仟夕瑶,宠妃难为】相关内容。

2023-03-20 03:06:36

1、利空是国家或上市公司出现了一些对上市公司有害的消息 导致股票的下跌。2、  利空因素是控制股价下跌信息的因素,包括政治因素,经济发展

2023-03-19 22:16:52

格隆汇3月19日丨中国华融(02799 HK)公布,公司将于2023年3月29日(星期三)召开董事会会议,藉以审议并(如认为适当)批准包括公司及其附属公司截至2

2023-03-19 18:57:09

英超第28轮,切尔西主场2-2战平埃弗顿,奇尔维尔首发出场,他本场比赛数据如下:出场90分钟,触球96次(全场第一)传球

2023-03-19 15:08:16

1、年利率未超过24%的,合法有效。2、双方约定的利率未超过年利率24%,出借人请求借款人按照约定的利率支付利息的,人民

2023-03-19 11:57:48

1、工作日加班费是劳动者本人工资的1 5倍,双休日是2倍,国定假日是3倍。2、按照法律规定内,每月工作时间为容20 83天,月计薪天数为21 75天,

2023-03-19 08:16:52

作者|草猫那位伟大的向导大人:隐秘而伟大的流光无际mod幕后功臣,他隐姓埋名数载,创造国产泰拉瑞亚mod奇迹。2023年3月18日,泰拉瑞亚顶级国

2023-03-19 02:51:40

1、细菌。2、也就是微生物的原始形态。以上就是【海洋里最小的生物,最小的生物】相关内容。

2023-03-18 22:13:15

1、笑眼就是笑起来眼睛会完成月亮的眼型,或者眯成一条弯弯的缝,看起来甜甜的,给人一种天真浪漫的感觉。2、所谓的笑眼,就是

2023-03-18 18:43:18

北京时间3月15日凌晨4点,欧冠1 8决赛次回合的一场焦点战展开争夺,曼城在主场迎战莱比锡。曼城因为顶级的教练和豪华的阵容,让其成为各项赛事

2023-03-18 15:16:29