大数据学习之Spark基础(补充)
创始人
2025-01-07 06:32:58
0
10)mapValues
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object Demo10MapValues {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf)      val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")      val kvRDD1: RDD[(String, Int)] = linesRDD.map(_.split(",")).map {       case Array(_, name:String, age:String, _,_) =>         (name, age.toInt)     }       /**      * mapValues函数也是作用在kv格式的算子上      * 将每个元素的值传递给后面的函数,进行处理得到新的值,键不变,这个处理后的组合重新返回到新的RDD中      * 不改变键,只改变每个键所对应的值(是每个人的年龄加100)      */     kvRDD1.mapValues(_ + 100).foreach(println)    } } 
11)sortBy
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo12SortBy {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf)      //TODO parallelize:将scala的集合变成spark中的RDD     val rdd1: RDD[Int] = sc.parallelize(List(34, 123, 6, 1, 231, 1, 34, 56, 2))      //TODO 与Scala中不同,若是.sortBy(e)形式则会报错     val rdd2: RDD[Int] = rdd1.sortBy((e: Int) => e)     rdd2.foreach(println)   } } 
12)mapPartitions

mapPartitions:一次处理一个分区中的数据

它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个rdd

mapPartitions一次处理一个分区的数据,处理完再返回

最后的处理效果和map的处理效果是一样的

mapPartition可以优化与数据库连接的次数

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object Demo11partitionBy {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf) // TODO spark/data/wcs/*目录下有两个文件,相当于两个block块,会生成两个分区     val linesRDD: RDD[String] = sc.textFile("spark/data/wcs/*")      /**      * mapPartitions:一次处理一个分区中的数据      * 它与map的区别在于,map是每次处理一条数据就返回一条数据到下一个rdd      * 而mapPartitions一次处理一个分区的数据,处理完再返回()      * 最后的处理效果和map的处理效果是一样的      * mapPartition可以优化与数据库连接的次数,使用mapPartition时,可以将一个分区的数据视为一个批次,      * 并在该批次内统一处理。这意味着可以在处理整个分区之前建立一次数据库连接,并在处理完整个分区后断开连接。      */     val rdd1: RDD[String] = linesRDD.mapPartitions((itr: Iterator[String]) => {       // 每个分区打印一次         println("=========================================")       itr.map((e: String) => {         e       })     })      val rdd1: RDD[String] = linesRDD.map((itr: String) => {          //  每条数据打印一次       println("=========================================")       itr     })  //    linesRDD.map((e:String)=>{ //      //...数据库连接 //    })      rdd1.foreach(println)    } } 
2、行动算子
1)foreach
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object Demo13foreach {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf)      val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")      val rdd1: RDD[Array[String]] = linesRDD.map((e: String) => {       e.split(",")     })      val rdd2: RDD[(String, String, String, String, String)] = rdd1.map {       case Array(id: String, name: String, age: String, gender: String, clazz: String) =>          (id, name, age, gender, clazz)     }      /**      * 行动算子,就可以触发一次作业执行,有几次行动算子调用,就会触发几次      *      * rdd是懒加载的性质      */     //    rdd2.foreach(println)     //    println("====================================")     //    rdd2.foreach(println)       println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") // 一定会打印,属于Scala而不属于spark作业中的语句     val rdd3: RDD[(String, String, String, String, String)] = rdd2.map((t5: (String, String, String, String, String)) => {       println("===============================")       t5     })     println("#############################")      rdd3.foreach(println)      while (true) {      }    } } 
2)collect
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD  object Demo14collect {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf)      val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")      val rdd1: RDD[Array[String]] = linesRDD.map((e: String) => {       e.split(",")     })      val rdd2: RDD[Student] = rdd1.map {       case Array(id: String, name: String, age: String, gender: String, clazz: String) =>         Student(id.toInt, name, age.toInt, gender, clazz)     }      //collect将rdd转成合适的scala中的数据结构     val stuArray: Array[Student] = rdd2.collect()     //foreach是scala中的foreach,不会产生作业执行的     stuArray.foreach(println)     while (true){      }    } }  case class Student(id: Int, name: String, age: Int, gender: String, clazz: String) 

在这里插入图片描述

