单例模式 私有变量 sockets jpa websocket datagridview configuration static 后台管理模板下载 郑州网站开发 多商户商城模板 软件测试项目实战案例 bootstrap模态框传参 mysql修改字段值 mysql分页查询sql语句 图片生成链接 mysql插入 python字典类型 python操作mysql python中pop函数 python语言编程 java函数 java入门基础 java八种基本数据类型 java判断 java小程序 源计划卡特 unix系统下载 js获取父节点 整站系统 java游戏编程 音乐剪辑器下载 淘宝自动发货软件 wow怎么赚钱 华为下拉开关设置 汽车配件查询软件 电脑cmd命令大全 ip切换软件 汉仪文黑 windowsjs延时函数
当前位置: 首页 > 学习教程  > 编程语言

大数据——Apache Spark基础及架构

2020/11/4 15:01:16 文章标签:

Apache Spark基础及架构为什么使用SparkSpark简介Spark优势Spark技术栈Spark环境部署Spark初体验Spark架构设计Spark架构核心组件Spark API(一)Spark API(二)示例:使用IDEA初始化Spark运行环境具体步骤实施Spark API&a…

Apache Spark基础及架构

  • 为什么使用Spark
  • Spark简介
  • Spark优势
  • Spark技术栈
  • Spark环境部署
  • Spark初体验
  • Spark架构设计
  • Spark架构核心组件
  • Spark API(一)
  • Spark API(二)
    • 示例:使用IDEA初始化Spark运行环境
      • 具体步骤实施
  • Spark API(三)
  • Spark RDD概念(一)
  • Spark RDD概念(二)
  • RDD与DAG
  • RDD的特性
  • RDD编程流程
  • RDD的创建(一)
    • 示例
  • RDD的创建(二)
    • 示例
  • RDD的创建(三)
  • RDD的创建(四)
  • RDD创建方式的最佳实战
  • RDD分区
  • RDD操作
    • RDD转换算子
      • RDD常用的转换算子(一)
        • 示例
      • RDD常用的转换的算子(二)
        • 示例
      • RDD常用的转换算子(三)
        • 示例
      • RDD常用的转换算子(四)
        • 示例
    • RDD动作算子
      • RDD常用动作算子(一)
      • RDD常用动作算子(二)
      • RDD常用动作算子(三)
        • 示例
      • RDD常用动作算子(四)
        • 示例

为什么使用Spark

MapReduce编程模型的局限性

  • 繁杂

     只有MapReduce两个操作,复杂的逻辑需要大量的样板代码
    
  • 处理效率低

     Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
     任务调度与启动开销大
    
  • 不适合迭代处理、交互式处理和流式处理

Spark是类Hadoop MapReduce的通用并行框架

  • Job中间输出结果可以保存在内存,不再需要读写HDFS
  • 比MapReduce平均快10倍以上

Spark简介

诞生于加州大学伯克利分校AMP实验室,是一个基于内存的分布式计算框架
发展历程

  • 2009年诞生于加州大写伯克利分校AMP实验室
  • 2010年正式开源
  • 2013年6月正式成为Apache孵化项目
  • 2014年2月成为Apache顶级项目
  • 2014年5月正式发布Spark 1.0版本
  • 2014年10月Spark打破MapReduce保持的排序记录
  • 2015年发布了1.3、1.4、1.5版本
  • 2016年发布了1.6、2.x版本
    在这里插入图片描述

Spark优势

速度快

  • 基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
  • 基于硬盘数据处理,比MR快10个数量级以上

易用性

  • 支持Java、Scala、Python、R语言
  • 交互式shell方便开发测试

通用性

  • 一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习

随处运行

  • YARN、Mesos、EC2、Kubernetes、Standalone、Local

Spark技术栈

Spark Core

  • 核心组件,分布式计算引擎

Spark SQL

  • 高性能的基于Hadoop的SQL解决方案

Spark Streaming

  • 可以实现高吞吐量、具备容错机制的准实时流处理系统

Spark Graphx

  • 分布式图处理框架

SparkMLlib

  • 构建在Spark上的分布式机器学习库
    在这里插入图片描述

Spark环境部署

选择较新的Spark2.2版本,下载地址

  • Spark

解压并配置环境变量

  • SPARK_HOME

