在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。
针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接下来讲解《在spark中如何实现SQL中的group_concat功能?》时会用到这两个operations。
groupByKey(numPartitions=None)
groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
reduceByKey(func, numPartitions=None)
reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
为了更好的理解上面这段话,下面我们使用两种不同的方式去计算单词的个数[2]:val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _) val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一样的,但是,它们的内部运算过程是不同的。
(1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:
(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:
因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey。
另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
- combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样
- foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。
最后,对reduceByKey中的func做一些介绍:
如果是用Python写的spark,那么有一个库非常实用:operator[3],其中可以用的函数包括:大小比较函数,逻辑操作函数,数学运算函数,序列操作函数等等。这些函数可以直接通过“from operator import *”进行调用,直接把函数名作为参数传递给reduceByKey即可。如下:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
结果:
[('a', 2), ('b', 1)]
官方文档描述:
从源码中可以看出groupByKey()是基于combineByKey()实现的, 只是将 Key 相同的 records 聚合在一起,一个简单的 shuffle 过程就可以完成。ShuffledRDD 中的 compute() 只负责将属于每个 partition 的数据 fetch 过来,之后使用 mapPartitions() 操作进行 aggregate,生成 MapPartitionsRDD,到这里 groupByKey() 已经结束。最后为了统一返回值接口,将 value 中的 ArrayBuffer[] 数据结构抽象化成 Iterable[]。groupByKey() 没有在 map 端进行 combine(mapSideCombine = false),这样设计是因为map 端 combine 只会省掉 partition 里面重复 key 占用的空间;但是,当重复 key 特别多时,可以考虑开启 combine。
实例:
//示例:
List<Integer> data2 = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = sparkContext.parallelize(data2);
// 转为k,v格式
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer, 1);
}
});
JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2);
System.out.println(groupByKeyRDD.collect());
// 自定义partition
JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() {
// partition各数
@Override
public int numPartitions() {
return 10;
}
// partition方式
@Override
public int getPartition(Object o) {
return (o.toString()).hashCode() % numPartitions();
}
});
System.out.println(groupByKeyRDD3.collect());
结果:
[(4,[1]), (6,[1]), (2,[1]), (1,[1]), (3,[1]), (7,[1]), (5,[1])]
自定义分区结果:
[(2,[1]), (3,[1]), (4,[1]), (5,[1]), (6,[1]), (7,[1]), (1,[1])]