SPARK快学大数据分析概要

2022-12-19,,,

Spark 是一个用来实现快速而通用的集群计算的平台。在速度方面,Spark 扩展了广泛使用的MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。在处理大规模数据集时,速度是非常重要的。Spark 的一个主要特点就是能够在内存中进行计算,因而更快。不过即使是必须在磁盘上进行的复杂计算,Spark 依然比MapReduce 更加高效。

总的来说,Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在一个统一的框架下支持这些不同的计算,Spark使我们可以简单而低耗地把各种处理流程整合在一起。而这样的组合,在实际的数据分析过程中是很有意义的。

Spark 所提供的接口非常丰富。除了提供基于Python、Java、Scala 和SQL 的简单易用的API 以及内建的丰富的程序库以外,Spark 还能和其他大数据工具密切配合使用。例如,Spark 可以运行在Hadoop 集群上,访问包括Cassandra 在内的任意Hadoop 数据源。

Spark 项目包含多个紧密集成的组件。各组件间密切结合的设计原理有这样几个优点,首先,软件栈中所有的程序库和高级组件都可以从下层的改进中获益。比如,当Spark 的核心引擎新引入了一个优化时,SQL 和机器学习程序库也都能自动获得性能提升。其次,运行整个软件栈的代价变小了。不需要运行5 到10 套独立的软件系统了,一个机构只需要运行一套软件系统即可。最后,密切结合的原理的一大优点就是,我们能够构建出无缝整合不同处理模型的应用。例如,利用Spark,你可以在一个应用中实现将数据流中的数据使用机器学习算法进行实时分类。与此同时,数据分析师也可以通过SQL 实时查询结果数据,比如将数据与非结构化的日志文件进行连接操作。不仅如此,有经验的数据工程师和数据科学家还可以通过Python shell 来访问这些数据,进行即时分析。其他人也可以通过独立的批处理应用访问这些数据。IT 团队始终只需要维护一套系统即可。

Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core 中还包含了对弹性分布式数据集(resilient distributed dataset,简
称RDD)的API 定义。RDD 表示分布在多个计算节点上可以并行操作的元素集合,是Spark 主要的编程抽象。Spark Core 提供了创建和操作这些集合的多个API。还有dataset,dataframe;

Spark SQL 是Spark 用来操作结构化数据的程序包。通过Spark SQL,我们可以使用SQL或者Apache Hive 版本的SQL 方言(HQL)来查询数据。Spark SQL 支持多种数据源,比如Hive 表、Parquet 以及JSON 等。除了为Spark 提供了一个SQL 接口,Spark SQL 还支持开发者将SQL 和传统的RDD 编程的数据操作方式相结合,不论是使用Python、Java 还是Scala,开发者都可以在单个的应用中同时使用SQL 和复杂的数据分析。

Spark Streaming 是Spark 提供的对实时数据进行流式计算的组件。比如生产环境中的网页服务器日志,或是网络服务中用户提交的状态更新组成的消息队列,都是数据流。Spark Streaming 提供了用来操作数据流的API,并且与Spark Core 中的RDD API 高度对应。这样一来,程序员编写应用时的学习门槛就得以降低,不论是操作内存或硬盘中的数据,还是操作实时数据流,程序员都更能应对自如。从底层设计来看,Spark Streaming 支持与Spark Core 同级别的容错性、吞吐量以及可伸缩性。

Spark 中还包含一个提供常见的机器学习(ML)功能的程序库,叫作MLlib。MLlib 提供了很多种机器学习算法,包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。MLlib 还提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化算法。所有这些方法都被设计为可以在集群上轻松伸缩的架构。

GraphX 是用来操作图(比如社交网络的朋友关系图)的程序库,可以进行并行的图计算。

集群管理器