Spark配置文件

  • $SPARK_HOME/conf/spark-env.sh
  • $SPARK_HOME/conf/slaves

启动Spark

  • $SPARK_HOME/sbin/start-all.sh
  • 7077与8080端口

Spark具体安装和配置

Spark初体验

spark-shell:Spark自带的交互式工具

  • 本机

     spark-shell --master local[*]
    
  • Standalone

     spark-shell --master spark://MASTERHOST:7077
    
  • YARN

     spark-shell --master yarn-client
    

Spark实现WordCount

/*hello.txt内容
hello scala
hello spark
hello world*/
scala>sc.textFile("hdfs://cluster1/data/hello.txt").flatMap(x=>x.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect

在这里插入图片描述

Spark架构设计

运行架构
在这里插入图片描述

  • 在驱动程序中,通过SparkContext主导应用的执行
  • SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor
  • 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整
  • 每个应用获取自己的Executor
  • 每个Task处理一个RDD分区

Spark架构核心组件

术语说明
Application建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码
Driver program驱动程序。Application中的main函数并创建SparkContext
Cluster Manager在集群(Standalone、Mesos、YARN)上获取资源的外部服务
Worker Node集群中任何可以运行Application代码的节点
Executor某个Application运行在worker节点上的一个进程
Task被送到某个Executor上的工作单元
Job包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job
Stage每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage

Spark API(一)

SparkContext

  • 连接Driver与Spark Cluster(Workers)
  • Spark的主入口
  • 每个JVM仅能有一个活跃的SparkContext
  • SparkContext.getOrCreate
import org.apache.spark.{SparkConf, SparkContext}

val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark")
val sc=SparkContext.getOrCreate(conf)

Spark API(二)

SparkSession

  • Spark 2.0+应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext
  • SparkSession.getOrCreate
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local[2]").appName("appName").getOrCreate()

示例:使用IDEA初始化Spark运行环境

需求说明

  • 创建Maven工程,添加依赖

     scala-library
     spark-core
     spark-sql
    
  • 创建SparkContext

  • 创建SparkSession

  • 使用Spark实现WordCount

具体步骤实施

  • 创建Maven工程
  • 添加依赖包
<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>

代码展示

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf=new SparkConf().setMaster("local[2]").setAppName("workcount")
    val sc:SparkContext=SparkContext.getOrCreate(conf)

    val rdd1:RDD[String]=sc.parallelize(List("hello world","hello java","hello scala"),5)

    val rdd2=sc.makeRDD(List("hello python","hello hadoop","hello c"))


    rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
      rdd2.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)

    val partitions:Array[Partition] = rdd1.partitions
    println("rdd分区 "+partitions.length)
    }
}

结果展示:
在这里插入图片描述
在这里插入图片描述

  • 然后这时会发现很多的红色信息,如果不想看到这些信息,可以设置一下
  • 在IDEA左侧找到External Libraries
    在这里插入图片描述
  • 点开External Libraries,找到Maven:org.apache.spark:spark-core_2.11:2.1.1下面的spark-core_2.11-2.1.1.jar下面的org下面的apache下面的spark文件

在这里插入图片描述

  • 在spark文件下面找到log4j-defaults.properties文件,复制
    在这里插入图片描述
  • 在主目录下创建一个resource文件夹
  • 选择刚创建好的resource文件夹,右击选择Mark Directory as下的Sources Root

在这里插入图片描述

  • 把刚复制的文件放进去,并改名为log4j.properties
    在这里插入图片描述
  • 打开log4j.properties,把INFO更改为ERROR

在这里插入图片描述

  • 再次执行WordCount

在这里插入图片描述

Spark API(三)

RDD

  • Spark核心,主要数据抽象

Dataset

  • 从Spark1.6开始引入的新的抽象,特定领域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作

DataFrame

  • DataFrame是特殊Dataset

Spark RDD概念(一)

简单的解释

  • RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存中,并执行正确的操作

复杂的解释

  • RDD是用于数据转换的接口
  • RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等),或在故障或缓冲收回时重新计算其他RDD分区中的数据

Spark RDD概念(二)

