Kerberos认证原理 单例模式 UIkit Echojs java多行注释 ajax里面可以嵌套ajax吗 mysql 选择数据库 网页设计公司 python报错 java删除数组元素 java连数据库 java基本数据结构 java数组 java集合框架图 ip隐藏 java电子书下载 黑白照片一键变彩色 js转int dvwa安装教程 图片生成网址 在线手册 自动答题软件 华为下拉开关设置 js发送http请求 苹果手机验机软件 相册制作工具 平面设计软件下载 保卫萝卜沙漠7攻略 airdrop是什么 电脑上传速度慢 lol无限视野 无线中继是什么意思 cdr透明度怎么调 ppt背景音乐怎么关 音乐迷 cad合并成块 电脑微信官方下载 图片格式转换工具 vn打野 lol无法连接服务器请检查网络连接
当前位置: 首页 > 学习教程  > 编程语言

深入浅出SparkSQL-第一天(入门)

2020/10/8 20:22:55 文章标签:

sparksql简介sparksql中DataFrame和DataSet的数据结构sparksql中DataFrame和DataSet的使用方式 1.sparksql概述 1.1 sparksql的前世今生 Shark是专门针对于spark的构建大规模数据仓库系统的一个框架Shark与Hive兼容、同时也依赖于Spark版本Hivesql底层把sql解析成了mapreduc…

  1. sparksql简介
  2. sparksql中DataFrame和DataSet的数据结构
  3. sparksql中DataFrame和DataSet的使用方式

1.sparksql概述

1.1 sparksql的前世今生

  • Shark是专门针对于spark的构建大规模数据仓库系统的一个框架
  • Shark与Hive兼容、同时也依赖于Spark版本
  • Hivesql底层把sql解析成了mapreduce程序,Shark是把sql语句解析成了Spark任务
  • 随着性能优化的上限,以及集成SQL的一些复杂的分析功能,发现Hive的MapReduce思想限制了Shark的发展。
  • 最后Databricks公司终止对Shark的开发
    • 决定单独开发一个框架,不在依赖hive,把重点转移到了sparksql这个框架上。

1.2 什么是sparksql

在这里插入图片描述

  • Spark SQL is Apache Spark’s module for working with structured data.
  • SparkSQL是apache Spark用来处理结构化数据的一个模块

2. sparksql的四大特性

  • 1、易整合
    在这里插入图片描述

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X1pSAq4V-1602159306282)(spark_day05课程设计.assets/)]

    • 将SQL查询与Spark程序无缝混合
    • 可以使用不同的语言进行代码开发
      • java
      • scala
      • python
      • R
  • 2、统一的数据源访问
    在这里插入图片描述

    • 以相同的方式连接到任何数据源

      • sparksql后期可以采用一种统一的方式去对接任意的外部数据源

        SparkSession.read.该数据类型的方法名(该格式数据的路径)
        
  • 3、兼容hive
    在这里插入图片描述

    • sparksql可以支持hivesql这种语法 sparksql兼容hivesql
  • 4、支持标准的数据库连接
    在这里插入图片描述

    • sparksql支持标准的数据库连接JDBC或者ODBC

3. DataFrame概述

3.1 DataFrame发展

  • DataFrame前身是schemaRDD,这个schemaRDD是直接继承自RDD,它是RDD的一个实现类
  • 在spark1.3.0之后把schemaRDD改名为DataFrame,它不在继承自RDD,而是自己实现RDD上的一些功能
  • 也可以把dataFrame转换成一个rdd,调用rdd这个方法
    • 例如 val rdd1=dataFrame.rdd

3.2 DataFrame是什么

  • 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格
  • DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化
  • DataFrame可以从很多数据源构建
    • 比如:已经存在的RDD、结构化文件、外部数据库、Hive表。
DataFrame = RDD + schema元信息(对数据的结构描述信息)
DataFrame可以看成是一张mysql表。
表中有数据,同时表中还有字段的名称和类型,这里的字段的名称和类型就可以理解成Schema信息

在这里插入图片描述