就底层而言,Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器(cluster manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark 自带的一个简易调度器,叫作独立调度器。如果要在没有预装任何集群管理器的机器上安装Spark,那么Spark自带的独立调度器可以让你轻松入门;而如果已经有了一个装有Hadoop YARN 或Mesos的集群,通过Spark 对这些集群管理器的支持,你的应用也同样能运行在这些集群上。

spark的主要应用分两大类:数据科学应用和数据处理应用

Spark 通过一系列组件支持各种数据科学任务。Spark shell 通过提供Python 和Scala 的接口,使我们方便地进行交互式数据分析。Spark SQL 也提供一个独立的SQL shell,我们可以在这个shell 中使用SQL 探索数据,也可以通过标准的Spark 程序或者Spark shell 来进行SQL 查询。机器学习和数据分析则通过MLlib 程序库提供支持。另外,Spark 还能支持调用R 或者Matlab 写成的外部程序。

Spark的存储层次

Spark 不仅可以将任何Hadoop 分布式文件系统(HDFS)上的文件读取为分布式数据集,也可以支持其他支持Hadoop 接口的系统,比如本地文件、亚马逊S3、Cassandra、Hive、
HBase 等。我们需要弄清楚的是,Hadoop 并非Spark 的必要条件,Spark 支持任何实现了Hadoop 接口的存储系统。Spark 支持的Hadoop 输入格式包括文本文件、SequenceFile、
Avro、Parquet 等

Spark 本身是用Scala 写的,运行在Java 虚拟机(JVM)上。

Spark 带有交互式的shell,可以作即时数据分析。

Spark shell 可用来与分布式存储在许多机器的内存或者硬盘上的数据进行交互,并且处理过程的分发由Spark 自动控制完成。由于Spark 能够在工作节点上把数据读取到内存中,所以许多分布式计算都可以在几秒钟之内完成,哪怕是那种在十几个节点上处理TB 级别的数据的计算。这就使得一般需要在shell 中完成的那些交互式的即时探索性分析变得非常适合Spark。

设置日志输出控制级别:

conf文件夹下log4j.properties.template复制一份为log4j.properties,log4j.rootCategory=INFO, console修改为log4j.rootCategory=WARN, console,只输出警告或者更严重的日志。

spark shell运行spark程序:

从上层来看,每个Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。

驱动器程序通过一个SparkContext 对象来访问Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个SparkContext 对象,是一个叫作sc 的变量。

一旦有了SparkContext,你就可以用它来创建RDD,然后进行各种操作。要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。你可以将这个shell连接到集群(或者本地)上来进行并行的数据分析。最后,我们有很多用来传递函数的API,可以将对应操作运行在集群上。

独立应用

一旦完成了应用与Spark 的连接,接下来就需要在你的程序中导入Spark 包并且创建SparkContext。你可以通过先创建一个SparkConf 对象来配置你的应用,然后基于这个SparkConf 创建一个SparkContext 对象。

val conf = new SparkConf().setMaster("local[4]").setAppName("My App")

setMaster("local[4]") 本地4核4线程运行spark,如果是集群则是集群的url

setAppName设置应用名

rdd操作之后,关闭Spark 可以调用SparkContext 的stop() 方法

SparkSession实质上是SQLContext和HiveContext的组合

RDD编程

在任何时候都能进行重算是我们为什么把RDD 描述为“弹性”的原因。当保存RDD 数据的一台机器失败时,Spark 还可以使用这种特性来重算出丢掉的分区,这一过程对用户是完全透明的。

Spark 中的RDD 就是一个不可变的分布式对象集合。每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如list 和set)。

RDD 支持两种类型的操作: 转化操作(transformation) 和行动操作(action)。

转化操作会由一个RDD 生成一个新的RDD。

行动操作会对RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中。

转化操作和行动操作的区别在于Spark 计算RDD 的方式不同。虽然你可以在任何时候定义新的RDD,但Spark 只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。惰性的好处是:如果每次转换操作都保存下数据,就会占用大量的空间,相反,形成转化链后,它就可以只计算求结果时真正需要的数据。

默认情况下,Spark 的RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist() 让Spark 把这个RDD 缓存下来。

创建RDD 最简单的方式就是把程序中一个已有的集合传给SparkContext 的parallelize()方法

