tensorflow MyBatis matplotlib go pdf serialization replace ant Minjs Normalizecss 百度seo关键词 后台ui模板 bootstrap模板 swift视频教程 网盘源码 两个正态分布相乘 mysql数据库名称 erp项目描述 div外边距 查看oracle连接数 webform开发教程 vue与html5 python查找指定字符 python函数大全 java泛型 java什么是多态 java获取现在时间 java中string的方法 java特性 java类方法 怎么装linux系统 地球末日攻略 视频修复工具 unity3d下载 工信部手机入网查询 苹果x银色 微信临时链接多久失效 送货单管理系统 js代码混淆工具 虚拟声卡驱动
当前位置: 首页 > 学习教程  > 编程语言

深入浅出SparkSQL-第二天(进阶)

2020/10/8 20:27:56 文章标签:

sparksql操作jdbc数据源sparksql保存数据操作sparksql整合hive 1. jdbc数据源 spark sql可以通过 JDBC 从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中 1.1 通过sparksql加载mysql表中…

  1. sparksql操作jdbc数据源
  2. sparksql保存数据操作
  3. sparksql整合hive

1. jdbc数据源

  • spark sql可以通过 JDBC 从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中

1.1 通过sparksql加载mysql表中的数据

  • 添加mysql连接驱动jar包
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.38</version>
</dependency>
  • 代码开发
package cn.linann.sql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:利用sparksql加载mysql表中的数据
object DataFromMysql {

  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")

    //2、创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //3、读取mysql表的数据
        //3.1 指定mysql连接地址
        val url="jdbc:mysql://node1:3306/spark"
        //3.2 指定要加载的表名
        val tableName="iplocation"
        // 3.3 配置连接数据库的相关属性
        val properties = new Properties()

      //用户名
      properties.setProperty("user","root")
      //密码
      properties.setProperty("password","123456")

     val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)

      //打印schema信息
      mysqlDF.printSchema()

      //展示数据
      mysqlDF.show()

    //把dataFrame注册成表
    mysqlDF.createTempView("iplocation")

    spark.sql("select * from iplocation where total_count >1500").show()

    spark.stop()
  }
}

1.2 通过sparksql保存结果数据到mysql表中

  • 代码开发(本地运行)
package cn.linann.sql

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:通过sparksql把结果数据写入到mysql表中
object Data2Mysql {
  def main(args: Array[String]): Unit = {
    //1、创建SparkSession
    val spark: SparkSession = SparkSession
                                .builder()
                                .appName("Data2Mysql")
                                .master("local[2]")
                                .getOrCreate()
    //2、读取mysql表中数据
    //2.1 定义url连接
    val url="jdbc:mysql://node1:3306/spark"
    //2.2 定义表名
    val table="iplocation"
    //2.3 定义属性
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)

    //把dataFrame注册成一张表
      mysqlDF.createTempView("iplocation")

    //通过sparkSession调用sql方法
       //需要统计经度和维度出现的人口总数大于1000的记录 保存到mysql表中
      val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")

    //保存结果数据到mysql表中
     //mode:指定数据的插入模式
        //overwrite: 表示覆盖,如果表不存在,事先帮我们创建
        //append   :表示追加, 如果表不存在,事先帮我们创建
        //ignore   :表示忽略,如果表事先存在,就不进行任何操作
        //error    :如果表事先存在就报错(默认选项)
    result.write.mode("append").jdbc(url,"1",properties)
    // result.write.mode(args(0)).jdbc(url,args(1),properties)

    //关闭
     spark.stop()
  }
}

  • 打成jar包集群提交

    • 代码开发

      package cn.linann.sql
      
      import java.util.Properties
      
      import org.apache.spark.sql.{DataFrame, SparkSession}
      
      //todo:通过sparksql把结果数据写入到mysql表中
      object Data2Mysql {
        def main(args: Array[String]): Unit = {
          //1、创建SparkSession
          val spark: SparkSession = SparkSession
                                      .builder()
                                      .appName("Data2Mysql") 
                                      .getOrCreate()
            
          //2、读取mysql表中数据
              //2.1 定义url连接
              val url="jdbc:mysql://node1:3306/spark"
              //2.2 定义表名
              val table="iplocation"
              //2.3 定义属性
              val properties=new Properties()
              properties.setProperty("user","root")
              properties.setProperty("password","123456")
      
          val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)
      
          //把dataFrame注册成一张表
            mysqlDF.createTempView("iplocation")
      
          //通过sparkSession调用sql方法
             //需要统计经度和维度出现的人口总数大于1000的记录 保存到mysql表中
            val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")
      
          //保存结果数据到mysql表中
          //mode:指定数据的插入模式
              //overwrite: 表示覆盖,如果表不存在,事先帮我们创建
              //append   :表示追加, 如果表不存在,事先帮我们创建
              //ignore   :表示忽略,如果表事先存在,就不进行任何操作
              //error    :如果表事先存在就报错(默认选项)
        
           result.write.mode(args(0)).jdbc(url,args(1),properties)
      
          //关闭
           spark.stop()
        }
      }
      
    • 提交任务脚本

      spark-submit \
      --master spark://node1:7077 \
      --class cn.linann.sql.Data2Mysql \
      --executor-memory 1g \
      --total-executor-cores 4 \
      --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.38.jar \
      --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar \
      spark_class01-1.0-SNAPSHOT.jar \
      append  1
      

