spark rdd 转换和动作

周海汉 2017-07-22
2017-07-22

概述

本文对spark rdd的转换和动作进行总结和实际操作演示. RDD(Resilient Distributed Datasets),弹性分布式数据集, 是spark分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型.即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。 rdd 的分布式,因为rdd支持分区, 自动把一个rdd根据partition分发到n台spark设备进行处理. 这些对用户完全透明. 用户感觉和操作本地数据一样.

rdd通过parallelize和textFile或流来创建. 再通过转换得到新的rdd. 转换的过程不是立即执行, 而是在需要动作action时才开始. 这样方便系统进行自动优化.

rdd操作示例

parallelize一般用于测示创建rdd.

scala> val square = sc.parallelize(List(1,2,3,4))
scala> val sq = square.map(x=>x*x).collect()
sq: Array[Int] = Array(1, 4, 9, 16)


scala> val drink1=sc.parallelize(List("coffee","tea","coffee","panda","monkey"))
scala> val rdd2 = sc.parallelize(List("coffee","money","kitty","猫"))

scala> val r11= rdd1.union(rdd2).collect()
r11: Array[String] = Array(coffee, tea, coffee, panda, monkey, coffee, money, kitty, 猫)

scala> val r12 = rdd1.intersection(rdd2).collect()
r12: Array[String] = Array(coffee)

scala> val r13 = rdd1.subtract(rdd2).collect()
r13: Array[String] = Array(tea, panda, monkey)


scala> val users=sc.parallelize(List("user1","user2"))
scala> val tags = sc.parallelize(List("经济","政治","文化"))

scala> users.cartesian(tags).collect()
res1: Array[(String, String)] = Array((user1,经济), (user1,政治), (user1,文化), (user2,经济), (user2,政治), (user2,文化))


普通RDD操作

转换 Transformation

转换有如下一些种类

  • map(func)
  • filter(func)
  • repartition(numPartitions)
  • flatMap(func)
  • repartitionAndSortWithinPartitions(partitioner)
  • mapPartitions(func)
  • join(otherDataset, [numTasks])
  • mapPartitionsWithIndex(func)
  • cogroup(otherDataset, [numTasks])
  • sample(withReplacement, fraction, seed)
  • cartesian(otherDataset)
  • coalesce(numPartitions)

基础转换

测试用RDD 包含 {1, 2, 3, 3}

函数 目的 示例 结果
map 对RDD每个元素应用函数, 并返回一个RDD结果 rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap 对RDD每个元素应用函数, 并返回一个RDD结果,包含迭代器返回的内容.常用于抽取单词. rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter 返回一个RDD结果, 由通过了filter的元素组成. rdd.filter(x => x != 1) {2, 3, 3}
distinct 移除重复元素 rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, [seed]) 对 RDD 抽样,withReplacement是指是否有放回的抽样为true为放回,为false为不放回,fraction为抽样占总数据量的比值 rdd.sample(false, 0.5) non-deterministic

两个RDD转换

RDD分别包含 {1, 2, 3} 和 {3, 4, 5}

函数 目的 示例 结果
union() 两个RDD并集. rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() RDD 交集 rdd.intersection(other) {3}  
subtract() 从一个RDD移除全部存在于另一个RDD的元素.如移除训练数据. rdd.subtract(other) {1, 2}  
cartesian() 两个RDD的笛卡尔积, 一个RDD中每个元素和另一个RDD的每个元素两两组合 rdd.cartesian(other) {(1, 3), (1,4), … (3,5)}

普通RDD 动作(Action)

基础动作

RDD 包含 {1, 2, 3, 3}

函数 目的 示例 结果
collect() 返回 RDD 全部元素 rdd.collect() {1, 2, 3, 3}
count() RDD 元素总计 rdd.count() 4
countByValue() RDD 每个元素出现次数总计 rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) 返回 RDD 中num个元素 rdd.take(2) {1, 2}
top(num) 返回 RDD 中top num个元素 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 返回 RDD 中num个元素, 基于给定排序 rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplacement, num, [seed]) 返回 RDD 中随机num个元素, withReplacement表示抽样是否放回. 放回会有重复 rdd.takeSample(false, 1) non-deterministic
reduce(func) 并发将RDD中的元素联合起来.如加和. rdd.fold((x, y) => x + y) 9
fold(zero)(func) 类reduce函数, 但提供初始值设置. rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) 类reduce函数, 但提供初始值设置. 可以返回不同类型 rdd.aggregate(0, 0) ({case (x, y) => (y._1() + x, y._2() + 1)}, {case (x, y) => (y._1() + x._1(), y._2() + x._2()) }) (9, 4)
foreach(func) 对RDD中每个元素应用函数. rdd.foreach(func)

