把流处理需要的额外数据保存成一个状态,然后针对这条数据进行处理,并且更新状态。这就是所谓的"有状态的流处理"
在IDEA中创建Maven项目
添加项目依赖
1.17.0 org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version}
批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
1)数据准备
(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
(2)在words.txt中输入一些文字,例如:
hello flink hello world hello java
2)代码编写
(1)在com.yudan.wc包下新建Java类BatchWordCount,在静态main方法中编写代码。具体代码实现如下:
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCount { public static void main(String[] args) throws Exception { // 1. 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本) DataSource lineDS = env.readTextFile("input/words.txt"); // 3. 转换数据格式 FlatMapOperator> wordAndOne = lineDS.flatMap(new FlatMapFunction>() { @Override public void flatMap(String line, Collector> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word,1L)); } } }); // 4. 按照 word 进行分组 UnsortedGrouping> wordAndOneUG = wordAndOne.groupBy(0); // 5. 分组内聚合统计 AggregateOperator> sum = wordAndOneUG.sum(1); // 6. 打印结果 sum.print(); } }
(2)输出
(flink,1) (world,1) (hello,3) (java,1)
从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class StreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文件 DataStreamSource lineStream = env.readTextFile("input/words.txt"); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator> sum = lineStream.flatMap(new FlatMapFunction>() { @Override public void flatMap(String line, Collector> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } }).keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }
输出:
3> (java,1) 5> (hello,1) 5> (hello,2) 5> (hello,3) 13> (flink,1) 9> (world,1)
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; public class SocketStreamWordCount { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号 DataStreamSource lineStream = env.socketTextStream("hadoop102", 7777); // 3. 转换、分组、求和,得到统计结果 SingleOutputStreamOperator> sum = lineStream.flatMap((String line, Collector> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)) .keyBy(data -> data.f0) .sum(1); // 4. 打印 sum.print(); // 5. 执行 env.execute(); } }
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2