【Spark On Hive】—— 基于电商数据分析的项目实战
创始人
2025-01-10 15:04:09
0

在这里插入图片描述
在这里插入图片描述

文章目录

    • Spark On Hive 详解
        • 一、项目配置
            • 1. 创建工程
            • 2. 配置文件
            • 3. 工程目录
        • 二、代码实现
            • 2.1 Class SparkFactory
            • 2.2 Object SparkFactory

Spark On Hive 详解

本文基于Spark重构基于Hive的电商数据分析的项目需求,在重构的同时对Spark On Hive的全流程进行详细的讲解。

一、项目配置
1. 创建工程

首先,创建一个空的Maven工程,在创建之后,我们需要检查一系列配置,以保证JDK版本的一致性。同时,我们需要创建出Scala的编码环境。具体可参考以下文章:
Maven工程配置与常见问题解决指南

Scala01 —— Scala基础

2. 配置文件

2.1 在Spark On Hive的项目中,我们需要有两个核心配置文件。

  • pom.xml
      4.0.0      com.ybg     warehouse_ebs_2     1.0               8         8         UTF-8         3.1.2         2.12         3.1.3         8.0.33         3.1.2         2.3.5         2.10.0                                          org.apache.spark             spark-core_${spark.scala.version}             ${spark.version}                                         org.apache.spark             spark-sql_${spark.scala.version}             ${spark.version}                                         org.apache.spark             spark-hive_${spark.scala.version}             ${spark.version}                                         org.apache.hadoop             hadoop-common             ${hadoop.version}                                         com.mysql             mysql-connector-j             ${mysql.version}                                         org.apache.hive             hive-exec             ${hive.version}                                                   org.apache.logging.log4j                     log4j-slf4j-impl                                                                       org.apache.hbase             hbase-client             ${hbase.version}                                         com.fasterxml.jackson.core             jackson-core             ${jackson.version}                                         com.fasterxml.jackson.core             jackson-databind             ${jackson.version}                 
  • log4j.properties
    log4j.properties 文件的主要作用是配置日志系统的行为,包括控制日志信息的输出和实现滚动事件日志
log4j.rootLogger=ERROR, stdout, logfile log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout ----------------------- 滚动事件日志代码 ----------------------- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.DailyRollingFileAppender log4j.appender.logfile.DatePattern='.'yyyy-MM-dd log4j.appender.logfile.append=true --------------------------------------------------------------- log4j.appender.logfile.File=log/spark_first.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n 

2.2 组件核心配置文件
在这里插入图片描述
在工程的resources目录下,需要存放在虚拟机中大数据服务的核心组件的配置文件,以便于Spark On Hive中调用大数据组件服务能够正常进行。

3. 工程目录
二、代码实现
  • 创建数据校验方法 check:
    用于确保配置项的值有效。
    检查值是否为 null。
    对字符串类型的值进行非空和正则表达式匹配校验。

  • 创建配置设置方法 set:
    先校验配置项名称和值的有效性。
    使用 SparkConf.set 方法设置有效的配置项和值。

  • 单例对象 SparkFactory:
    提供基础配置方法,如设置应用名称、主节点等。
    提供 baseConfig 方法集中进行基础配置。
    提供 end 方法返回配置好的 SparkSession 实例。

  • 在 SparkFactory 类中实现上述方法:
    定义 build 方法,返回包含 check 和 set 方法的 Builder 对象。
    在 Builder 对象中实现各种配置方法,每个方法都调用 set 方法。
    使用 SparkSession.builder() 方法在 end 方法中创建并返回 SparkSession 实例。

SparkFactory配置表如下:
配置表

2.1 Class SparkFactory
  • 作用:SparkFactory类的作用是能够工厂化地创建和配置SparkSession实例,通过一系列的setcheck方法来确保配置项的有效性和正确性,并最终生成一个配置好的SparkSession实例。
  • 注意:我们需要在Spark官网配置页获取所有配置项的标准名称。
  • 代码