val lines = sc.parallelize(List("pandas", "i like pandas"))

更常用的方式是从外部存储中读取数据来创建RDD

makeRDD与parallelize功能基本相同,但makeRDD可以接收参数

scala> val lines=sc.parallelize(List(1,2,5,3))

lines: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[159] at parallelize at <console>:34

scala> val lines1=sc.makeRDD(List(1,2,5,3))

lines1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[160] at makeRDD at <console>:34

scala> val lst=List(8,9,5,4)

lst: List[Int] = List(8, 9, 5, 4)

scala> val lines1=sc.makeRDD(lst)

lines1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[161] at makeRDD at <console>:36

RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的RDD 的操作,比如map() 和filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first()。

我们在驱动器程序中使用take() 获取了RDD 中的少量元素。然后在本地遍历这些元素,并在驱动器端打印出来。RDD 还有一个collect() 函数,可以用来获取整个RDD 中的数据。如果你的程序把RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此,collect() 不能用在大规模数据集上。

在大多数情况下,RDD 不能通过collect() 收集到驱动器进程中,因为它们一般都很大。此时,我们通常要把数据写到诸如HDFS 或Amazon S3 这样的分布式的存储系统中。你可以使用saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把RDD 的数据内容以各种自带的格式保存起来。

需要注意的是,每当我们调用一个新的行动操作时,整个RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化,RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会开始计算

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算

常见的转化操作和行动操作

针对各个元素的转化操作:

两个最常用的转化操作是map() 和filter()

flatMap() 和map() 的区别。你可以把flatMap() 看作将返回的迭代器“拍扁”,这样就得到了一个由各列表中的元素组成的RDD,而不是一个由列表组成的RDD。

scala> val lines12=sc.makeRDD(List("ni hao","i love you","haha"))

lines12: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[167] at makeRDD at <console>:34

scala> lines12.map(x=>x.split(" ")).collect

res70: Array[Array[String]] = Array(Array(ni, hao), Array(i, love, you), Array(haha))

scala> lines12.flatMap(x=>x.split(" ")).collect

res71: Array[String] = Array(ni, hao, i, love, you, haha)

scala> val lines12=sc.makeRDD(List("ni hao","i love you","haha","haha"))

lines12: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[177] at makeRDD at <console>:34

scala> lines12.distinct.collect

res74: Array[String] = Array(i love you, ni hao, haha)

伪集合操作

RDD不是集合,但可以进行合并,相交等操作

最简单的集合操作是union(other),它会返回一个包含两个RDD 中所有元素的RDD。

scala> val lines122=sc.makeRDD(List("ni hao","i love you","haha","haha","see you"))

lines122: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[181] at makeRDD at <console>:34

scala> val lines12=sc.makeRDD(List("ni hao","i love you","haha","haha"))

lines12: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[182] at makeRDD at <console>:34

scala> lines12.union(lines122).distinct.collect

res77: Array[String] = Array(i love you, ni hao, see you, haha)

scala> lines12.union(lines122).collect

res78: Array[String] = Array(ni hao, i love you, haha, haha, ni hao, i love you, haha, haha, see you)

Spark 还提供了intersection(other) 方法,只返回两个RDD 中都有的元素。intersection()在运行时也会去掉所有重复的元素(单个RDD 内的重复元素也会一起移除)。intersection() 的性能却要差很多,因为它需要通过网络混洗数据来发现共有的元素。
scala> lines12.intersection(lines122).collect
res80: Array[String] = Array(i love you, ni hao, haha)

subtract(other) 函数接收另一个RDD 作为参数,返回一个由只存在于第一个RDD 中而不存在于第二个RDD 中的所有元素组成的RDD。和intersection() 一样,它也需要数据混洗。

scala> lines12.subtract(lines122).collect

res81: Array[String] = Array()

scala> lines122.subtract(lines12).collect

res82: Array[String] = Array(see you)

cartesian返回笛卡尔积