2. sparksql保存数据操作

  • 代码开发
package cn.linann.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

//todo:sparksql可以把结果数据保存到不同的外部存储介质中
object SaveResult {

  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("SaveResult").setMaster("local[2]")

    //2、创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //3、加载数据源
    val jsonDF: DataFrame = spark.read.json("E:\\data\\score.json")

    //4、把DataFrame注册成表
    jsonDF.createTempView("t_score")

    //todo:5、统计分析
    val result: DataFrame = spark.sql("select * from t_score where score > 80")

    //保存结果数据到不同的外部存储介质中
    //todo: 5.1 保存结果数据到文本文件  ----  保存数据成文本文件目前只支持单个字段,不支持多个字段
    result.select("name").write.text("./data/result/123.txt")

    //todo: 5.2 保存结果数据到json文件
    result.write.json("./data/json")

    //todo: 5.3 保存结果数据到parquet文件
    result.write.parquet("./data/parquet")

    //todo: 5.4 save方法保存结果数据,默认的数据格式就是parquet
    result.write.save("./data/save")

    //todo: 5.5 保存结果数据到csv文件
    result.write.csv("./data/csv")

    //todo: 5.6 保存结果数据到表中
    result.write.saveAsTable("t1")

    //todo: 5.7  按照单个字段进行分区 分目录进行存储
    result.write.partitionBy("classNum").json("./data/partitions")

    //todo: 5.8  按照多个字段进行分区 分目录进行存储
    result.write.partitionBy("classNum","name").json("./data/numPartitions")


    spark.stop()
  }

}


3. sparksql中自定义函数

  • 自定义UDF函数

  • 代码开发

    package cn.linann.sql
    
    import org.apache.spark.sql.api.java.UDF1
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //TODO:自定义sparksql的UDF函数    一对一的关系
    object SparkSQLFunction {
    
      def main(args: Array[String]): Unit = {
        //1、创建SparkSession
        val sparkSession: SparkSession = SparkSession.builder().appName("SparkSQLFunction").master("local[2]").getOrCreate()
    
        //2、构建数据源生成DataFrame
        val dataFrame: DataFrame = sparkSession.read.text("E:\\data\\test_udf_data.txt")
    
        //3、注册成表
        dataFrame.createTempView("t_udf")
    
    
        //4、实现自定义的UDF函数
    
        //小写转大写
        sparkSession.udf.register("low2Up",new UDF1[String,String]() {
          override def call(t1: String): String = {
            t1.toUpperCase
          }
        },StringType)
    
        //大写转小写
        sparkSession.udf.register("up2low",(x:String)=>x.toLowerCase)
    
    
        //4、把数据文件中的单词统一转换成大小写
        sparkSession.sql("select  value from t_udf").show()
        sparkSession.sql("select  low2Up(value) from t_udf").show()
        sparkSession.sql("select  up2low(value) from t_udf").show()
    
        sparkSession.stop()
    
      }
    }
    
    
    

4. sparksql整合hive

  • 步骤

    • 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中
    • 2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中
    • 3、可以使用spark-sql脚本 后期执行sql相关的任务
  • 启动脚本

spark-sql \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 4 \
--conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse 
  • 应用场景
#!/bin/sh
#定义sparksql提交脚本的头信息
SUBMITINFO="spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 4 --conf spark.sql.warehouse.dir=hdfs://node1:9000/user/hive/warehouse" 
#定义一个sql语句
SQL="select * from employee;" 
#执行sql语句   类似于 hive -e sql语句
echo "$SUBMITINFO" 
echo "$SQL"
$SUBMITINFO -e "$SQL"

5. spark的shuffle原理分析

5.1 shuffle概述

	Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
	在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。

在这里插入图片描述

5.2 spark中的shuffle介绍

	在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在wide Dependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage,每一个stage内部有很多可以并行运行的task。
	
	stage与stage之间的过程就是shuffle阶段,在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。

5.3 HashShuffle机制