3、算子案例
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD  object Demo15StudentTest1 {   def main(args: Array[String]): Unit = {     //求年级总分前10的学生各科分数的详细信息     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("MapValues算子演示")      val sc: SparkContext = new SparkContext(conf)      /**      * 文件中数据格式为:      * 1500100001,1000001,98      * 1500100001,1000002,5      * 1500100001,1000003,137      * 1500100001,1000004,29      */     val linesRDD: RDD[String] = sc.textFile("spark/data/score.txt")      val idWithScoreRDD: RDD[(String, String, Int)] = linesRDD.map((line: String) => {       line.split(",") match {         case Array(id: String, subject_id: String, score: String) =>           (id, subject_id, score.toInt)       }     })       val array1: Array[String] = idWithScoreRDD       .map((t3: (String, String, Int)) => (t3._1, t3._3))       // 求出每个人的总分,为下面取总分前十名做铺垫       .reduceByKey(_ + _)       .sortBy((kv: (String, Int)) => -kv._2)       .take(10)       .map(_._1)      idWithScoreRDD.filter((t3: (String, String, Int)) => {       val bool: Boolean = array1.contains(t3._1)       if(bool){         println("存在")       }       bool     }).foreach((t3: (String, String, Int)) => {       println("==========================")       println(t3)     })     } } 
Spark中的缓存

checkpoint和cache的区别?

cache是将一个复杂的RDD做缓存,将来执行的时候,只是这个rdd会从缓存中取

checkpoint是永久将rdd数据持久化,将来执行的时候,直接从检查点的rdd往后执行

1、cache

执行过后缓存就都没了

在这里插入图片描述

import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext}  object Demo16Cache {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("缓存演示")     val sc: SparkContext = new SparkContext(conf)     //===================================================================      val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")     val studentsRDD: RDD[Student2] = linesRDD.map(_.split(","))       .map {         case Array(id: String, name: String, age: String, gender: String, clazz: String) =>           Student2(id, name, age.toInt, gender, clazz)       }      /**      * 缓存:      * 在缓存的第一次执行时,就已经从缓存中放入数据并且下面取的时候就从缓存中取数据了      * 缓存的目的是为了spark core作业执行的时候,缩短rdd的执行链,能够更快的得到结果      * RDD在缓存之后,若是再次执行这个RDD有向无环图时,即可从所缓存的RDD开始执行,缩短执行速度      * 缓存的实现方式:      *  1、需要缓存的rdd调用cache函数      *  2、persist(StorageLevel.MEMORY_ONLY) 修改缓存级别      */ //    studentsRDD.cache() // 默认将rdd缓存到内存中,缓存级别为memory_only     // 将rdd缓存到磁盘中     studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)      //需求1:求每个班级的人数     val rdd1: RDD[(String, Iterable[Student2])] = studentsRDD.groupBy(_.clazz)     val resRDD1: RDD[(String, Int)] = rdd1.map((kv: (String, Iterable[Student2])) =>       (kv._1, kv._2.size))     resRDD1.foreach(println)      //需求2:求每个年龄的人数     val rdd2: RDD[(Int, Iterable[Student2])] = studentsRDD.groupBy(_.age)     val resRDD2: RDD[(Int, Int)] = rdd2.map((kv: (Int, Iterable[Student2])) =>       (kv._1, kv._2.size))     resRDD2.foreach(println)      while (true){      }   } }  case class Student2(id:String,name:String,age:Int,gender:String,clazz:String) 

缓存前

在这里插入图片描述

缓存后

在这里插入图片描述

2、checkpoint

永久将执行过程中RDD中流动的数据存储到磁盘(hdfs)中

checkpoint

需要设置checkpoint的路径,统一设置的

checkpoint也相当于一个行动算子,触发作业执行(触发一个新的Job作业的执行)

第二次DAG有向无环图执行的时候(再存储完之后的又一次调用行动算子后),直接从最后一个有检查点的rdd开始向下执行

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}  object Demo17Checkpoint {   def main(args: Array[String]): Unit = {     val conf: SparkConf = new SparkConf()       .setMaster("local")       .setAppName("缓存演示")     val sc: SparkContext = new SparkContext(conf)     //TODO 设置检查点的存储路径     sc.setCheckpointDir("spark/data/checkpoint1")     //===================================================================      val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")     val studentsRDD: RDD[Student2] = linesRDD.map(_.split(","))       .map {         case Array(id: String, name: String, age: String, gender: String, clazz: String) =>           Student2(id, name, age.toInt, gender, clazz)       }            // TODO checkpoint     studentsRDD.checkpoint()      //需求1:求每个班级的人数     val rdd1: RDD[(String, Iterable[Student2])] = studentsRDD.groupBy(_.clazz)     val resRDD1: RDD[(String, Int)] = rdd1.map((kv: (String, Iterable[Student2])) => (kv._1, kv._2.size))     resRDD1.foreach(println)      //需求2:求每个年龄的人数     val rdd2: RDD[(Int, Iterable[Student2])] = studentsRDD.groupBy(_.age)     val resRDD2: RDD[(Int, Int)] = rdd2.map((kv: (Int, Iterable[Student2])) => (kv._1, kv._2.size))     resRDD2.foreach(println)      while (true) {      }   } } 

运行界面信息

在这里插入图片描述
在这里插入图片描述

Spark在linux中的部署
配置

1、上传解压,配置环境变量 配置bin目录

解压
tar -zxvf spark-3.1.3-bin-hadoop3.2.tgz -C …/
重命名
mv spark-3.1.3-bin-hadoop3.2 spark-3.1.3
配置环境变量
vim /etc/profile

更改用户组:

chown -R root:root spark-3.1.3

2、修改配置文件 conf

cp spark-env.sh.template spark-env.sh
增加配置:
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077

export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171

master相当于RM worker相当于NM

增加从节点配置
cp workers.template workers

vim workers

更改localhost为:

node1
node2

3、复制到其它节点

scp -r spark-3.1.3 node1:pwd
scp -r spark-3.1.3 node2:pwd

4、在主节点执行启动命令

启动集群,在master中执行
./sbin/start-all.sh

http://master:8080/ 访问spark ui

standalone client模式

1、standalone client模式 日志在本地输出,一般用于上线前测试(bin/下执行)

需要进入到spark-examples_2.11-2.4.5.jar 包所在的目录下执行
cd /usr/local/soft/spark-2.4.5/examples/jars

提交spark任务
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 ./spark-examples_2.12-3.1.3.jar 10

2、standalone cluster模式 上线使用,不会在本地打印日志

在该模式下运行时,必须保证node1、node2的/usr/local/soft/spark-3.1.3/examples/jars下由所要运行的jar包。因为不确定是哪个节点在运行任务。

scp spark-1.0.jar node1:/usr/local/soft/spark-3.1.3/examples/jars 

执行模板:

spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster ./spark-examples_2.12-3.1.3.jar 100

spark-shell : spark 提供的一个交互式的命令行,可以直接写代码

spark-shell master spark://master:7077

整合yarn

yarn中之前没有配置yarn.application.classpath,需要对其进行配置:

样例:

在Linux中查找value:

hadoop classpath 
 yarn.application.classpath /usr/local/soft/hadoop-3.1.3/etc/hadoop:/usr/local/soft/hadoop-3.1.3/share/hadoop/common/    lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/common/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/hd    fs:/usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/hdf    s/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/usr/local/soft/hadoop-3.1.3/share/hado    op/mapreduce/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/yarn:/usr/local/soft/hadoop-3.1.3/share/hado    op/yarn/lib/*:/usr/local/soft/hadoop-3.1.3/share/hadoop/yarn/*  

提交到了yarn上,可以在master:9870网页查看

在公司一般不适用standalone模式,因为公司一般已经有yarn 不需要搞两个资源管理框架

停止spark集群
在spark sbin目录下执行 ./stop-all.sh

spark整合yarn只需要在一个节点整合, 可以删除node1 和node2中所有的spark 文件

1、增加hadoop 配置文件地址

conf目录下 vim spark-env.sh 增加 export HADOOP_CONF_DIR=/usr/local/soft/hadoop-3.1.3/etc/hadoop 

2、往yarn提交任务需要增加两个配置 yarn-site.xml(/usr/local/soft/hadoop-3.1.3/etc/hadoop/yarn-site.xml)(Hadoop配置时已配置)

先关闭yarn
stop-all.sh

cd /usr/local/soft/hadoop-3.1.3/etc/hadoop

vim yarn-site.xml

增加配置

yarn.nodemanager.pmem-check-enabled
false

yarn.nodemanager.vmem-check-enabled false

4、同步到其他节点,重启yarn(Hadoop配置时已配置)

scp -r yarn-site.xml node1:pwd
scp -r yarn-site.xml node2:pwd

启动yarn
start-all.sh

cd /usr/local/soft/spark-3.1.3/examples/jars

3.spark on yarn client模式 日志在本地输出,一班用于上线前测试
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client spark-examples_2.12-3.1.3.jar 100

4.spark on yarn cluster模式 上线使用,不会再本地打印日志 减少io
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster spark-examples_2.12-3.1.3.jar 100

获取yarn程序执行日志 执行成功之后才能获取到
yarn logs -applicationId application_1560967444524_0003

hdfs webui
http://node1:50070

yarn ui
http://node1:8088

相关内容

热门资讯

Python酷库之旅-第三方库... 目录一、用法精讲25、pandas.HDFStore.get函数25-1、语法25-2、参数25-3...
科普微扑克透明挂(微扑克)辅助... 科普微扑克透明挂(微扑克)辅助器代打工具(外挂辅助器下载)详细教程(2025已更新)(哔哩哔哩);微...
Python酷库之旅-第三方库... 目录一、用法精讲4、pandas.read_csv函数4-1、语法4-2、参数4-3、功能4-4、返...
基于Python的图书馆管理系... 开发工具:python3.10数据库:Python内置的SQLite3数...
Python从0到100(三十... 1. 为什么要学习xpath和lxmllxml是一款高性能的 Python HTML/XML 解析器...
【Python实用案例】还在冲... 文章目录前言开发工具环境搭建效果展示代码展示模块导入窗口设置其他代码总结零基础Python学习资源介...
【解密】记一次辽宁省某综合实践... 最近接到需求,于是准备弄一下,发现对方整个流程是:先加密在...
【JavaEE精炼宝库】多线程... 目录一、线程池的概念及优势1.1 线程池的概念:1.2 线程池的优势:二...
新2022版教程微扑克代打原来... 新2022版教程微扑克代打原来确实是有挂,太嚣张了原来真的是有挂,详细教程(有挂技巧);AI辅助机器...
PGCCC|【PostgreS... 小编特将PostgreSQL证书申报个税退税流程,编辑成文,供大家申报参...