概述
本文对spark rdd的转换和动作进行总结和实际操作演示. RDD(Resilient Distributed Datasets),弹性分布式数据集, 是spark分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型.即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。 rdd 的分布式,因为rdd支持分区, 自动把一个rdd根据partition分发到n台spark设备进行处理. 这些对用户完全透明. 用户感觉和操作本地数据一样.
rdd通过parallelize和textFile或流来创建. 再通过转换得到新的rdd. 转换的过程不是立即执行, 而是在需要动作action时才开始. 这样方便系统进行自动优化.
rdd操作示例
parallelize一般用于测示创建rdd.
scala> val square = sc.parallelize(List(1,2,3,4))
scala> val sq = square.map(x=>x*x).collect()
sq: Array[Int] = Array(1, 4, 9, 16)
scala> val drink1=sc.parallelize(List("coffee","tea","coffee","panda","monkey"))
scala> val rdd2 = sc.parallelize(List("coffee","money","kitty","猫"))
scala> val r11= rdd1.union(rdd2).collect()
r11: Array[String] = Array(coffee, tea, coffee, panda, monkey, coffee, money, kitty, 猫)
scala> val r12 = rdd1.intersection(rdd2).collect()
r12: Array[String] = Array(coffee)
scala> val r13 = rdd1.subtract(rdd2).collect()
r13: Array[String] = Array(tea, panda, monkey)
scala> val users=sc.parallelize(List("user1","user2"))
scala> val tags = sc.parallelize(List("经济","政治","文化"))
scala> users.cartesian(tags).collect()
res1: Array[(String, String)] = Array((user1,经济), (user1,政治), (user1,文化), (user2,经济), (user2,政治), (user2,文化))
普通RDD操作
转换 Transformation
转换有如下一些种类
- map(func)
- filter(func)
- repartition(numPartitions)
- flatMap(func)
- repartitionAndSortWithinPartitions(partitioner)
- mapPartitions(func)
- join(otherDataset, [numTasks])
- mapPartitionsWithIndex(func)
- cogroup(otherDataset, [numTasks])
- sample(withReplacement, fraction, seed)
- cartesian(otherDataset)
- coalesce(numPartitions)
基础转换
测试用RDD 包含 {1, 2, 3, 3}
函数 | 目的 | 示例 | 结果 |
---|---|---|---|
map | 对RDD每个元素应用函数, 并返回一个RDD结果 | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap | 对RDD每个元素应用函数, 并返回一个RDD结果,包含迭代器返回的内容.常用于抽取单词. | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter | 返回一个RDD结果, 由通过了filter的元素组成. | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct | 移除重复元素 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) | 对 RDD 抽样,withReplacement是指是否有放回的抽样为true为放回,为false为不放回,fraction为抽样占总数据量的比值 | rdd.sample(false, 0.5) | non-deterministic |
两个RDD转换
RDD分别包含 {1, 2, 3} 和 {3, 4, 5}
函数 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 两个RDD并集. | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | RDD 交集 | rdd.intersection(other) {3} | |
subtract() | 从一个RDD移除全部存在于另一个RDD的元素.如移除训练数据. | rdd.subtract(other) {1, 2} | |
cartesian() | 两个RDD的笛卡尔积, 一个RDD中每个元素和另一个RDD的每个元素两两组合 | rdd.cartesian(other) | {(1, 3), (1,4), … (3,5)} |
普通RDD 动作(Action)
基础动作
RDD 包含 {1, 2, 3, 3}
函数 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回 RDD 全部元素 | rdd.collect() | {1, 2, 3, 3} |
count() | RDD 元素总计 | rdd.count() | 4 |
countByValue() | RDD 每个元素出现次数总计 | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
take(num) | 返回 RDD 中num个元素 | rdd.take(2) | {1, 2} |
top(num) | 返回 RDD 中top num个元素 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | 返回 RDD 中num个元素, 基于给定排序 | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 返回 RDD 中随机num个元素, withReplacement表示抽样是否放回. 放回会有重复 | rdd.takeSample(false, 1) | non-deterministic |
reduce(func) | 并发将RDD中的元素联合起来.如加和. | rdd.fold((x, y) => x + y) | 9 |
fold(zero)(func) | 类reduce函数, 但提供初始值设置. | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp, combOp) | 类reduce函数, 但提供初始值设置. 可以返回不同类型 | rdd.aggregate(0, 0) ({case (x, y) => (y._1() + x, y._2() + 1)}, {case (x, y) => (y._1() + x._1(), y._2() + x._2()) }) | (9, 4) |
foreach(func) | 对RDD中每个元素应用函数. | rdd.foreach(func) | 无 |
reduce
对两个元素应用reduce中的函数, 得到一个新的元素, 再用新元素和集合中的元素进行reduce运算. 如+, 求和,计数和其他聚合运算.
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.reduce(_ + _)
res35: Int = 10
fold
fold()和reduce类似. 但会对每个分区置初始zero值(+为0,乘为1, 连接列表为空列表).
fold 定义:
def fold(zeroValue: T)(op: (T, T) => T): T
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.fold(0)((x,y)=>x+y)
res40: Int = 10
scala> data.fold(1)((x,y)=>x+y)
res41: Int = 15
aggregate
聚合函数可以避免使用map. 和fold差不多, 需要传入一个初始值函数, 再传入一个每个分区的累积函数. 最后传入一个分区之间的累积函数.
下面的代码是求均值. 参数(0,0) 表示总和0,计数0. 参数(acc, value) ,acc表示累计值, value是当前值.其中acc是元组. _1为第一个元素, 为累积值. _2为第二个元素,表示累计计数. 第三个函数(acc1, acc2) 则是各分区的归总combine.
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
scala> val result = data.aggregate((0,0))( (acc,value)=>(acc._1 + value, acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1, acc1._2 + acc2._2))
result: (Int, Int) = (10,4)
scala> val avg = result._1/result._2
avg: Int = 2
# pairRDD 操作
pairRDD是特殊的RDD, 包含key->value元组键值对的RDD, 有特殊操作.
很多变换会返回pairRDD, 也可以将普通RDD转为pairRDD. 如通过map()函数, 将行的第一个单词作为key,行作为value.
val pairs = lines.map(x => (x.split(" ")(0), x))
## 单个pairRDD上的变换 rdd={(1, 2), (3, 4), (3, 6)}
函数 | 目的 | 示例 | 结果 |
---|---|---|---|
reduceByKey(func) | 用同一个key将值联合. | rdd.reduceByKey((x,y)=>x+y) | {(1,2),(3,10)} |
groupByKey() | 用同一个key将值分组 | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) | 用同一个key将值联合, 但返回不同类型 | ||
mapValues(func) | 将函数应用于每个值, 不改变key. | rdd.mapValues(x=>x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) | 将函数应用于一个返回pair RDD每个值的迭代器, 对每个返回的的值, 用原来的key生成键值对.常用于 tokenization. | rdd.flatMapValues(x=>(x to 5) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | 返回 RDD 的键集合 | rdd.keys() | {1,3,3} |
values() | 返回 RDD 的值集合 | rdd.values() | {2,4,6} |
sortByKey() | 返回 RDD, 用键排序. | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
combineByKey
对每个key求均值 input ={(1, 2), (3, 4), (3, 6)} combineByKey 可以省去map的过程.
scala> val input=sc.parallelize(List((1,2),(3,4),(3,6))
scala> val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
scala> result.collectAsMap().map(println(_))
(1,2.0)
(3,5.0)
说明
-
第一个参数相当于map. (v) => (v, 1), 表示将key都转为计数为1的元组
-
第二个参数 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1) 表示对每一个分区的acc元组的第一个值求和, 第二个计数
- 第三个参数 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) 表示对每个分区的累积元组进行总的加和和计数.
- map操作进行求均值
两个 pair RDD的变换
(rdd = {(1,2),(3,4),(3,6)}, other = {(3,9)})
函数 | 目的 | 示例 | 结果 |
---|---|---|---|
subtractByKey | 将另一个RDD存在的key从本RDD移除. | rdd.subtractByKey(other) | {(1,2)} |
join | 两个 RDD 进行 inner join . | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
rightOuterJoin | 两个 RDD 进行右连接, key必须在右边的RDD存在 | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 两个 RDD 进行左连接, key必须在左边的RDD存在 | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))} |
cogroup | 两个RDD具有同一个键, 则组合成一个组. | rdd.cogroup(other) | {(1,([2],[])),(3,([4,6],[9]))} |
参考
《Learning spark》
如非注明转载, 均为原创. 本站遵循知识共享CC协议,转载请注明来源