5.3.1 HashShuffle概述

	在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。
	该ShuffleManager-HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。
	SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
  • Hash shuffle
    • HashShuffleManager的运行机制主要分成两种
      • 一种是普通运行机制
      • 另一种是合并的运行机制
    • 合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量。
    • Hash shuffle是不具有排序的Shuffle。

5.3.2 普通机制的Hash shuffle

在这里插入图片描述

  • 图解
   这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

    图中有3个ReduceTask,从ShuffleMapTask 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 ShuffleMapTask 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以ReduceTask 会在属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 ShuffleMapTask 输出3份本地文件,这里有4个 ShuffleMapTask,所以总共输出了4 x 3个分类文件 = 12个本地小文件。
  • shuffle Write阶段
	主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey,groupByKey),而将每个task处理的数据按key进行“分区”。所谓“分区”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于reduce端的stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

     那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢? 很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。
  • shuffle Read阶段
	shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给Reduce端的stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

      shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
  • 注意
(1)buffer起到的是缓存作用,缓存能够加速写磁盘,提高计算的效率,buffer的默认大小32k。

(2)分区器:根据hash/numRedcue取模决定数据由几个Reduce处理,也决定了写入几个buffer中

(3)block file:磁盘小文件,从图中我们可以知道磁盘小文件的个数计算公式:
                 block file=M*R

 (4) M为map task的数量,R为Reduce的数量,一般Reduce的数量等于buffer的数量,都是由分区器决定的
  • Hash shuffle普通机制的问题
(1).Shuffle阶段在磁盘上会产生海量的小文件,建立通信和拉取数据的次数变多,此时会产生大量耗时低效的 IO 操作 (因为产生过多的小文件)

(2).可能导致OOM,大量耗时低效的 IO 操作 ,导致写磁盘时的对象过多,读磁盘时候的对象也过多,这些对象存储在堆内存中,会导致堆内存不足,相应会导致频繁的GC,GC会导致OOM。由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题
5.3.3 合并机制的Hash shuffle
	合并机制就是复用buffer缓冲区,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

在这里插入图片描述

  • 图解
	这里有6个这里有6个shuffleMapTask,数据类别还是分成3种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有6个shuffleMapTasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。
  • 注意
(1).启动HashShuffle的合并机制ConsolidatedShuffle的配置
   spark.shuffle.consolidateFiles=true

(2).block file=Core*R
	Core为CPU的核数,R为Reduce的数量
  • Hash shuffle合并机制的问题
	如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

5.4 Sort shuffle

  • SortShuffleManager的运行机制主要分成两种,
    • 一种是普通运行机制
    • 另一种是bypass运行机制

5.4.1 Sort shuffle的普通机制

在这里插入图片描述

  • 图解
	在该模式下,数据会先写入一个数据结构,聚合算子写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值(5M),如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个task过程会产生多个临时文件。

最后在每个task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个task的数据在文件中的索引start offset和end offset。

	这样算来如果第一个stage 50个task,每个Executor执行一个task,那么无论下游有几个task,就需要50*2=100个磁盘文件。
  • 好处
1. 小文件明显变少了,一个task只生成一个file文件

2. file文件整体有序,加上索引文件的辅助,查找变快,虽然排序浪费一些性能,但是查找变快很多

5.4.2 bypass模式的sortShuffle

  • bypass机制运行条件

    • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
    • 不是聚合类的shuffle算子(比如reduceByKey)
      在这里插入图片描述
  • 好处

    该机制与sortshuffle的普通机制相比,在shuffleMapTask不多的情况下,首先写的机制是不同,其次不会进行排序。这样就可以节约一部分性能开销。
  • 总结
    在shuffleMapTask数量小于默认值200时,启用bypass模式的sortShuffle,并没有进行sort,原因是数据量本身比较少,没必要进行sort全排序,因为数据量少本身查询速度就快,正好省了sort的那部分性能开销。

6. Spark Shuffle调优

//buffer大小默认是32K  maptask端的shuffle 降低磁盘IO
spark.shuffle.file.buffer 32k

//shuffle read拉取数据量的大小
spark.reducer.MaxSizeFlight 48M 

//shuffle聚合内存的比例
spark.shuffle.memoryFraction 0.2 

//拉取数据重试次数
spark.shuffle.io.maxRetries 3 

//调整到重试间隔时间60s
spark.shuffle.io.retryWait 5s 

//Spark Shuffle的种类
spark.shuffle.manager hash|sort 

//针对HashShuffle   HashShuffle 合并机制
spark.shuffle.consolidateFiles false 

//针对SortShuffle     SortShuffle bypass机制 200次
spark.shuffle.sort.bypassMergeThreshold 200 

本文链接: http://www.dtmao.cc/news_show_250311.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?