RDD是弹性分布式数据集(Resilient Distributed Datasets)

  • 分布式数据集

     RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上
     RDD并不存储真正的数据,只是对数据和操作的描述
    
  • 弹性

      RDD默认存放在内存中,当内存不足,Spark自动将RDD写入磁盘
    
  • 容错性

      根据数据血统,可以自动从节点失败中恢复分区
    

RDD与DAG

两者是Spark提供的核心抽象
DAG(有向无环图)反映了RDD之间的依赖关系
在这里插入图片描述

RDD的特性

一系列的分区(分片)信息,每个任务处理一个分区
每个分区上都有compute函数,计算该分区中的数据
RDD之间有一系列的依赖
分区函数决定数据(key-value)分配至哪个分区
最佳位置列表,将计算任务分派到其所在处理数据块的存储位置

RDD编程流程

RDD创建
RDD转换
RDD持久化
RDD执行

RDD的创建(一)

使用集合创建RDD

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count
rdd.partitions.size
val rdd=sc.parallelize(List(1,2,3,4,5,6),5)
rdd.partitions.size
val rdd=sc.makeRDD(List(1,2,3,4,5,6))

注意:
1、Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定
2、Spark会为每一个分区运行一个任务进行处理

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf=new SparkConf().setMaster("local[2]").setAppName("workcount")
    val sc:SparkContext=SparkContext.getOrCreate(conf)

    val rdd1:RDD[String]=sc.parallelize(List("hello world","hello java","hello scala"),5)

    val rdd2=sc.makeRDD(List("hello python","hello hadoop","hello c"))


    rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
      rdd2.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)

    val partitions:Array[Partition] = rdd1.partitions
    println("rdd分区 "+partitions.length)
    println(rdd1.partitions.size)
    }
}

结果展示:
在这里插入图片描述

RDD的创建(二)

通过加载文件产生RDD

//文件中的一行文本作为RDD的一个元素
val distFile=sc.textFile("file:///home/hadoop/data/hello.txt")
distFile.count
val distHDFSFile=sc.textFile("hdfs://hadoop000:8020/hello.txt")

注意:加载"file://…"时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本

支持目录、压缩文件以及通配符

sc.textFile("/my/directory")
sc.textFile("/my/directory/*.txt")
sc.textFile("/my/directory/*.gz")

注意:
1、Spark默认访问HDFS
2、Spark默认为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf=new SparkConf().setMaster("local[2]").setAppName("workcount")
    val sc:SparkContext=SparkContext.getOrCreate(conf)
    println("------------相对路径-------------------------")
    //相对路径
     val lines=sc.textFile("in/word.txt")
    lines.collect.foreach(println)

    println("--------------绝对路径--------------------------")
    //绝对路径
    val lines2=sc.textFile("D:/ideashuju/sparkdemo/in/word.txt")
    lines.collect.foreach(println)
    println("---------------hdfs-----------------")
    val linesHDFS=sc.textFile("hdfs://hadoop100:9000/kb09workspace/*2.txt")
    linesHDFS.collect.foreach(println)
  }
}

结果展示:在这里插入图片描述

RDD的创建(三)

其他创建RDD的方法

  • SparkContext.wholeTextFiles():可以针对一个目录中的大量小文件返回<filenam,fileContext>作为PairRDD

     普通RDD:org.apache.spark.rdd.RDD[data_type]
     ParirRDD:org.apache.spark.rdd.RDD[(key_type,value_type)]
     //Spark为包含键值对类型的RDD提供了一些专有的操作,比如:reduceByKey()、groupBy()......
     //也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3)))
    

RDD的创建(四)

其他创建RDD的方法

  • SparkContext.sequenceFileK,V

     Hadoop SequenceFile的读写支持
    
  • SparkContext.hadoopRDD()、newAPIHadoopRDD()

     从Hadoop接口API创建
    
  • SparkContext.objectFile()

     RDD.saveAsObjectFile()的逆操作
    

RDD创建方式的最佳实战

测试环境

  • 使用内存集合创建RDD
  • 使用本地文件创建RDD

生产环境

  • 使用HDFS文件创建RDD

RDD分区

分区是RDD被拆分并发送到节点的不同块之一

  • 我们拥有的分区越多,得到的并行性就越强
  • 每个分区都是被分发到不同Worker Node的候选者
  • 每个分区对应一个Task
    在这里插入图片描述