3.3 DataFrame和RDD的优缺点

  • 1、RDD

    • 优点

      • 1、编译时类型安全
        • 开发会进行类型检查,在编译的时候及时发现错误
      • 2、具有面向对象编程的风格
    • 缺点

      • 1、构建大量的java对象占用了大量heap堆空间,导致频繁的GC

        由于数据集RDD它的数据量比较大,后期都需要存储在heap堆中,这里有heap堆中的内存空间有限,出现频繁的垃圾回收(GC),程序在进行垃圾回收的过程中,所有的任务都是暂停。影响程序执行的效率
        
      • 2、数据的序列化和反序列性能开销很大
        在分布式程序中,对象(对象的内容和结构)是先进行序列化,发送到其他服务器,进行大量的网络传输,然后接受到这些序列化的数据之后,再进行反序列化来恢复该对象
      
  • 2、DataFrame

    • DataFrame引入了schema元信息和off-heap(堆外)
    • 优点
      • 1、DataFrame引入off-heap,大量的对象构建直接使用操作系统层面上的内存,不在使用heap堆中的内存,这样一来heap堆中的内存空间就比较充足,不会导致频繁GC,程序的运行效率比较高,它是解决了RDD构建大量的java对象占用了大量heap堆空间,导致频繁的GC这个缺点。

      • 2、DataFrame引入了schema元信息—就是数据结构的描述信息,后期spark程序中的大量对象在进行网络传输的时候,只需要把数据的内容本身进行序列化就可以,数据结构信息可以省略掉。这样一来数据网络传输的数据量是有所减少,数据的序列化和反序列性能开销就不是很大了。它是解决了RDD数据的序列化和反序列性能开销很大这个缺点

    • 缺点
      • DataFrame引入了schema元信息和off-heap(堆外)它是分别解决了RDD的缺点,同时它也丢失了RDD的优点
        • 1、编译时类型不安全
          • 编译时不会进行类型的检查,这里也就意味着前期是无法在编译的时候发现错误,只有在运行的时候才会发现
        • 2、不在具有面向对象编程的风格

4. 读取文件构建DataFrame

4.1 读取文本文件创建DataFrame

  • 第一种方式
//加载数据
val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

  • 第二种方式
val personDF=spark.read.text("/person.txt")
//org.apache.spark.sql.DataFrame = [value: string]

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

4.2 读取json文件创建DataFrame

val peopleDF=spark.read.json("/people.json")
//打印schema信息
peopleDF.printSchema

//展示数据
peopleDF.show

4.3 读取parquet文件创建DataFrame

val usersDF=spark.read.parquet("/users.parquet")
//打印schema信息
usersDF.printSchema

//展示数据
usersDF.show

5. DataFrame常用操作

5.1 DSL风格语法

  • 就是sparksql中的DataFrame自身提供了一套自己的Api,可以去使用这套api来做相应的处理
//加载数据
val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema

//展示数据
personDF.show

//查询指定的字段
personDF.select("name").show
personDF.select($"name").show
personDF.select(col("name").show
                
//实现age+1
 personDF.select($"name",$"age",$"age"+1).show   

//实现age大于30过滤
 personDF.filter($"age" > 30).show
  
 //按照age分组统计次数
 personDF.groupBy("age").count.show 
   
//按照age分组统计次数降序
 personDF.groupBy("age").count().sort($"count".desc)show    

5.2 SQL风格语法

  • 可以把DataFrame注册成一张表,然后通过sparkSession.sql(sql语句)操作
//DataFrame注册成表
personDF.createTempView("person")

//使用SparkSession调用sql方法统计查询
spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select name,age from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select age,count(*) from person group by age").show
spark.sql("select age,count(*) as count from person group by age").show
spark.sql("select * from person order by age desc").show

6. DataSet概述

6.1 DataSet是什么

  • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。
  • DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。

6.2 RDD、DataFrame、DataSet的区别

  • 假设RDD中的两行数据长这样
    在这里插入图片描述

  • 那么DataFrame中的数据长这样
    在这里插入图片描述

  • Dataset中的数据长这样
    在这里插入图片描述

    • 或者长这样(每行数据是个Object)
      在这里插入图片描述
DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。
(1)DataSet可以在编译时检查类型
(2)并且是面向对象的编程接口

6.3 DataFrame与DataSet互相转换

  • 1、把一个DataFrame转换成DataSet

    • val dataSet=dataFrame.as[强类型]
  • 2、把一个DataSet转换成DataFrame

    • val dataFrame=dataSet.toDF
  • 补充说明

    • 可以从dataFrame和dataSet获取得到rdd
      • val rdd1=dataFrame.rdd
      • val rdd2=dataSet.rdd

6.4 构建DataSet

  • 1、 通过sparkSession调用createDataset方法

    val ds=spark.createDataset(1 to 10) //scala集合
    val ds=spark.createDataset(sc.textFile("/person.txt"))  //rdd
    
  • 2、使用scala集合和rdd调用toDS方法

    sc.textFile("/person.txt").toDS
    List(1,2,3,4,5).toDS
    
  • 3、把一个DataFrame转换成DataSet

    val dataSet=dataFrame.as[强类型]
    
  • 4、通过一个DataSet转换生成一个新的DataSet

     List(1,2,3,4,5).toDS.map(x=>x*10)
    

7. 通过IDEA开发程序实现把RDD转换DataFrame

  • 添加依赖
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

7.1 利用反射机制

  • 定义一个样例类,后期直接映射成DataFrame的schema信息
  • 代码开发
package cn.linann.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}

