【Flink CDC(一)】实现mysql整表与增量读取
创始人
2024-11-28 06:06:47
0

文章目录

  • 一. 运行前准备
    • 1. 依赖
      • 1.1. Maven dependency
      • 1.2. SQL Client JAR(推荐)
    • 2. 配置 MySQL 服务器(必须)
  • 二. 功能说明
    • 1. 启动模式
    • 2. 全量阶段支持 checkpoint
    • 3. 关于无主键表
    • Exactly-Once 处理
  • 三. 实战
    • 1. 实现mysql整表与增量表同步
  • FAQ

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

   com.ververica   flink-connector-mysql-cdc      2.4.0  

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到 /lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户 mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';  # 赋权 mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';  # 刷新权限 mysql> FLUSH PRIVILEGES; 

注意:

scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
    的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

  MySQLSource.builder()     .startupOptions(StartupOptions.earliest()) // 从最早位点启动     .startupOptions(StartupOptions.latest()) // 从最晚位点启动     .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动     .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动     .startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动     ...     .build()     CREATE TABLE mysql_source (...) WITH (     'connector' = 'mysql-cdc',     'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动     'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动     'scan.startup.mode' = 'specific-offset', -- 从特定位点启动       'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名     'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置     'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合      'scan.startup.mode' = 'timestamp', -- 从特定位点启动     'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳     ... )  

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial'  --  CREATE TABLE tjy_sql1   (     `id` int,     `name` string,     `face` string    ,PRIMARY KEY(id) NOT ENFORCED   ) WITH (           'connector' = 'mysql-cdc',           'hostname' = 'xxx',           'port' = '3306',           'username' = 'middle_test',           'password' = '123456',           'database-name' = 'middle_test',           'table-name' = 'tjy_fortest1'          -- ,'scan.incremental.snapshot.enabled' = 'false'          --  initial: 默认值,全表同步,然后进行增量同步;        --  'scan.startup.mode'= 'initial'          -- 'debezium.snapshot.mode' = 'initial'      );          CREATE TABLE tjy_sql1_sink    (     `id` int,     `name` string,     `face` string     ,PRIMARY KEY(id) NOT ENFORCED    ) WITH (              'connector' = 'mysql-x',              'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',              'username' = 'middle_test',              'password' = '123456',              'table-name' = 'flink_type',              'table-name' = 'tjy_fortest2'          );         insert into tjy_sql1_sink select * from tjy_sql1;  

 

FAQ

相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

可能涉及到的问题

在这里插入图片描述

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

相关内容

热门资讯

欢乐棋牌!wepower有辅助... 欢乐棋牌!wepower有辅助器(透视挂)苹果版本教程-都是真的有挂(百度贴吧)1、许多玩家不知道欢...
WPk!wepoke透明真的(... WPk!wepoke透明真的(透视辅助)插件挂教程-原来真的有挂(微博热搜)1、操作简单,无需注册,...
aapOKER!wepoke辅... aapOKER!wepoke辅助透视教程(透视辅助)输赢教程-好像真的有挂(头条)1、aapOKER...
AaPOKER!wepoke挂... AaPOKER!wepoke挂(辅助挂)智能教程-原来真的有挂(抖音);1、AaPOKER机器人多个...
wpk有透视辅助!wepoke... wpk有透视辅助!wepoke辅助是真的,AApoker的确真的有挂,详细教程(有挂教程);该软件可...
AApOKER!wepoke中... AApOKER!wepoke中牌率(透视挂)系统教程-果真真的有挂(微博热搜)1、AApOKER!w...
微扑克辅助挂!wepoke里面... 微扑克辅助挂!wepoke里面有ai,We辅poker助原来真的有挂,揭秘攻略(有挂攻略);1、让任...
wepoke有挂!来玩app德... wepoke有挂!来玩app德州辅助器,poker master安卓版的确是有挂的,技巧教程(有挂规...
轰趴十三水!wepoke游戏数... 轰趴十三水!wepoke游戏数据有说法(透视挂)开挂教程-果然真的有挂(知乎)轰趴十三水辅助器中分为...
AaPOKER!wopoker... AaPOKER!wopoker游戏辅助器(透视辅助)黑科技教程-果然真的有挂(哔哩哔哩)1)AaPO...