RDD操作

分为lazy和non-lazy两种

  • Transformation(lazy):也称转换操作、转换算子
  • Actions(non-lazy):立即执行,也称动作操作、动作算子

RDD转换算子

对于转换操作,RDD的所有转换都不会直接计算结果

  • 仅记录作用于RDD上的操作
  • 当遇到动作算子(Action)时才会进行真正计算
    在这里插入图片描述

RDD常用的转换算子(一)

map算子

  • 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
  • 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
  • 输入分区与输出分区一一对应
//将原RDD中每个元素都乘以2来产生一个新的RDD
val a=sc.parallelize(1 to 9)
val b=a.map(x=>x*2)
a.collect
b.collect
//map把普通RDD变成PairRDD
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mapdemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
    val sc:SparkContext = SparkContext.getOrCreate(conf)
    val rdd1:RDD[Int]=sc.makeRDD(1 to 10)
    val a:RDD[Int]=rdd1.map(_*2)
    a.collect.foreach(println)
    println("-----------------------------------")
    println(rdd1.partitions.size)
    println("------------------------------------")
    val rdd2:RDD[String]=sc.makeRDD(List("kb02","kb03","kb03","kb04","kb05"))
    val b:RDD[(String,Int)]=rdd2.map((_,1))
    b.collect.foreach(println)
    }
}

结果展示:
在这里插入图片描述

RDD常用的转换的算子(二)

filter算子

  • 对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中
val a=sc.parallelize(1 to 10)
a.filter(_%2==0).collect      
a.filter(_<4).collect	
//map&filter
val rdd=sc.parallelize(List(1 to 6))
val mapRdd=rdd.map(_*2)
mapRdd.collect
val filterRdd=mapRdd.filter(_>5)
filterRdd.collect

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mapdemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
    val sc:SparkContext = SparkContext.getOrCreate(conf)
    val rdd3:RDD[Int]=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
    val c:RDD[Int]=rdd3.filter(_%2==0)
    c.collect.foreach(println)
    }
}

结果展示:在这里插入图片描述

RDD常用的转换算子(三)

mapValues算子

  • 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x.length,x))
b.mapValues("x"+_+"x").collect
输出结果:
Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mapdemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
    val sc:SparkContext = SparkContext.getOrCreate(conf)
    val rdd4:RDD[String]=sc.makeRDD(List("tiger","dog","lion","cat","eagle","panther"))
    val d:RDD[(Int,String)]=rdd4.map(x=>(x.length,x))
    d.collect.foreach(println)
    val e:RDD[(Int,String)]=d.mapValues(x=>"_"+x+"_")
    println("----------------------------------------")
    e.collect.foreach(println)
    }
}

结果展示:在这里插入图片描述

RDD常用的转换算子(四)

更多常用转换算子

  • distinct
  • reduceByKey
  • groupByKey
  • sortByKey
  • union
  • join
  • count
val dis = sc.parallelize(List(1,2,3,4,5,6,7,8,9,9,2,6))
dis.distinct.collect
dis.distinct(2).partitions.length

val a = sc.parallelize(List("dog", "salmon", "pig"), 3)
val f = a.map(x=>(x.length,x))
f.reduceByKey((a,b)=>(a+b)).collect
f.reduceByKey(_+_).collect
f.groupByKey.collect

val a = sc.parallelize(List("dog", "salmon", "pig"), 3)
val f = a.map(x=>(x.length,x))
f.sortByKey().collect
f.sortByKey(false).collect

val u1 = sc.parallelize(1 to 3)
val u2 = sc.parallelize(3 to 4)
u1.union(u2).collect
(u1 ++ u2).collect
u1.intersection(u2).collect

val j1 = sc.parallelize(List("abe", "abby", "apple"))
                                                                           .map(a => (a, 1))
val j2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
j1.join(j2).collect
j1.leftOuterJoin(j2).collect
j1.rightOuterJoin(j2).collect

示例

  • 示例一
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc=new SparkContext(conf)
    val rdd1:RDD[Int]= sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 2, 6))
    val rdd2:RDD[Int]=rdd1.distinct
    println("rdd1分区数:"+rdd1.partitions.length)
    println("rdd2分区数:"+rdd2.partitions.length)
    rdd2.collect.foreach(println)

    val rdd3:RDD[Int]=rdd1.distinct(2)
    println("rdd3分区数:"+rdd3.partitions.length)
  }
}

