hivehook 表血缘与字段血缘的解析
创始人
2025-01-15 23:03:38
0

代码

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.metastore.api.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import java.util.*;  public class HiveServerQueryLogHook implements ExecuteWithHookContext { 	static final Logger LOG = LoggerFactory.getLogger(HiveServerQueryLogHook.class); 	     @Override     public void run(HookContext hookContext) throws Exception {         printLineageInfo(hookContext);     }     private void printLineageInfo(HookContext hookContext) {         // 输出表         Set inputTables = new HashSet<>();;          // 输入表         Set outputTables = new HashSet<>();;          // 字段血缘 Map         // key为输出字段,value为来源字段数组         Map> fieldLineage = new HashMap<>(); 		 		// 从 `hookContext` 中获取 `Linfo` 并返回其 entry set,这意味着我们会获取到一个包含键值对的集合;遍历 `hookContext` 中 `Linfo` 的 entry set         for(Map.Entry dep: hookContext.getLinfo().entrySet()){             // 表血缘             // 将 `dep.getKey()` 转换为一个 Optional 对象,以防止空指针异常。             Optional.ofNullable(dep.getKey())             		// 如果 `dep.getKey()` 不为空,则将其转换为 `DataContainer` 对象。                     .map(LineageInfo.DependencyKey::getDataContainer)                     // 如果 `DataContainer` 不为空,则获取其表信息。                     .map(LineageInfo.DataContainer::getTable)                     // 将表信息传递给 `dealOutputTable` 方法进行处理。                     .map(this::dealOutputTable)                     // 如果处理后的结果不为空,则将其添加到 `outputTables` 集合中。                     .ifPresent(outputTables::add);             Optional.ofNullable(dep.getValue())                     .map(LineageInfo.Dependency::getBaseCols)                     .ifPresent(items -> items.stream().map(LineageInfo.BaseColumnInfo::getTabAlias)                             .map(LineageInfo.TableAliasInfo::getTable)                             .map(this::dealOutputTable)                             .forEach(inputTables::add));              // 字段血缘             // 将 `dep.getKey()` 转换为一个 Optional 对象,以防止空指针异常。             String column = Optional.ofNullable(dep.getKey())             		// 如果 `dep.getKey()` 不为空,则将其传递给 `dealDepOutputField` 方法进行处理。                     .map(this::dealDepOutputField)                     // 如果处理后的结果不为空,则将其作为键放入 `fieldLineage` 中,并关联一个空的 ArrayList,然后将该键返回给 `column` 变量。                      .map(aimField -> {                         fieldLineage.put(aimField, new ArrayList<>());                         return aimField;                     // 如果处理后的结果为空,则将 `column` 设置为 null。                     }).orElse(null);                     // 将 `dep.getValue()` 转换为一个 Optional 对象,以防止空指针异常。             Optional.ofNullable(dep.getValue())             		// 如果 `dep.getValue()` 不为空,则获取其基础列信息。                      .map(LineageInfo.Dependency::getBaseCols)                     // 如果基础列信息不为空,则将其转换为流并依次处理,将处理后的结果添加到 `fieldLineage` 中对应 `column` 的列表中。                     .ifPresent(items -> items.stream()                             .map(this::dealBaseOutputField)                             .forEach(item -> {                                 fieldLineage.get(column).add(item);                             }));         }         LOG.info("inputTables : {} ",inputTables);         LOG.info("outputTables : {} ",outputTables);         LOG.info("fieldLineage : {} ",fieldLineage.toString());     }     // 处理表的格式为 库.表     private String dealOutputTable(Table table) {         String dbName = table.getDbName();         String tableName = table.getTableName();         return dbName != null ? String.format("%s.%s", dbName, tableName) : tableName;     }      // 处理输出字段的格式     private String dealDepOutputField(LineageInfo.DependencyKey dependencyKey) {         try{             String tableName = dealOutputTable(dependencyKey.getDataContainer().getTable());             String field = dependencyKey.getFieldSchema().getName();             return String.format("%s.%s", tableName, field);         }catch (Exception e) {             LOG.error("deal dep output field error" + e.getMessage());             return null;         }     }      // 处理来源字段的格式     private String dealBaseOutputField(LineageInfo.BaseColumnInfo baseColumnInfo) {         try{             String tableName = dealOutputTable(baseColumnInfo.getTabAlias().getTable());             String field = baseColumnInfo.getColumn().getName();             return String.format("%s.%s", tableName, field);         }catch (Exception e) {             LOG.error("deal base output field error" + e.getMessage());             return null;         }     }  

配置

编译后生成jar文件添加到hive运行环境,设置hook

1)jar放置/disk1/hive-jars/hook  2)设置env,conf/hive-env.sh pushd /disk1/hive-jars/hook export HOOK_DEPS=$(ls *.jar| xargs -Ixx echo "`pwd`/xx" | sort | tr '\n' ':') popd export HIVE_AUX_JARS_PATH=${xxx%:}:${HOOK_DEPS%:}  3)修改hive-site.xml        hive.exec.post.hooks     com.xxx.xxx.HiveServerQueryLogHook            Comma-separated list of post-execution hooks to be invoked for each statement.       A post-execution hook is specified as the name of a Java class which implements the       org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.          