//todo:利用反射机制实现把rdd转成dataFrame
case class Person(id:String,name:String,age:Int)

object CaseClassSchema {
  def main(args: Array[String]): Unit = {

    //1、构建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()

    //2、获取sparkContext对象
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("warn")

    //3、读取文件数据
    val data: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(x=>x.split(" "))

    //4、定义一个样例类

    //5、将rdd与样例类进行关联
    val personRDD: RDD[Person] = data.map(x=>Person(x(0),x(1),x(2).toInt))

    //6、将rdd转换成dataFrame
    //需要手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF

    //7、对dataFrame进行相应的语法操作
    //todo:----------------- DSL风格语法-----------------start
    //打印schema
    personDF.printSchema()
    //展示数据
    personDF.show()

    //获取第一行数据
    val first: Row = personDF.first()
    println("first:"+first)

    //取出前3位数据
    val top3: Array[Row] = personDF.head(3)
    top3.foreach(println)

    //获取name字段
    personDF.select("name").show()
    personDF.select($"name").show()
    personDF.select(new Column("name")).show()
    personDF.select("name","age").show()

    //实现age +1
    personDF.select($"name",$"age",$"age"+1).show()

    //按照age过滤
    personDF.filter($"age" >30).show()
    val count: Long = personDF.filter($"age" >30).count()
    println("count:"+count)

    //分组
    personDF.groupBy("age").count().show()

    personDF.show()
    personDF.foreach(row => println(row))

    //使用foreach获取每一个row对象中的name字段
    personDF.foreach(row =>println(row.getAs[String]("name")))
    personDF.foreach(row =>println(row.get(1)))
    personDF.foreach(row =>println(row.getString(1)))
    personDF.foreach(row =>println(row.getAs[String](1)))
    //todo:----------------- DSL风格语法--------------------end


    //todo:----------------- SQL风格语法-----------------start
    personDF.createTempView("person")
    //使用SparkSession调用sql方法统计查询
    spark.sql("select * from person").show
    spark.sql("select name from person").show
    spark.sql("select name,age from person").show
    spark.sql("select * from person where age >30").show
    spark.sql("select count(*) from person where age >30").show
    spark.sql("select age,count(*) from person group by age").show
    spark.sql("select age,count(*) as count from person group by age").show
    spark.sql("select * from person order by age desc").show
    //todo:----------------- SQL风格语法----------------------end

    //关闭sparkSession对象
    spark.stop()
  }
}

7.2 通过StructType直接指定Schema

  • 代码开发
package cn.linann.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

//todo;通过动态指定dataFrame对应的schema信息将rdd转换成dataFrame
object StructTypeSchema {

  def main(args: Array[String]): Unit = {
    //1、构建SparkSession对象
    val spark: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()

    //2、获取sparkContext对象
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("warn")

    //3、读取文件数据
    val data: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(x=>x.split(" "))

    //4、将rdd与Row对象进行关联
    val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt))

    //5、指定dataFrame的schema信息   
    //这里指定的字段个数和类型必须要跟Row对象保持一致
    val schema=StructType(
      StructField("id",StringType)::
        StructField("name",StringType)::
        StructField("age",IntegerType)::Nil
    )

    val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema)
    dataFrame.printSchema()
    dataFrame.show()

    dataFrame.createTempView("user")
    spark.sql("select * from user").show()


    spark.stop()

  }

}

8、sparksql 操作hivesql

  • 添加依赖
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>
  • 代码开发
package cn.linann.sql
import org.apache.spark.sql.SparkSession


//todo:利用sparksql操作hivesql
object HiveSupport {
  def main(args: Array[String]): Unit = {
    //1、构建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("HiveSupport")
      .master("local[2]")
      .enableHiveSupport() //开启对hive的支持
      .getOrCreate()
    //2、直接使用sparkSession去操作hivesql语句

      //2.1 创建一张hive表
       spark.sql("create table people(id string,name string,age int) row format delimited fields terminated by ','")

      //2.2 加载数据到hive表中
       spark.sql("load data local inpath './data/11.txt' into table people ")

      //2.3 查询
      spark.sql("select * from people").show()

    spark.stop()
  }
}


本文链接: http://www.dtmao.cc/news_show_250272.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?