结果展示:在这里插入图片描述

  • 示例二
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object mapdemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
    val sc:SparkContext = SparkContext.getOrCreate(conf)
    println("-----------reduceByKey---------------------")
    val reduceByKeyRdd1:RDD[(Int,String)]=d.reduceByKey((a,b)=>a+b)
  reduceByKeyRdd1.collect.foreach(println)

    println("-------------groupByKey---------------------")

    val groupByKeyRdd:RDD[(Int,Iterable[String])]=d.groupByKey()
    groupByKeyRdd.collect.foreach(println)


    println("----------------sortByKey-------------------")
    val sortByKeyRdd:RDD[(Int,String)]=d.sortByKey()
    sortByKeyRdd.collect.foreach(println)


    println("----------------sortByKey false-------------------")
    val sortByKeyRdd2:RDD[(Int,String)]=d.sortByKey(false)
    sortByKeyRdd2.collect.foreach(println)


    println("-------------------union----------------------------")
    val u1:RDD[Int]=sc.makeRDD(List(1,2,3))
    val u2:RDD[Int]=sc.makeRDD(List(3,4))
    val x:RDD[Int]=u1.union(u2)
    val y:RDD[Int]=(u1++u2)
    val z:RDD[Int]=u1.intersection(u2)
    println("-------------------x---------------------------")
    x.collect.foreach(println)
    println("------------------y-------------------------------")
    y.collect.foreach(println)
    println("-------------------z-------------------------------")
    z.collect.foreach(println)

    println("-------------------join-------------------------")
    val j1=sc.makeRDD(List("abe","baby","apple")).map((_,1))
    val j2=sc.makeRDD(List("apple","beatty","beatrice")).map((_,1))
    println("-------------------f----------------------------")
    val f=j1.join(j2)
    f.collect.foreach(println)
    println("-------------------g----------------------------------")
    val g=j1.leftOuterJoin(j2)
    g.collect.foreach(println)
    println("---------------------h---------------------------------")
    val h=j1.rightOuterJoin(j2)
    h.collect.foreach(println)
  }
}

结果展示:在这里插入图片描述

RDD动作算子

本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行
所有的动作算子都是急迫性(non-lazy),RDD遇到Action就会立即计算

RDD常用动作算子(一)

count

  • 返回的是数据集中的元素的个数
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count

collect(应注意到,前面所有转换算子操作都结合了collect动作算子进行计算输出)
以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.collect

RDD常用动作算子(二)

take

  • 返回前n个元素
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.take(3)

first

  • 返回RDD第一个元素
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.first

RDD常用动作算子(三)

reduce:根据指定函数,对RDD中元素进行两两计算,返回计算结果

val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
a.reduce(_+_)		//与上面等价
val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})		//(AABBC,6)

foreach:对RDD中的每个元素都使用指定函数,无返回值

val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
a.reduce(_+_)		//与上面等价
val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})		//(AABBC,6)

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ActionRddDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("action")

    val sc=new SparkContext(conf)

    val rdd1:RDD[Int]=sc.makeRDD(1 to 10,1)
    val sum:Int=rdd1.reduce((x,y)=>{println(x,y);x+y})
    println("总和:"+sum)
    }
}

结果展示:
在这里插入图片描述

RDD常用动作算子(四)

lookup:用于PairRDD,返回K对应的所有V直值

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.lookup('a')		//输出WrappedArray(1, 2)

最值:返回最大值、最小值

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.lookup('a')		//输出WrappedArray(1, 2)

saveAsTextFile:保存RDD数据至文件系统

val rdd=sc.parallelize(1 to 10,2)
rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")

示例

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ActionRddDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("action")

    val sc=new SparkContext(conf)
    //存入本地
    rdd1.saveAsTextFile("in/rdd1.txt")
    //存入HDFS
    rdd1.saveAsTextFile("hdfs://hadoop100:9000/kb09workspace/rdd1demodata/")
  }
}

结果展示:
在这里插入图片描述
在这里插入图片描述


本文链接: http://www.dtmao.cc/news_show_350305.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?