class SparkFactory {   def build():Builder={     new Builder {       val conf = new SparkConf()        /**        * 数据校验        * @param title 校验主题        * @param value 待校验的值        * @param regex 若待校验值为字符串,且有特定的规则,那么提供正则表达式进一步验证格式        */       private def check(title:String,value:Any,regex:String=null)={         if (null == value) {           throw new RuntimeException()(s"value for $title null pointer exception")         }         if(value.isInstanceOf[String]){           if(value.toString.isEmpty){             throw new RuntimeException(s"value for $title empty string exception")           }           if(regex!=null){             if(!value.toString.matches(regex)){               throw new RuntimeException(s"$title is not match regex $regex")             }           }         }       }        /**        * 先检查配置项名称是否正确        * 再检查配置项的值是否正确        * @param item 配置项名称        * @param value 配置项值        * @param regexValue 配置项正则规则        */       private def set(item:String,value:String,regexValue:String=null)={         check("name_of_config_item",item,"^spark\\..*")         check(item,value,regexValue)         conf.set(item,value)       }       // Base       private def setBaseAppName(appName:String)={         set("spark.app.name",appName,"^\\w+$")       }       private def setBaseMaster(master:String)={         set("spark.master",master,"yarn|spark://([a-z]\\w+|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}|local(\\[\\*|[1-9][0-9]*])")       }       private def setBaseDeployMode(deployMode:String)={         set("spark.submit.deployMode",deployMode,"client|cluster")       }       private def setBaseEventLogEnabled(eventLogEnabled:Boolean)={         set("spark.eventLog.enabled",s"$eventLogEnabled")       }       override def baseConfig(appName: String, master: String = "local[*]", deployMode: String = "client", eventLogEnabled: Boolean = false): Builder = {         setBaseAppName(appName)         setBaseMaster(master)         setBaseDeployMode(deployMode)         setBaseEventLogEnabled(eventLogEnabled)         this       }       // Driver       private def setDriverMemory(memoryGB:Int)={         set("spark.driver.memory",s"${memoryGB}g","[1-9]\\d*")       }       private def setDriverCoreNum(coreNum: Int) = {         set("spark.driver.cores", s"${coreNum}g", "[1-9]\\d*")       }       private def setDriverMaxResultGB(maxRstGB:Int)={         set("spark.driver.maxResultSize",s"${maxRstGB}g","[1-9]\\d*")       }       private def setDriverHost(driverHost:String)={         set("spark.submit.deployMode",driverHost,"localhost|[a-z]\\w+")       }       override def optimizeDriver(memoryGB: Int = 2, coreNum: Int = 1, maxRstGB: Int = 1, driverHost: String = "localhost"): Builder = {         setDriverCoreNum(coreNum)         setDriverMemory(memoryGB)          /**          * 每一个Spark行动算子触发的所有分区序列化结果大小上限          */         setDriverMaxResultGB(maxRstGB)          /**          * Standalone 模式需要设置 DriverHost,便于 executor 与 master 通信          */         if (conf.get("spark.master").startsWith("spark://")) {           setDriverHost(driverHost)         }         setDriverHost(driverHost)         this       }       // Executor       private def setExecutorMemory(memoryGB: Int) = {         set("spark.executor.memory", s"${memoryGB}g", "[1-9]\\d*")       }       private def setExecutorCoreNum(coreNum: Int) = {         set("spark.executor.cores", s"$coreNum", "[1-9]\\d*")       }       override def optimizeExecutor(memoryGB:Int=1,coreNum:Int=1):Builder={         setExecutorMemory(memoryGB)         /**          * Yarn模式下只能由1个核          * 其他模式下,核数为所有可用的核          */         if(!conf.get("spark.master").equals("yarn")){           setExecutorCoreNum(coreNum)         }         this       }       // Limit       private def setLimitMaxCores(maxCores:Int)={         set("spark.cores.max",s"$maxCores","[1-9]\\d*")       }       private def setLimitMaxTaskFailure(maxTaskFailure:Int)={         set("spark.task.maxFailures",s"$maxTaskFailure","[1-9]\\d*")       }       private def setLimitMaxLocalWaitS(maxLocalWaitS:Int)={         set("spark.locality.wait",s"${maxLocalWaitS}s","[1-9]\\d*")       }       override def optimizeLimit(maxCores:Int=4,maxTaskFailure:Int=3,maxLocalWaitS:Int=3):Builder={         if (conf.get("spark.master").startsWith("spark://")) {           setLimitMaxCores(maxCores)         }          /**          * 单个任务允许失败最大次数,超出会杀死本次任务          */         setLimitMaxTaskFailure(maxTaskFailure)          /**          * 数据本地化读取加载的最大等待时间          * 大任务:建议适当增加此值          */         setLimitMaxLocalWaitS(maxLocalWaitS)         this       }       // Serializer       override def optimizeSerializer(serde:String="org.apache.spark.serializer.JavaSerializer",clas:Array[Class[_]]=null):Builder={         /**          * 设置将需要通过网络发送或快速缓存的对象序列化工具类          * 默认为JavaSerializer          * 为了提速,推荐设置为KryoSerializer          * 若采用 KryoSerializer,需要将所有自定义的实体类(样例类)注册到配置中心          */         set("spark.serializer",serde,"([a-z]+\\.)+[A-Z]\\w*")         if(serde.equals("org.apache.spark.serializer.KryoSerializer")){           conf.registerKryoClasses(clas)         }         this       }       // Net       private def setNetTimeout(netTimeoutS:Int)={         set("spark.cores.max",s"${netTimeoutS}s","[1-9]\\d*")       }       private def setNetSchedulerMode(schedulerMode:String)={         set("spark.scheduler.mode",schedulerMode,"FAIR|FIFO")       }       override def optimizeNetAbout(netTimeOusS:Int=120,schedulerMode:String="FAIR"):Builder={         /**          * 所有和网络交互相关的超时阈值          */         setNetTimeout(netTimeOusS)          /**          * 多人工作模式下,建议设置为FAIR          */         setNetSchedulerMode(schedulerMode)         this       }       // Dynamic       private def setDynamicEnabled(dynamicEnabled:Boolean)={         set("spark.dynamicAllocation.enabled",s"$dynamicEnabled")       }       private def setDynamicInitialExecutors(initialExecutors:Int)={         set("spark.dynamicAllocation.initialExecutors",s"$initialExecutors","[1-9]\\d*")       }       private def setDynamicMinExecutors(minExecutors:Int)={         set("spark.dynamicAllocation.minExecutors",s"$minExecutors","[1-9]\\d*")       }       private def setDynamicMaxExecutors(maxExecutors:Int)={         set("spark.dynamicAllocation.maxExecutors",s"$maxExecutors","[1-9]\\d*")       }       override def optimizeDynamicAllocation(dynamicEnabled:Boolean=false,initialExecutors:Int=3,minExecutors:Int=0,maxExecutors:Int=6):Builder={         /**          * 根据应用的工作需求,动态分配executor          */         setDynamicEnabled(dynamicEnabled)         if(dynamicEnabled){           setDynamicInitialExecutors(initialExecutors)           setDynamicMinExecutors(minExecutors)           setDynamicMaxExecutors(maxExecutors)         }         this       }       override def optimizeShuffle(parallelism:Int=3,shuffleCompressEnabled:Boolean=false,maxSizeMB:Int=128,shuffleServiceEnabled:Boolean=true):Builder={         null       }       override def optimizeSpeculation(enabled:Boolean=false,interval:Int=15,quantile:Float=0.75F):Builder={         null       }       override def warehouseDir(hdfs:String):Builder={         null       }       override def end():SparkSession={         SparkSession.builder().getOrCreate()       }     }   } } 
2.2 Object SparkFactory
object SparkFactory {   trait Builder{     // 默认值能给就给     /**      * 基本配置      * @param appName      * @param master 默认是本地方式      * @param deployMode 默认是集群模式      * @param eventLogEnabled 生产环境打开,测试环境关闭      * @return      */     def baseConfig(appName:String,master:String="local[*]",deployMode:String="client",eventLogEnabled:Boolean=false):Builder     /**      * 驱动端优化配置      * @param memoryGB 驱动程序的内存大小      * @param coreNum 驱动程序的核数      * @param maxRstGB 驱动程序的最大结果大小      * @param driverHost 驱动程序的主机地址:驱动程序会在主机地址上运行,并且集群中的其他节点会通过这个地址与驱动程序通信      * @return      */     def optimizeDriver(memoryGB:Int=2,coreNum:Int=1,maxRstGB:Int=1,driverHost:String="localhost"):Builder     def optimizeExecutor(memoryGB:Int=1,coreNum:Int=1):Builder     /**      * 整体限制配置      * @param maxCores 整体可用的最大核数      * @param maxTaskFailure 单个任务失败的最大次数      * @param maxLocalWaitS 容错机制:数据读取阶段允许等待的最长时间,超过时间切换到其他副本。      * @return      */     def optimizeLimit(maxCores:Int=4,maxTaskFailure:Int,maxLocalWaitS:Int=30):Builder     /**      * 默认使用:Java序列化      * 推荐使用:Kryo序列化 提速或对速度又要i去      * 所有的自定义类型都要注册到Spark中,才能完成序列化。      * @param serde 全包路径      * @param classes 自定义类型,默认认为不需要指定,Class[_]表示类型未知。      * @return Builder      */     def optimizeSerializer(serde:String="org.apache.spark.serializer.JavaSerializer",clas:Array[Class[_]]=null):Builder     /**      * 在Spark的官方配置中,netTimeOutS可能被很多超时的数据调用。      * @param netTimeOusS 判定网络超时的时间      * @param schedulerMode 可能很多任务一起跑,因此公平调度      * @return      */     def optimizeNetAbout(netTimeOusS:Int=180,schedulerMode:String="FAIR"):Builder     /**      * 动态分配->按需分配      * 类似于配置线程池中的最大闲置线程数,根据需要去做动态分配      * @param dynamicEnabled 是否开启动态分配      * @param initialExecutors 初始启用的Executors的数量      * @param minExecutors 最小启用的Executors的数量      * @param maxExecutors 最大启用的Executors的数量      * @return      */     def optimizeDynamicAllocation(dynamicEnabled:Boolean=false,initialExecutors:Int=3,minExecutors:Int=0,maxExecutors:Int=6):Builder     /**      * 特指在没有指定分区数时,对分区数的配置。      * 并行度和初始启用的Executors的数量一致,避免额外开销。      *      * @param parallelism      * @param shuffleCompressEnabled      * @param maxSizeMB      * @param shuffleServiceEnabled      * @return      */     def optimizeShuffle(parallelism:Int=3,shuffleCompressEnabled:Boolean=false,maxSizeMB:Int=128,shuffleServiceEnabled:Boolean=true):Builder     /**      * 推测执行,将运行时间长的任务,放到队列中,等待运行时间短的任务运行完成后,再运行。      * @param enabled      * @param interval Spark检查任务执行时间的时间间隔,单位是秒。      * @param quantile 如果某个任务的执行时间超过指定分位数(如75%的任务执行时间),则认为该任务执行时间过长,需要启动推测执行。      */     def optimizeSpeculation(enabled:Boolean=false,interval:Int=15,quantile:Float=0.75F):Builder     def warehouseDir(hdfs:String):Builder     def end():SparkSession   } } 

在这里插入图片描述

相关内容

热门资讯

透视线上!wepoker透视最... 透视线上!wepoker透视最简单三个步骤,一贯存在有挂(透视)玩家教你(有挂插件);1、下载好we...
透视黑科技"wpk安... 透视黑科技"wpk安卓下载辅助"原来是真的有挂(透视)玩家教你(有挂解密)1、wpk安卓下载辅助系统...
透视美元局!aapoker辅助... 透视美元局!aapoker辅助器怎么用(透视)ai插件(原来真的有挂);1、进入到aapoker辅助...
透视规律(WEPOKER)we... 透视规律(WEPOKER)wepoker透视脚本免费app(透视)真是真的是有挂(详细教程)1、操作...
透视辅助!wepoker透视a... 透视辅助!wepoker透视app下载,都是有挂(透视)分享教程(有挂攻略);1、玩家可以在wepo...
透视玄学"we-po... 透视玄学"we-poker有人玩吗"总是是真的有挂(透视)科技教程(有挂详情);1、任何we-pok...
透视软件!aapoker插件(... 透视软件!aapoker插件(透视)透视脚本下载(果然有挂)1)aapoker插件辅助挂:进一步探索...
透视透视(WePoKer)ht... 透视透视(WePoKer)htx矩阵wepoker辅助(透视)原来是有挂(第三方教程)htx矩阵we...
透视透视!wepoker辅助是... 透视透视!wepoker辅助是真的假的,素来真的是有挂(透视)规律教程(有挂技巧)在进入wepoke...
透视神器"poker... 透视神器"pokernow辅助工具"切实存在有挂(透视)存在挂教程(有挂辅助);1、点击下载安装,p...