reduce

对两个元素应用reduce中的函数, 得到一个新的元素, 再用新元素和集合中的元素进行reduce运算. 如+, 求和,计数和其他聚合运算.

scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.reduce(_ + _)
res35: Int = 10

fold

fold()和reduce类似. 但会对每个分区置初始zero值(+为0,乘为1, 连接列表为空列表).

fold 定义:

def fold(zeroValue: T)(op: (T, T) => T): T
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.fold(0)((x,y)=>x+y)
res40: Int = 10
scala> data.fold(1)((x,y)=>x+y)
res41: Int = 15

aggregate

聚合函数可以避免使用map. 和fold差不多, 需要传入一个初始值函数, 再传入一个每个分区的累积函数. 最后传入一个分区之间的累积函数.

下面的代码是求均值. 参数(0,0) 表示总和0,计数0. 参数(acc, value) ,acc表示累计值, value是当前值.其中acc是元组. _1为第一个元素, 为累积值. _2为第二个元素,表示累计计数. 第三个函数(acc1, acc2) 则是各分区的归总combine.

val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
scala> val result = data.aggregate((0,0))( (acc,value)=>(acc._1 + value, acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1, acc1._2 + acc2._2))
result: (Int, Int) = (10,4)

scala> val avg = result._1/result._2
avg: Int = 2

# pairRDD 操作

pairRDD是特殊的RDD, 包含key->value元组键值对的RDD, 有特殊操作.

很多变换会返回pairRDD, 也可以将普通RDD转为pairRDD. 如通过map()函数, 将行的第一个单词作为key,行作为value.

 val pairs = lines.map(x => (x.split(" ")(0), x))

## 单个pairRDD上的变换 rdd={(1, 2), (3, 4), (3, 6)}

函数 目的 示例 结果
reduceByKey(func) 用同一个key将值联合. rdd.reduceByKey((x,y)=>x+y) {(1,2),(3,10)}
groupByKey() 用同一个key将值分组 rdd.groupByKey() {(1,[2]),(3,[4,6])}
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 用同一个key将值联合, 但返回不同类型    
mapValues(func) 将函数应用于每个值, 不改变key. rdd.mapValues(x=>x+1) {(1,3),(3,5),(3,7)}
flatMapValues(func) 将函数应用于一个返回pair RDD每个值的迭代器, 对每个返回的的值, 用原来的key生成键值对.常用于 tokenization. rdd.flatMapValues(x=>(x to 5) {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
keys() 返回 RDD 的键集合 rdd.keys() {1,3,3}
values() 返回 RDD 的值集合 rdd.values() {2,4,6}
sortByKey() 返回 RDD, 用键排序. rdd.sortByKey() {(1,2),(3,4),(3,6)}

combineByKey

对每个key求均值 input ={(1, 2), (3, 4), (3, 6)} combineByKey 可以省去map的过程.


 scala> val input=sc.parallelize(List((1,2),(3,4),(3,6))

scala> val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

scala> result.collectAsMap().map(println(_))
(1,2.0)
(3,5.0)

说明

  • 第一个参数相当于map. (v) => (v, 1), 表示将key都转为计数为1的元组

  • 第二个参数 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1) 表示对每一个分区的acc元组的第一个值求和, 第二个计数

  • 第三个参数 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) 表示对每个分区的累积元组进行总的加和和计数.
  • map操作进行求均值

两个 pair RDD的变换

(rdd = {(1,2),(3,4),(3,6)}, other = {(3,9)})

函数 目的 示例 结果
subtractByKey 将另一个RDD存在的key从本RDD移除. rdd.subtractByKey(other) {(1,2)}
join 两个 RDD 进行 inner join . rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 两个 RDD 进行右连接, key必须在右边的RDD存在 rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}
leftOuterJoin 两个 RDD 进行左连接, key必须在左边的RDD存在 rdd.leftOuterJoin(other) {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}
cogroup 两个RDD具有同一个键, 则组合成一个组. rdd.cogroup(other) {(1,([2],[])),(3,([4,6],[9]))}

参考

《Learning spark》


如非注明转载, 均为原创. 本站遵循知识共享CC协议,转载请注明来源