Flink入门,flink接入kafka数据源,消费数据并处理数据
创始人
2024-11-12 22:04:45
0

## 使用Flink接收kafka数据,处理后发送到新topic

首先先下载kafka的linux版本,可以搜索阿里云的镜像进行下载,速度很快
http://mirrors.aliyun.com/apache/kafka/
安装过程可自行搜索。。。
注意,安装号kafka后需要修改配置文件

vi kafka的安装目录/config/server.properties 

将文件中的listeners注释去掉,并修改值为你虚拟机的ip,如下图
192.168.80.108是我虚拟机的ip
然后启动kafka,这里要切换到bin目录下

sh kafka-server-start.sh -daemon ../config/server.properties 

启动完可以切换到logs目录下查看日志
再打开两个shell页面,分别为生产和消费使用

vi server.log 

启动生产者服务,bin目录下

./kafka-console-producer.sh --topic 自定义topic名称 --bootstrap-server 你虚拟机ip:9092 

启动成功页面

启动消费者服务,bin目录下

./kafka-console-consumer.sh --topic 自定义topic名称 --bootstrap-server 你虚拟机ip:9092 

启动成功页面
然后尝试在生成者输入字符,回车发送
在这里插入图片描述
在这里插入图片描述
消费者可以收到消息,kafka安装完成,如果没有收到可以检查topic是否一致,ip是否正确
下面引入flink相关依赖

 		UTF-8 		1.19.0 		1.8 		2.12 		${target.java.version} 		${target.java.version} 		2.17.1 	  	 		 			apache.snapshots 			Apache Development Snapshot Repository 			https://repository.apache.org/content/repositories/snapshots/ 			 				false 			 			 				true 			 		 	  	 		 		 		              			org.apache.flink 			flink-table-api-java-bridge 			${flink.version} 			provided 		 		 			org.apache.flink 			flink-clients 			${flink.version} 			provided 		           		 			org.apache.flink 			flink-connector-jdbc 			1.16.0 		  		  		              			org.apache.flink 			flink-connector-kafka 			1.16.0 		  		 		 		 			org.apache.logging.log4j 			log4j-slf4j-impl 			${log4j.version} 			runtime 		 		 			org.apache.logging.log4j 			log4j-api 			${log4j.version} 			runtime 		 		 			org.apache.logging.log4j 			log4j-core 			${log4j.version} 			runtime 		 		 			org.projectlombok 			lombok 			1.18.30 			compile 		 	 

构建时会提示找不到类,在idea中勾选如图选项,或者在pom文件中修改scope的值为compile
在这里插入图片描述
下来开始写代码

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //设置并行数         env.setParallelism(4);         // 每10000毫秒进行一次checkpoint         env.enableCheckpointing(10000);         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);                  // 设置 Kafka 消费者属性         Properties properties = new Properties();         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip");         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");         properties.setProperty("key.deserializer", StringDeserializer.class.getName());         properties.setProperty("value.deserializer", StringDeserializer.class.getName());                  // 创建Kafka消费者,将消费者添加到流         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("你设定的topic", new SimpleStringSchema(), properties);         //设置只读取最新数据         consumer.setStartFromLatest();          //添加数据源         DataStreamSource source = env.addSource(consumer);          source.print();          DataStream mappedStream = source.map(new MapFunction() {             @Override             public String map(String value) throws Exception {                 return value.toUpperCase(); //进行数据治理 例如,将值转换为大写             }         });          //创建一个Flink生产者,将处理过的数据发回去         mappedStream.addSink(new FlinkKafkaProducer<>("新的topic", new SimpleStringSchema(), properties));                  env.execute("Flink Kafka Integration"); 

上面这种构建kafka数据源的方式官方显示已经过时,有另一种构建方式

         KafkaSource source = KafkaSource.builder()                 .setTopics("test")                       .setGroupId("test-consumer-group")                 .setBootstrapServers("ip:9092")                 .setStartingOffsets(OffsetsInitializer.latest())   //消费最新数据                 .setValueOnlyDeserializer(new SimpleStringSchema()).build();         DataStream dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); 

然后运行代码,在生产者生产一条数据进行查看
生产者在这里插入图片描述
查看代码控制台在这里插入图片描述
最后运行新的消费者,消费处理后的数据,只需修改topic
在这里插入图片描述
可以看到,flink英文变成了大写,简单接入完成

相关内容

热门资讯

妙计辅助!四川途游辅助(辅助挂... 妙计辅助!四川途游辅助(辅助挂)真是存在有辅助挂(有挂技巧)一、四川途游辅助游戏安装教程牌型概率发牌...
阶段辅助!欢乐情怀开挂(辅助挂... 阶段辅助!欢乐情怀开挂(辅助挂)都是真的是有辅助器(有挂助手)进入游戏-大厅左侧-新手福利-激活码辅...
演示辅助!潮友会鱼虾蟹辅助(辅... 您好,潮友会鱼虾蟹辅助这款游戏可以开挂的,确实是有挂的,需要了解加去威信【485275054】很多玩...
绝活儿辅助!广东雀神智能插件是... 绝活儿辅助!广东雀神智能插件是真的(辅助挂)其实是有辅助软件(存在有挂)1、广东雀神智能插件是真的公...
绝活辅助!天天爱消除自动消除辅... 绝活辅助!天天爱消除自动消除辅助(辅助挂)一贯是有辅助工具(有挂透明挂);运天天爱消除自动消除辅助辅...
模块辅助!凑一桌关春天怎么才能... 模块辅助!凑一桌关春天怎么才能开挂(辅助挂)果然真的有辅助挂(有挂技术)1、凑一桌关春天怎么才能开挂...
模块辅助!聚友联盟辅助器(辅助... 模块辅助!聚友联盟辅助器(辅助挂)一直真的是有辅助器(证实有挂)1、起透看视 聚友联盟辅助器辅助软件...
指引辅助!途游小程序辅助器(辅... 指引辅助!途游小程序辅助器(辅助挂)果然确实有辅助神器(新版有挂)1、在途游小程序辅助器插件功能辅助...
阶段辅助!手机卡五星辅助软件(... 阶段辅助!手机卡五星辅助软件(辅助挂)确实是真的有辅助方法(确实有挂)1、手机卡五星辅助软件免费辅助...
手段辅助!芒果辅助器安卓版(辅... 手段辅助!芒果辅助器安卓版(辅助挂)原来真的有辅助脚本(有挂解惑)1、这是跨平台的芒果辅助器安卓版轻...