scala> lines122.cartesian(lines12).collect
res83: Array[(String, String)] = Array((ni hao,ni hao), (ni hao,i love you), (ni hao,haha), (ni hao,haha), (i love you,ni hao), (i love you,i love you), (i love you,haha), (i love you,haha), (haha,ni hao), (haha,i love you), (haha,haha), (haha,haha), (haha,ni hao), (see you,ni hao), (haha,i love you), (see you,i love you), (haha,haha), (see you,haha), (haha,haha), (see you,haha))

scala> lines12.filter(x=>x.contains("o")).collect
res89: Array[String] = Array(ni hao, i love you)

取样

scala> lines.collect res100: Array[Int] = Array(1, 2, 5, 3)

scala> lines.sample(false,0.5).collect

res101: Array[Int] = Array(1, 3)

scala> lines.sample(false,0.5).collect

res102: Array[Int] = Array(1, 2, 5, 3)

scala> lines.sample(false,0.5).collect

res103: Array[Int] = Array(2, 5, 3)

scala> lines.sample(false,0.5).collect

res104: Array[Int] = Array(1, 3)

 行动操作

scala> lines.collect
res90: Array[Int] = Array(1, 2, 5, 3)

行动操作reduce()。它接收一个函数作为参数,这个函数要操作两个RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD 进行累加。使用reduce(),可以很方便地计算出RDD中所有元素的总和、元素的个数,以及其他类型的聚合操作

scala> lines.collect

res114: Array[Int] = Array(1, 2, 5, 3)

scala> lines.reduce(_+_)

res115: Int = 11

scala> lines.reduce((x,y)=>x+y)

res116: Int = 11

fold() 和reduce() 类似,接收一个与reduce() 接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素;也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,* 对应的1,或拼接操作对应的空列表)。

def fold(zeroValue: Int)(op: (Int, Int) => Int): Int

scala> val arr=sc.makeRDD(Array(1,2,5,3),1)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[263] at makeRDD at <console>:34

scala> arr.fold(1)(_+_)

res191: Int = 13

scala> arr.fold(0)(_+_)

res192: Int = 11

scala> val arr=sc.makeRDD(Array(1,2,5,3),2)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[264] at makeRDD at <console>:34

scala> arr.fold(0)(_+_)

res193: Int = 11

scala> arr.fold(1)(_+_)

res194: Int = 14

scala> val arr=sc.makeRDD(Array(1,2,5,3),4)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[265] at makeRDD at <console>:34

scala> arr.fold(1)(_+_)

res195: Int = 16

上例可以看出fold初始值不为0的情况下,结果和RDD的partition的数量有很多关系,以4个数的数据为例:分了4个区,一个区一个数,每个数字加1,得到2,3,6,4,各个分区求和1+(2+3+6+4)=16;以4个数分两个区为例:(1,2)一个分区,(5,3)一个分区,分区求和  1+1+2=4,1+5+3=9,两个分区求和1+4+9=14;只有一个分区的时候分区求和:1+1+2+5+3=12,一个分区求和1+12=13

数组的计算,数组没有分区,如下:

scala> val arr1=Array(1,2,5,3) arr1: Array[Int] = Array(1, 2, 5, 3)

scala> arr1.fold(0)(_+_) res198: Int = 11

scala> arr1.fold(1)(_+_) res199: Int = 12

scala> arr1.fold(3)(_+_) res200: Int = 14

aggregate() 函数则把我们从返回值类型必须与所操作的RDD 类型相同的限制中解放出来。与fold() 类似,使用aggregate() 时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

scala> val arr=sc.makeRDD(Array(1,2,5,3),4)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[265] at makeRDD at <console>:34

scala> arr.fold(1)(_+_) res195: Int = 16

scala> arr.aggregate(1)((x,y)=>x+y,(m,n)=>m+n)

res196: Int = 16

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue. seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

求平均值

scala> val arr=sc.makeRDD(Array(1,2,5,3),2)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[268] at makeRDD at <console>:34

