创建第一个 Flink 项目
创始人
2024-11-12 01:37:02
0

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

            org.apache.flink        flink-scala_2.12        1.10.0                    org.apache.flink        flink-streaming-scala_2.12        1.10.0                                       net.alchim31.maven            scala-maven-plugin            3.4.6                                                                                                compile                                                                            org.apache.maven.plugins            maven-assembly-plugin            3.0.0                                                jar-with-dependencies                                                                            make-assembly                    package                                            single                                                              

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._  /** * @Description 批处理 word count * @Author zhengzhaoxiang * @Date 2020/7/12 18:55 * @Param * @Return */ object WordCount {   def main(args: Array[String]): Unit = {     //创建一个批处理的执行环境     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment     //从文件中读取数据     var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")     //基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by     val resultDataSet: DataSet[(String,Int)] = inputDateSet       .flatMap(_.split(" "))//分词得到所有 word构成的数据集       .map((_,1))//_表示当前 word 转换成一个二元组(word,count)       .groupBy(0)//以二元组中第一个元素作为key       .sum(1) //1表示聚合二元组的第二个元素的值      //打印输出     resultDataSet.print()   } } 

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flink  import org.apache.flink.streaming.api.scala._  object StreamWordCount {  def main(args: Array[String]): Unit = {    // 创建一个流处理执行环境    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 接受 socket 文本流    val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);    //定义转换操作 word count    val resultDataStream: DataStream[(String,Int)] = inputDataStream      .flatMap(_.split(" "))//以空格分词,得到所有的 word      .filter(_.nonEmpty)      .map((_,1))//转换成 word count 二元组      .keyBy(0)//按照第一个元素分组      .sum(1)//按照第二个元素求和     resultDataStream.print()     //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束    env.execute("stream word count word")  } } 

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flink  import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._  object StreamWordCount {  def main(args: Array[String]): Unit = {    // 创建一个流处理执行环境    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 接受 socket 文本流  hostname:prot 从程序运行参数中读取    val params: ParameterTool = ParameterTool.fromArgs(args);    val hostname: String = params.get("host");    val port: Int = params.getInt("port");    val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);    //定义转换操作 word count    val resultDataStream: DataStream[(String,Int)] = inputDataStream      .flatMap(_.split(" "))//以空格分词,得到所有的 word      .filter(_.nonEmpty)      .map((_,1))//转换成 word count 二元组      .keyBy(0)//按照第一个元素分组      .sum(1)//按照第二个元素求和     resultDataStream.print()     //上面的只是定义了处理流程,同时定义一个名称。不会让任务结束    env.execute("stream word count word")  } } 

相关内容

热门资讯

绝活儿辅助!广西老友玩老是输怎... 绝活儿辅助!广西老友玩老是输怎么办(辅助挂)都是真的有辅助app(讲解有挂)在进入广西老友玩老是输怎...
法门辅助!福建13水插件(辅助... 法门辅助!福建13水插件(辅助挂)一贯是有辅助技巧(有挂技术)1、许多玩家不知道福建13水插件辅助怎...
办法辅助!潮友会app下载官方... 办法辅助!潮友会app下载官方辅助器(辅助挂)真是真的是有辅助app(有挂教程)该软件可以轻松地帮助...
妙招辅助!邯郸胡乐挂辅助(辅助... 妙招辅助!邯郸胡乐挂辅助(辅助挂)好像存在有辅助插件(有挂方略)1、上手简单,内置详细流程视频教学,...
教程书辅助!乐酷辅助(辅助挂)... 教程书辅助!乐酷辅助(辅助挂)其实存在有辅助脚本(有挂细节)乐酷辅助能透视中分为三种模型:乐酷辅助模...
学习辅助!决战卡五星辅助(辅助... 学习辅助!决战卡五星辅助(辅助挂)本来真的是有辅助软件(有人有挂)学习辅助!决战卡五星辅助(辅助挂)...
绝活辅助!边锋嘉兴麻将辅助器(... 绝活辅助!边锋嘉兴麻将辅助器(辅助挂)真是真的有辅助神器(新版有挂)1、边锋嘉兴麻将辅助器公共底牌简...
举措辅助!枫叶辅助器(辅助挂)... 举措辅助!枫叶辅助器(辅助挂)本来存在有辅助技巧(竟然有挂)1、下载好枫叶辅助器正确养号方法之后点击...
讲义辅助!点我达辅助(辅助挂)... 讲义辅助!点我达辅助(辅助挂)一直存在有辅助技巧(有人有挂)1、点我达辅助辅助器安装包、点我达辅助辅...
模块辅助!威信茶馆有挂的吗(辅... 模块辅助!威信茶馆有挂的吗(辅助挂)一直真的是有辅助脚本(揭秘有挂)1、玩家可以在威信茶馆有挂的吗线...