测试

sql

use db_dev; CREATE TABLE IF NOT EXISTS `all_report_creator`( `project_id` INT COMMENT '项目组id', `report_id` INT COMMENT '报告id', `creator_id` INT COMMENT '报告创建者id', `nick` STRING COMMENT 'nick名字' )  STORED AS PARQUET; insert overwrite table all_report_creator select t1.project_id,t1.id,t2.id,t2.nick from db.new_report t1 left join db.bigviz_user t2 on t1.creator_id = t2.id where t1.project_id in(7,24) 

血缘

24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: inputTables : [db.new_report, db.bigviz_user] 24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: outputTables : [db_dev.all_report_creator] 24/07/12 13:34:31 INFO xxx.HiveServerQueryLogHook: fieldLineage : {db_dev.all_report_creator.project_id=[db.new_report.project_id], db_dev.all_report_creator.report_id=[db.new_report.id], db_dev.all_report_creator.creator_id=[db.bigviz_user.id], db_dev.all_report_creator.nick=[db.bigviz_user.nick]} 

参考

HIVE源码学习-hivehook尝试表血缘与字段血缘的解析
http://ganjiacheng.cn/article/2020/article_16_HIVE%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0-hivehook%E5%B0%9D%E8%AF%95%E8%A1%80%E7%BC%98%E8%A7%A3%E6%9E%90/

相关内容

热门资讯

近年来!新西部大厅辅助器,都是... 近年来!新西部大厅辅助器,都是存在有辅助脚本(有挂功能)-哔哩哔哩1、每一步都需要思考,不同水平的挑...
随着!潮汕汇app透视软件,切... 随着!潮汕汇app透视软件,切实是真的辅助脚本(确实有挂)-哔哩哔哩1、下载好潮汕汇app透视软件透...
此事备受玩家关注!越乡游辅助工... 此事备受玩家关注!越乡游辅助工具,原来存在有辅助安装(的确有挂)-哔哩哔哩1)越乡游辅助工具有没有挂...
出现新变化!微信小程序开心泉州... 出现新变化!微信小程序开心泉州辅助器,果然有挂辅助挂(确实有挂)-哔哩哔哩微信小程序开心泉州辅助器辅...
据悉!打两圈怀疑有外g挂,竟然... 据悉!打两圈怀疑有外g挂,竟然是有辅助软件(了解有挂)-哔哩哔哩打两圈怀疑有外g挂能透视中分为三种模...
有玩家发现!雀神麻将小程序辅助... 有玩家发现!雀神麻将小程序辅助软件,切实是有辅助下载(确实有挂)-哔哩哔哩1、雀神麻将小程序辅助软件...
截至发稿!填大坑辅助软件,都是... 截至发稿!填大坑辅助软件,都是真的是有辅助修改器(真实有挂)-哔哩哔哩1、进入到填大坑辅助软件是否有...
出现新变化!打哈儿麻将辅助下载... 出现新变化!打哈儿麻将辅助下载,原来有挂辅助修改器(有挂技术)-哔哩哔哩1、打哈儿麻将辅助下载透视辅...
现就发布提示!拼十辅助器,切实... 现就发布提示!拼十辅助器,切实真的是有辅助工具(真实有挂)-哔哩哔哩1、用户打开应用后不用登录就可以...
这一现象值得深思!青鸟辅助怎么... 这一现象值得深思!青鸟辅助怎么更新,其实有挂辅助工具(今日头条)-哔哩哔哩1、玩家可以在青鸟辅助怎么...