大家好,我是范佳,是Apache SeaTunnel社区的PMC member。今天给大家分享一些基于Apache SeaTunnel二次开发的内容。
这部分内容主要涉及代码层面的知识,如果大家有什么疑问,欢迎来社区找我交流!
大部分数据开发工程师基于Apache SeaTunnel的二次开发,可能做的就是任务提交,任务的一些监控,还有在任务没有跑起来之前,我们可能需要预先知道跑起来之后的可能一些结果。
基于以上内容,我将从五个部分来分享相关内容:
Apache SeaTunnel 是一个高性能的实时和离线数据批处理平台,自加入Apache软件基金会以来已有两年多时间,期间社区不断发展,增加了许多新功能和特性。
感兴趣的同学可以访问官网:https://seatunnel.apache.org/
SeaTunnel支持多种数据处理引擎,包括市场上流行的开源引擎如Spark和Flink,以及SeaTunnel自研的Zeta引擎。这使得SeaTunnel能够灵活应对不同的数据处理需求,无论是大规模数据集还是实时数据流。
项目提供了广泛的连接器支持,使得SeaTunnel可以轻松接入各种数据源和目的地,从而简化了数据集成过程。这一特性对于需要将数据从多个源汇总到单一系统的企业尤为重要。
对HTTP的支持是SeaTunnel的又一亮点,特别是对于开发者来说,因为它可以显著降低适配成本。通过HTTP支持,开发者可以更容易地将SeaTunnel集成到现有的Web应用和服务中。
SeaTunnel的流批一体功能确保了无缝的数据处理,无论是流处理还是批处理,都能在同一个平台上高效运行。这一特性简化了架构,减少了维护的复杂性。
作为一个数据同步引擎,SeaTunnel提供了流速控制功能,这对保护下游系统不被过载非常关键。尤其是在上游数据量大而下游系统承载能力有限的场景中,流速控制显得尤为重要。
自动建表功能可以极大地帮助简化数据处理流程,特别是对下游系统来说。这一功能允许SeaTunnel根据数据自动创建表结构,减少了手动介入的需要,提高了数据管道的灵活性和效率。
一般来讲,我们基于开源软件二开,第一步就是启动,而启动SeaTunnel任务的第一步是准备用户界面,确保二开后的用户可以通过界面触发或定时提交任务。
一旦用户界面设置完成,以下是使用Shell脚本提交任务的基本步骤:
编写Shell脚本:创建一个Shell脚本,用于封装启动命令和任务参数。
执行命令:通过执行Shell脚本来提交任务到SeaTunnel引擎。
在任务提交时,我们的引擎会返回一个任务ID。
这个ID在使用脚本模式启动时只会打印在日志文件中。如果需要监控任务,需要解析日志文件以获取任务ID。
然而,这种方式比较滞后,因为ID是引擎端生成的,可能需要等待一段时间才能得到。
为了解决这个问题,我们新增了一个功能,允许在提交任务时配置自定义ID。这个ID可以由第三方服务或集成SeaTunnel的平台生成,然后传递给SeaTunnel,SeaTunnel会使用该ID作为任务的唯一标识。
这项功能虽然小,但对于二次开发或集成非常有用,避免了解析日志或等待SeaTunnel生成ID的过程。
通过Shell脚本启动任务时,可以在日志文件中获取任务ID。
我们也支持通过HTTP提交任务。这种方式无需额外启动客户端,对第三方集成更加友好。
HTTP提交任务的方式更加自然和通用。
对于更深度、精细化和功能更强大的任务提交方式,推荐使用SeaTunnel Client。
SeaTunnel Client是一个核心类,通过它可以提交所有任务。无论是引擎内部代码还是外部集成代码,都可以使用这个Client提交任务到集群。
通过SeaTunnel Client,我们可以在JVM进程中直接提交任务。例如,在一个Spring服务中,用户点击启动按钮后,后端可以直接使用SeaTunnel Client提交任务,而不需要启动一个额外的HTTP或Java进程。
这种方式的好处包括:
启动任务后,我们需要对SeaTunnel进行监控,以了解任务的状态。
例如,任务是否启动成功?运行了多久?数据是否成功读取?任务是否失败?失败的原因是什么?这些都是二次开发时需要关注的内容,因为我们不能保证所有任务都能正常运行。
我们可以通过以下三种方式监控任务状态:
比如说下面的截图,这个就是一个job result
然后这个 job result
也是我们SeaTunnel client 返给我们的,然后我们就可以看到里面的状态。
如何调用SeaTunnel Client?
传入任务ID即可获取任务状态,任务是正在运行还是失败。对于集成开发来说,获取任务异常信息非常重要。如果通过Shell脚本查看日志,用户需要手动解析日志文件。
这在集成的Web页面中并不方便。因此,我们推荐通过HTTP或SeaTunnel Client获取异常信息。
除了监控任务状态之外,我们还需要有指标。
例如,任务虽然在运行,但它是否真正读取到了数据?读取了多少数据?写入了多少数据?吞吐量是多少?这些都是需要关注的指标。
SeaTunnel引擎内部提供了对应的指标获取方式,有以下三种方式: 1.Shell脚本:通过Shell脚本可以查询任务的各项指标。 2.HTTP:通过HTTP接口可以获取任务的各项指标。 3.SeaTunnel Client:通过SeaTunnel Client可以查询任务的各项指标。
我们可以监控的核心指标包括:
●读取数量 ●读取的字节数 ●QPS(每秒查询率) ●每秒字节数 ●写入数量 ●写入的字节数
对于CDC(Change Data Capture),我们比较关心的是CDC的延迟,即从CDC源端的数据产生到SeaTunnel读取到它的延迟是多少。
目前,我们的支持是每个任务级别的,但对于每个任务中的每张表的支持还比较弱,因为SeaTunnel支持多表任务,即一个任务可以读取和写入多张表。我们正在改进这方面的支持。
除了查询指标外,我们还可以将指标定时对外暴露,例如暴露到Prometheus或SeaTunnel的指标体系中。
目前,SeaTunnel对这方面的支持还比较弱,但我们希望在未来能更好地支持将指标对外抛出到第三方组件,如Prometheus,这样对用户会更友好。
我们提供的默认指标可能不能满足所有用户或开发者的需求。那么,如何定制属于内部系统或二次开发系统的指标呢?
定制化指标集成实际上很简单。可以通过我们的context对象来实现。这个context对象包含一个MetricsContext对象,我们可以向其中注册自定义指标。
这样就完成了定制化指标的集成,通过这种方式注册的自定义指标,可以通过HTTP、Shell脚本或SeaTunnel Client查询和展示。
除了指标外,如果需要一些瞬发性的事件处理,例如在某些事件发生时收到通知,可以使用SeaTunnel内部设计的事件系统。
事件示例
SeaTunnel的事件系统可以处理以下事件:
后续我们会实现DDL事件的发送功能。社区正在开发的DDL功能主要是为了应对schema变化,例如在MySQL CDC运行过程中,schema发生变化会产生DDL事件。
我们可以将这些DDL事件包装成对应的事件发送出去。外部系统可以接收到这些事件,比如某个表增加了一列或删除了一列,然后进行相应的展示或处理。这是事件系统的作用。
自定义事件
就像我们可以自定义metrics一样,事件也可以自定义。自定义事件的方式与metrics非常相似。用户可以实现自己的事件来处理特定业务需求。
在metrics中,可以通过context对象获取MetricsContext。同样地,在事件系统中,我们可以获取EventListener,然后通过它注册和处理自定义事件。
我们提供了对应的接口EventHandler,它是一个SPI实现。用户可以实现自己的handler,然后将其放到lib目录下,或者打包到应用中。
有了这个handler之后,Master节点会发现所有的EventHandler,并调用它们的handle方法。具体的事件处理逻辑由实现的handler决定。
我们内部提供了一个默认的实现:JobEventHttpReporterHandler。这个handler会将事件通过HTTP接口发送到用户配置的地址。
用户可以通过这个接口接收引擎中的事件,例如任务开始、任务结束、数据到达等。
事件系统不仅用于捕获运行时的事件,还可以用于DDL事件。例如,MySQL CDC运行过程中,schema变化会产生DDL事件。我们可以将这些DDL事件包装并发送出去,外部系统可以接收到这些事件并进行相应处理,例如展示schema变化、执行后续操作等。
除了任务级别的监控,我们还需要关注集群节点的健康状况。作为一个集群系统,了解整个集群是否正常运作非常重要。这些信息可以通过SeaTunnel Client获取。
通过SeaTunnel Client,我们可以获取到集群的一些健康信息。这些信息包括但不限于:
这些与性能和集群稳定性相关的信息能够帮助我们更好地监控和维护系统。
例如,我们可以通过SeaTunnel Client获取集群节点的健康状况,并在页面上展示出来。如果在3个节点的集群中,只有2个节点正常,我们可以通过接口判断并处理异常节点。
SaveMode与Sink密切相关,决定了在写入数据之前执行的一系列操作。这些操作包括自动建表、表重建、数据清空或数据追加。
通过配置schema_save_mode和data_save_mode,可以定义这些行为。
我们预览的核心是 SaveMode 到底会怎么操作。这一块是纯代码层面,如果要集成的话,肯定需要写代码。虽然不像 HTTP 那么简单,但它非常有用。
例如,我现在任务还没开始跑,或者即将定时运行。我想知道在配置了表重建的情况下,任务到底会创建表还是不会创建表。在任务运行前,我们可以通过行为预览确定 SaveMode 和 data SaveMode 的行为。这对于涉及表操作的情况尤其重要,因为表操作可能比较敏感。
比如说我们从 source 端读取的是 MySQL 的表,MySQL 表在二次开发中可能会涉及到一个 CatalogTable。
我们会将外部系统的表抽象成内部统一的 CatalogTable。例如,从 MySQL 读取一张表,然后转换成系统内部的 CatalogTable。
如果任务配置读取表 a,我们可以通过页面上的一些操作,预览表 a 的输出结构。
具体步骤如下:
这种预览在任务还没有真正跑起来时就可以执行,确保任务读取的表结构是正确的。
例如,我们有 SQL 作为 transform 操作,希望在 SQL 中改一个字段的类型,同时增加和删除一些字段。
预览功能可以在任务运行前确认这些操作是否会如期执行。
具体步骤如下:
从 transform 输出的表结构,需要传入 Sink 进行写入操作。涉及到自动建表时,我们可以通过 SaveModeHandler 确认以下内容:
SaveMode handler 会根据 schema_save_mode 和 data_save_mode 配置,以及 catalog 中的表判断是否需要建表。
当我们具体操作Catalog,比如说Catalog 有一个 exist 的方法去判断我们的 table pass ,根据我们的 schema_save_mode, data_save_mode 去判断我们的接下来的这一块的行为到底是什么?
SaveMode Handler 提供了能力,例如:
通过 SaveModeHandler 提供的能力,可以预览和确认任务在运行时是否会创建表或进行其他操作。
我们执行 SQL 时,可以提前看到将要建表的 SQL。例如:
我们提供了 Catalog 预览功能,调用 preAction 方法可以预览建表或删除表的 SQL。
在建表时,输出表结构的类型非常重要。我们需要知道内存中看到的类型在自动建表时会被建成什么类型。
为此,SeaTunnel 内部有一套叫 TypeConverter 的接口体系。
通过 TypeConverter,我们可以预览并确认 SeaTunnel 和数据库之间的字段类型交互。例如,通过转换和反向转换,我们可以知道表字段类型在 SeaTunnel 和数据库之间的具体表现。
在行为预览中,我们可以通过 TypeConverter 接口体系实现类型转换的预览。预览与实际运行时的转换结果一致,因为实际运行中也是通过这套代码进行类型转换。
通过集成 Type Converter 接口,我们可以在预览时确认建表的具体类型。
例如:
今天给大家主要分享了以下内容:
SeaTunnel 内部已经实现了许多功能,通过集成这些功能,可以实现更高效、更兼容的二次开发。
希望这些接口和设计能让大家在集成和二次开发时更加简单和高效,欢迎大家基于这些标准化体系进行扩展,并将实现的功能回馈给社区,使 SeaTunnel 更加丰富和强大。
通过本文的分享,能够帮助大家对 SeaTunnel 的二次开发有更深入的了解。如果大家有任何问题,欢迎随时与我交流。谢谢大家!
本文由 白鲸开源科技 提供发布支持!
上一篇:AI开发者大会_创建大会