scala> val res=arr.aggregate(0,0)((acc,number)=>(acc._1+number,acc._2+1),(part1,part2)=>(part1._1+part2._1,part1._2+part2._2))

res: (Int, Int) = (11,4)

scala> res._1/res._2

res227: Int = 2

首先,初始值是(0,0),这个值在后面2步会用到。然后,(acc,number) => (acc._1 + number, acc._2 + 1),acc._1相当于初始值第一个0,acc._2相当于初始值第二个0,number即是函数定义中的T,这里即是List中的元素。上例RDD分了2个分区,可能是(1,2),(5,3),分区计算结果是(3,2),(8,2),再执行(part1,part2)=>(part1._1+part2._1,part1._2+part2._2),即(3+8,2+2)得(11,4)

scala> val res=arr.aggregate(0,0)((acc,number)=>(acc._1+1,acc._2+number),(part1,part2)=>(part1._1+part2._1,part1._2+part2._2))
res: (Int, Int) = (4,11)

scala> val res=arr.aggregate(0,0,1)((acc,number)=>(acc._1+1,acc._2+number,acc._3*number),(part1,part2)=>(part1._1+part2._1,part1._2+part2._2,part1._3*part2._3))
res: (Int, Int, Int) = (4,11,30)  //4是RDD元素个数,11为RDD各个元素之和,30为各RDD元素的乘积1*2*5*3=30

scala> val arr=sc.makeRDD(Array(1,2,5,3,5,8,5,7),2)

arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[307] at makeRDD at <console>:34

scala> arr.countByValue()  //返回的是Map类型

res259: scala.collection.Map[Int,Long] = Map(5 -> 3, 1 -> 1, 2 -> 1, 7 -> 1, 3 -> 1, 8 -> 1)

scala> arr.takeOrdered(3)

res263: Array[Int] = Array(1, 2, 3)

scala> arr.takeOrdered(1)

res264: Array[Int] = Array(1)

scala> arr.take(2)

res265: Array[Int] = Array(1, 2)

有些函数只能用于特定类型的RDD,比如mean() 和variance() 只能用在数值RDD 上,而join() 只能用在键值对RDD 上

scala> arr.mean() //均值
res283: Double = 4.5

scala> arr.variance//方差
res284: Double = 5.0

在不同RDD类型间转换

在Scala 中,将RDD 转为有特定函数的RDD(比如在RDD[Double] 上进行数值操作)是由隐式转换来自动处理的,需要加上import org.apache.spark.SparkContext._ 来使用这些隐式转换

持久化(缓存)

Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD 调用行动操作,Spark 每次都会重算RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。

为了避免多次计算同一个RDD,可以让Spark 对数据进行持久化。当我们让Spark 持久化存储一个RDD 时,计算出RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

出于不同的目的,我们可以为RDD 选择不同的持久化级别

scala> import org.apache.spark.storage._

import org.apache.spark.storage._

scala> arr1.persist(StorageLevel.DISK_ONLY)

res6: arr1.type = MapPartitionsRDD[3] at distinct at <console>:31

scala> arr1.collect

res7: Array[Int] = Array(8, 2, 1, 3, 7, 5)

scala> arr1.unpersist()

res9: arr1.type = MapPartitionsRDD[3] at distinct at <console>:31

如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心你的作业因为缓存了太多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多重算的时间开销。

最后,RDD 还有一个方法叫作unpersist(),调用该方法可以手动把持久化的RDD 从缓存中移除。

键值对操作
键值对RDD 是Spark 中许多操作所需要的常见数据类型。键值对RDD 通常用来进行聚合计算.键值对RDD 提供了一些新的操作接口(比如统计每个产品的评论,将数据中键相同的分为一组,将两个不同的RDD 进行分组合并等)

让用户控制键值对RDD 在各节点上分布情况的高级特性:分区,使用可控的分区方式把常被一起访问的数据放到同一个节点上,可以大大减少应用的通信开销。这会带来明显的性能提升.

