Apache Filnk----入门
创始人
2024-12-08 18:33:32
0

文章目录

  • Flink 概述
    • Flink 是什么
    • 有界流和无界流
    • 有状态流处理
    • Flink 特点
    • Flink vs SparkStreaming
    • Flink 分层API
  • Flink 快速上手
    • WordCount 代码编写
      • 批处理
      • 流处理
      • 读取socket文本流

Flink 概述

Flink 是什么

在这里插入图片描述

有界流和无界流

  • 无界数据流:
    • 有定义流的开始,但没有定义流的结束;
    • 它们会无休止的产生数据:
    • 无界流的数据必须持续处理,即数据被摄取后需要立刻处理我们不能等到所有数据都到达再处理,因为输入是无限的。
  • 有界数据流:
    • 有定义流的开始,也有定义流的结束:
    • 有界流可以在摄取所有数据后再进行计算:
    • 有界流所有数据可以被排序,所以并不需要有序摄取:
    • 有界流处理通常被称为批处理

有状态流处理

把流处理需要的额外数据保存成一个状态,然后针对这条数据进行处理,并且更新状态。这就是所谓的"有状态的流处理"

在这里插入图片描述

  • 状态在内存中:优点:速度快 ;缺点:可靠性差
  • 状态在分布式系统中:优点:可靠性高;缺点:速度慢

Flink 特点

  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink提供了事件时间(event-time)和处理时间(processing-time)语义对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的外部系统,如Kata、Hive、JDBC、HDFS、Redis等
  • 高可用。本身高可用的设置,加上与K8S,YARN和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Fhnk能做到以极少的停机时间7x24全天候运行。

Flink vs SparkStreaming

在这里插入图片描述

Flink 分层API

在这里插入图片描述

  • 有状态流处理:通过底层API(处理函数),对最原始数据加工处理。底层API与DataSstrea API相集成,可以处理复杂的计算。
  • DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transormations,包括map、flatmap等),连接(joms),聚合(aggregations),窗口(windows)操作等。注意:Flimk1.12以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时
  • Table API 是以表为中心的声明式编程,其中表可能会动态变化。Ible API遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时API提供可比较的操作,例如select、project、jomn、group-by、aggregate等。我们可以在表与 DataStream/Dataset 之间无缝切换,以允许程序将 Table AP与DataStream 以及 DataSet 混合使用。
  • SOL这一层在语法与表达能力上与 Table API类似,但是是以SOL查询表达式的形式表现程序。SOL抽象与Table API交互密切,同上心能有难区的时SOL查询可以直接在Table API定义的表上执行。

Flink 快速上手

  • 在IDEA中创建Maven项目

  • 添加项目依赖

             1.17.0                              org.apache.flink             flink-streaming-java             ${flink.version}                             org.apache.flink             flink-clients             ${flink.version}        

WordCount 代码编写

批处理

批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

1)数据准备
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
(2)在words.txt中输入一些文字,例如:

hello flink hello world hello java 

2)代码编写
(1)在com.yudan.wc包下新建Java类BatchWordCount,在静态main方法中编写代码。具体代码实现如下:

import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;  public class BatchWordCount {      public static void main(String[] args) throws Exception {          // 1. 创建执行环境         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();                  // 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)         DataSource lineDS = env.readTextFile("input/words.txt");                  // 3. 转换数据格式         FlatMapOperator> wordAndOne = lineDS.flatMap(new FlatMapFunction>() {              @Override             public void flatMap(String line, Collector> out) throws Exception {                  String[] words = line.split(" ");                  for (String word : words) {                     out.collect(Tuple2.of(word,1L));                 }             }         });          // 4. 按照 word 进行分组         UnsortedGrouping> wordAndOneUG = wordAndOne.groupBy(0);                  // 5. 分组内聚合统计         AggregateOperator> sum = wordAndOneUG.sum(1);          // 6. 打印结果         sum.print();     } } 

(2)输出

(flink,1) (world,1) (hello,3) (java,1) 

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar 

流处理

import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  import java.util.Arrays;  public class StreamWordCount {      public static void main(String[] args) throws Exception {              // 1. 创建流式执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                  // 2. 读取文件         DataStreamSource lineStream = env.readTextFile("input/words.txt");                  // 3. 转换、分组、求和,得到统计结果         SingleOutputStreamOperator> sum = lineStream.flatMap(new FlatMapFunction>() {             @Override             public void flatMap(String line, Collector> out) throws Exception {                  String[] words = line.split(" ");                  for (String word : words) {                     out.collect(Tuple2.of(word, 1L));                 }             }         }).keyBy(data -> data.f0)            .sum(1);          // 4. 打印         sum.print();                  // 5. 执行         env.execute();     } } 

输出:

3> (java,1) 5> (hello,1) 5> (hello,2) 5> (hello,3) 13> (flink,1) 9> (world,1) 
  • 主要观察与批处理程序BatchWordCount的不同:
    • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
    • 转换处理之后,得到的数据对象类型不同。
    • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
    • 代码末尾需要调用env的execute方法,开始执行任务。

读取socket文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。

import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  import java.util.Arrays;  public class SocketStreamWordCount {      public static void main(String[] args) throws Exception {          // 1. 创建流式执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                  // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号         DataStreamSource lineStream = env.socketTextStream("hadoop102", 7777);                  // 3. 转换、分组、求和,得到统计结果         SingleOutputStreamOperator> sum = lineStream.flatMap((String line, Collector> out) -> {             String[] words = line.split(" ");              for (String word : words) {                 out.collect(Tuple2.of(word, 1L));             }         }).returns(Types.TUPLE(Types.STRING, Types.LONG))                 .keyBy(data -> data.f0)                 .sum(1);          // 4. 打印         sum.print();                  // 5. 执行         env.execute();     } } 

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

相关内容

热门资讯

科技新动态!开心跑得快有辅助工... 科技新动态!开心跑得快有辅助工具吗(透明挂)外挂透明挂辅助神器(2021已更新)(哔哩哔哩)1)开心...
4分钟实锤!吉祥麻将,微扑克切... 4分钟实锤!吉祥麻将,微扑克切实是真的有挂,介绍教程(有挂揭秘);一、吉祥麻将AI软件牌型概率发牌机...
实测发现!鄂州晃晃外 挂(透视... 实测发现!鄂州晃晃外 挂(透视)透视辅助工具(2021已更新)(哔哩哔哩)1、鄂州晃晃外 挂系统规律...
三分钟了解!好彩麻将怎样才可以... 三分钟了解!好彩麻将怎样才可以拿好牌(透视辅助)外挂透明挂辅助机制(2020已更新)(哔哩哔哩)1、...
九分钟辅助!斗棋辅助器在哪,w... 九分钟辅助!斗棋辅助器在哪,wepoker本来真的是有挂,教你攻略(有挂教程)1、下载好斗棋辅助器在...
记者揭秘!!广东雀神麻雀辅助器... 记者揭秘!!广东雀神麻雀辅助器在哪里下载(透视)透视辅助app(2020已更新)(哔哩哔哩)1、很好...
终于清楚!皮皮跑胡子输赢规律(... 终于清楚!皮皮跑胡子输赢规律(辅助挂)外挂透明挂辅助机制(2026已更新)(哔哩哔哩)1)皮皮跑胡子...
二分钟科普!花城牌舍系统规律,... 二分钟科普!花城牌舍系统规律,aAPOKER竟然存在有挂,揭秘教程(有挂插件)进入游戏-大厅左侧-新...
一分钟教你!心悦手机麻将辅牌器... 一分钟教你!心悦手机麻将辅牌器(透视辅助)外挂透视辅助挂(2024已更新)(哔哩哔哩)1、每一步都需...
科技新动态!四方河南麻将赢牌技... 科技新动态!四方河南麻将赢牌技巧(透视)外挂透明挂辅助神器(2026已更新)(哔哩哔哩)1、每一步都...