当在Spark程序中执行操作时,它会被划分为不同的阶段(stage),这些阶段的划分依据是否包含shuffle操作和宽依赖。现在,让我们一起看看哪些操作会引发shuffle操作。
一各种bykey类聚合算子
1. reduceByKey算子
reduceByKey操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
案例实现:统计单词出现次数。
object KeyValue02_reduceByKey {
def main(args: Array[String]): Unit ={
// 1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
// 2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
// 3具体业务逻辑
// 3.1 创建第一个RDD
val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
// 3.2 计算相同key对应值的相加结果
val reduce: RDD[(String, Int)] = rdd.reduceByKey((v1,v2) => v1+v2)
// 3.3 触发计算,查看运行结果
reduce.collect()
Thread.sleep(99999)
// 4 关闭连接
sc.stop()
}
}
运行结果截图:
aggregateByKey算子
aggregateByKey操作可以给每一个分区中的每一种key一个初始值,每一个分区用初始值逐步迭代value,函数分区间实现合并每个分区中的结果。
案例实现:求每个分区相同key对应值的最大值,然后相加。
object KeyValue04_aggregateByKey {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",3),("a",5),("b",7),("b",2),("b",4),("b",6),("a",7)),2)
//3.2 取出每个分区相同key对应值的最大值,然后相加
val res: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
//3.3 触发计算,查看运行结果
res.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
注意:groupByKey、foldByKey、combineByKey同样存在shuffle操作,在这里就不一一列举了。
二排序算子
sortByKey算子
sortByKey操作在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
案例实现:创建一个pairRDD,按照key的正序和倒序进行排序。
object KeyValue07_sortByKey {
def main(args: Array[String]): Unit ={
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")),3)
//3.2 按照key的正序(默认顺序)
val res: RDD[(Int, String)] = rdd.sortByKey(true)
//3.3 触发计算,查看运行结果
res.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
sortBy算子
sortBy算子操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。
案例实现:创建一个RDD,按照数字大小分别实现正序和倒序排序。
object value10_sortBy {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 3, 4, 6, 5),3)
// 3.2 默认是升序排
val sortRdd: RDD[Int] = rdd.sortBy(num => num)
//3.3 触发计算,查看运行结果
sortRdd.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
三分区操作类算子
coalesce算子
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。该操作缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
案例实现:用coalesce实现4个分区合并为两个分区。
object value08_coalesce {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//3.1 缩减分区
val coalesceRDD: RDD[Int] = rdd.coalesce(2,true)
//3.2 触发计算,查看运行结果
coalesceRDD.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
repartition
repartition操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
案例实现:创建一个4个分区的RDD,对其重新分区。
object value09_repartition {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3. 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
//3.1 重新分区
val repartitionRdd: RDD[Int] = rdd.repartition(2)
//3.2 触发计算,查看运行结果
repartitionRdd.collect()
Thread.sleep(99999)
//4. 关闭连接
sc.stop()
}
}
运行结果截图:
四去重算子
distinct算子
distinct算子操作对内部的元素进行去重,并将去重后的元素放到新的RDD中。
案例实现:对RDD采用多个Task去重。
object value07_distinct {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
// 3.1 创建一个RDD
val distinctRdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 5, 2, 9, 6, 1))
// 3.2 打印去重后生成的新RDD
distinctRdd.distinct().collect().foreach(println)
// 3.3 对RDD采用多个Task去重,提高并发度
val res: RDD[Int] = distinctRdd.distinct(2)
// 3.4 触发计算,查看运行结果
res.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
五集合操作类算子
intersection算子
intersection这个算子操作对源RDD和参数RDD求交集后返回一个新的RDD。
案例实现:创建两个RDD,求两个RDD的交集。
object DoubleValue01_intersection {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd1: RDD[Int] = sc.makeRDD(1 to 4)
//3.2 创建第二个RDD
val rdd2: RDD[Int] = sc.makeRDD(4 to 8)
//3.3 计算第一个RDD与第二个RDD的交集并打印
val res: RDD[Int] = rdd1.intersection(rdd2)
//3.4 触发计算,查看运行结果
res.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
subtract算子
subtract算子是计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来。
案例实现:创建两个RDD,求第一个RDD与第二个RDD的差集。
object DoubleValue03_subtract {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具体业务逻辑
//3.1 创建第一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4)
//3.2 创建第二个RDD
val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
//3.3 计算第一个RDD与第二个RDD的差集
val res: RDD[Int] = rdd.subtract(rdd1)
// 3.4 触发计算,查看运行结果
res.collect()
Thread.sleep(99999)
//4.关闭连接
sc.stop()
}
}
运行结果截图:
六总结
Spark的shuffle算子主要包括去重算子、byKey聚合类算子、排序算子、重分区算子、和集合操作类算子。