scala> val strdd=sc.makeRDD(Array("tian yontao","tian zhidan","liu zhiyng","ma huateng"))

strdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:32

scala> val arrlist=pairdd.map(x=>(x.split(" ")(0),x)).collect
arrlist: Array[(String, String)] = Array((tian,tian yontao), (tian,tian zhidan), (liu,liu zhiyng), (ma,ma huateng))

scala> val pardd=sc.parallelize(arrlist)  //创建pair rdd(键值对RDD)

pardd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[11] at parallelize at <console>:36

scala> pardd.collect

res23: Array[(String, String)] = Array((tian,tian yontao), (tian,tian zhidan), (liu,liu zhiyng), (ma,ma huateng))

scala> pardd.groupByKey().collect
res32: Array[(String, Iterable[String])] = Array((ma,CompactBuffer(ma huateng)), (liu,CompactBuffer(liu zhiyng)), (tian,CompactBuffer(tian yontao, tian zhidan)))

scala> pardd.reduceByKey((x,y)=>x+'-'+y).collect
res37: Array[(String, String)] = Array((ma,ma huateng), (liu,liu zhiyng), (tian,tian yontao-tian zhidan))

combineByKey( createCombiner,mergeValue,mergeCombiners,partitioner)

combineByKey() 是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate() 一样,combineByKey() 可以让用户返回与输入数据的类型不同的返回值。要理解combineByKey(), 要先理解它在处理数据时是如何处理每个元素的。由于combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作createCombiner() 的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD 中第一次出现一个键时发生。如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并。由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners() 方法将各个分区的结果进行合并。

scala> val parrdd=sc.parallelize(List((1,10),(1,30),(2,50),(3,20),(2,81),(5,10)),2)

parrdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:32

scala> parrdd.combineByKey((value)=>(value,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).collect

res49: Array[(Int, (Int, Int))] = Array((2,(131,2)), (1,(40,2)), (3,(20,1)), (5,(10,1)))

scala> parrdd.combineByKey((value)=>(value,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map{case(k,v)=>(k,v._1/v._2)}.collect()

res50: Array[(Int, Int)] = Array((2,65), (1,20), (3,20), (5,10))

连接

将有键的数据与另一组有键的数据一起使用是对键值对数据执行的最有用的操作之一。连接数据可能是pair RDD 最常用的操作之一。连接方式多种多样:右外连接、左外连接、交叉连接以及内连接。

scala> val arrnum=sc.makeRDD(Array((1,1), (2,7), (3,1), (4,1), (1,21), (5,1), (2,1), (1,5), (1,10)))

arrnum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[98] at makeRDD at <console>:32

scala> val maprdd1=arrnum.reduceByKey((x,y)=>x+y)

maprdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[99] at reduceByKey at <console>:34

scala> maprdd1.collect

res105: Array[(Int, Int)] = Array((4,1), (1,37), (5,1), (2,8), (3,1))

scala> val maps=arrnum.countByKey

maps: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 4, 2 -> 2, 3 -> 1, 4 -> 1)

scala> val maprdd2=sc.makeRDD(maps.toArray)

maprdd2: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[102] at makeRDD at <console>:36

scala> maprdd2.collect

res106: Array[(Int, Long)] = Array((5,1), (1,4), (2,2), (3,1), (4,1))
scala> maprdd1.join(maprdd2).collect
res108: Array[(Int, (Int, Long))] = Array((4,(1,1)), (1,(37,4)), (5,(1,1)), (2,(8,2)), (3,(1,1)))

scala> maprdd1.join(maprdd2).map{case (x,y)=>(x,y._1/y._2)}.collect
res119: Array[(Int, Long)] = Array((4,1), (1,9), (5,1), (2,4), (3,1))

subtract:减去,减

Scala通过case语句提供了形式简单、功能强大的模式匹配功能。

在Scala中,被“{}”包含的一系列case语句可以被看成是一个函数字面量,它可以被用在任何普通的函数字面量适用的地方,例如被当做参数传递。

subtractByKey :删掉RDD 中键与other RDD 中的键相同的元素

scala> val maprdd11=maprdd1.filter{case (x,y)=>x!=1} //对第一个元素进行筛选
maprdd11: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[129] at filter at <console>:36
scala> maprdd2.collect
res132: Array[(Int, Long)] = Array((5,1), (1,4), (2,2), (3,1), (4,1))

scala> maprdd11.collect
res134: Array[(Int, Int)] = Array((4,1), (5,1), (2,8), (3,1))

scala> maprdd2.subtractByKey(maprdd11).collect
res133: Array[(Int, Long)] = Array((1,4))

rightOuterJoin 对两个RDD 进行连接操作,确保第一个RDD 的键必须存在(右外连接)

leftOuterJoin 对两个RDD 进行连接操作,确保第二个RDD 的键必须存在(左外连接)

cogroup 将两个RDD 中拥有相同键的数据分组到一起

scala> maprdd2.cogroup(maprdd11).collect
res140: Array[(Int, (Iterable[Long], Iterable[Int]))] = Array((4,(CompactBuffer(1),CompactBuffer(1))), (1,(CompactBuffer(4),CompactBuffer())), (5,(CompactBuffer(1),CompactBuffer(1))), (2,(CompactBuffer(2),CompactBuffer(8))), (3,(CompactBuffer(1),CompactBuffer(1))))

  def lookup(key: Int): Seq[Int]

返回给定键对应的所有的值

scala> maprdd1.lookup(1)
res120: Seq[Int] = ArrayBuffer(37)

def sortByKey(ascending: Boolean,numPartitions: Int): org.apache.spark.rdd.RDD[(Int, Long)]

scala> maprdd2.sortByKey(false).collect //降序排列

res161: Array[(Int, Long)] = Array((5,1), (4,1), (3,1), (2,2), (1,4))

scala> maprdd2.sortByKey().collect //默认升序

res162: Array[(Int, Long)] = Array((1,4), (2,2), (3,1), (4,1), (5,1))

并行度调优

每个RDD 都有固定数目的分区,分区数决定了在RDD 上执行操作时的并行度。在执行聚合或分组操作时,可以要求Spark 使用给定的分区数。Spark 始终尝试根据集群的大小推断出一个有意义的默认值,但是有时候你可能要对并行度进行调优来获取更好的性能表现。

大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数.

有时,我们希望在除分组操作和聚合操作之外的操作中也能改变RDD 的分区。对于这样的情况,Spark 提供了repartition() 函数。它会把数据通过网络进行混洗,并创建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也有一个优化版的repartition(),叫作coalesce()。

scala> testrdd.partitions.size
res180: Int = 4
scala> testrdd.coalesce(2).partitions.size
res183: Int = 2

scala> testrdd.coalesce(10,true).partitions.size

res188: Int = 10

scala> testrdd.coalesce(10).partitions.size //shuffle 默认为false,如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变

res189: Int = 4

scala> testrdd.coalesce(10).partitions.size
res185: Int = 4

scala> testrdd.repartition(2).partitions.size

res186: Int = 2

scala> testrdd.repartition(6).partitions.size

res187: Int = 6

数据分区

Spark 对数据集在节点间的分区进行控制,在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程序可以通过控制RDD 分区方式来减少通信开销。

分区并不是对所有应用都有好处的——比如,如果给定RDD 只需要被扫描一次,我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。

Spark 中所有的键值对RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分组。尽管Spark 没有给出显示控制每个键具体落在哪一个工作节点上的方法(部分原因是Spark 即使在某些节点失败时依然可以工作),但Spark 可以确保同一组的键出现在同一个节点上。

cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。Spark通过调用rdd.unpersit来释放缓存和persist一样,这是通过SparkContext.unpersistRDD来实现的。

一张很大的用户信息表——也就是一个由(UserID, UserInfo) 对组成的RDD,其中UserInfo 包含一个该用户所订阅的主题的列表。该应用会周期性地将这张表与一个小文件(events)进行组合,这个小文件中存着过去五分钟内发生的事件——其实就是一个由(UserID, LinkInfo) 对组成的表,存放着过去五分钟内某网站各用户的访问情况。

Scala 自定义分区方式
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 构造100个分区
.persist()

由于在构建userData 时调用了partitionBy(),Spark 就知道了该RDD 是根据键的哈希值来分区的,这样在调用join() 时,Spark 就会利用到这一点。具体来说,当调用userData.join(events) 时,Spark 只会对events 进行数据混洗操作,将events 中特定UserID 的记录发送到userData 的对应分区所在的那台机器上。这样,需要通过网络传输的数据就大大减少了,程序运行速度也可以显著提升了。

注意,partitionBy() 是一个转化操作,因此它的返回值总是一个新的RDD,但它不会改变原来的RDD。RDD 一旦创建就无法修改。因此应该对partitionBy() 的结果进行持久化,并保存为userData,而不是原来的sequenceFile() 的输出。此外,传给partitionBy() 的100 表示分区数目,它会控制之后对这个RDD 进行进一步操作(比如连接操作)时有多少任务会并行执行。总的来说,这个值至少应该和集群中的总核心数一样。

可以使用RDD 的partitioner 属性来获取RDD 的分区方式。

scala> val arrnum=sc.makeRDD(Array((1,1), (2,7), (3,1), (4,1), (1,21), (5,1), (2,1), (1,5), (1,10)),3)

arrnum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[210] at makeRDD at <console>:32

scala> arrnum.partitioner

res219: Option[org.apache.spark.Partitioner] = None

scala> arrnum.partitions.size

res220: Int = 3

scala> val arrnum00=arrnum.partitionBy(new HashPartitioner(2))

arrnum00: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[212] at partitionBy at <console>:38

scala> arrnum00.partitioner

res221: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

从分区中获益的操作

就Spark 1.0 而言,能够从数据分区中获益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey() 以及lookup()。

对于像reduceByKey() 这样只作用于单个RDD 的操作,运行在未分区的RDD 上的时候会导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结果值从各工作节点传回主节点,所以原本的网络开销就不算大。

而对于诸如cogroup() 和join() 这样的二元操作,预先进行数据分区会导致其中至少一个RDD(使用已知分区器的那个RDD)不发生数据混洗。如果两个RDD 使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个RDD 是通过mapValues() 从另一个RDD 中创建出来的,这两个RDD 就会拥有相同的键和分区方式),或者其中一个RDD 还没有被计算出来,那么跨节点的数据混洗就不会发生了。

数据读取与保存

读取SequenceFile
  scala> import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.{IntWritable, Text}

scala> val data = sc.sequenceFile("/spark/parinum/p*", classOf[Text], classOf[IntWritable]).map{case (x,y)=>(x.toString,y.get)}

data: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[216] at map at <console>:37

scala> data.collect

res229: Array[(String, Int)] = Array((zhangsan,100), (wangwu,250), (xiaoma,120), (laozhan,300), (tiandi,60))

保存SequenceFile

scala> numpairdd.collect
res224: Array[(String, Int)] = Array((zhangsan,100), (wangwu,250), (xiaoma,120), (laozhan,300), (tiandi,60))

scala> numpairdd.saveAsSequenceFile("/spark/parinum")

查看hdfs

[root@host conf]# hdfs dfs -ls -R /spark
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
drwxr-xr-x   - root supergroup          0 2018-06-25 16:08 /spark/parinum
-rw-r--r--   1 root supergroup          0 2018-06-25 16:08 /spark/parinum/_SUCCESS
-rw-r--r--   1 root supergroup        125 2018-06-25 16:08 /spark/parinum/part-00000
-rw-r--r--   1 root supergroup        143 2018-06-25 16:08 /spark/parinum/part-00001

SPARK快学大数据分析概要的相关教程结束。

《SPARK快学大数据分析概要.doc》

下载本文的Word格式文档,以方便收藏与打印。