Spark RDD算子
图片无法加载请跳转CSDN:Spark RDD算子-CSDN博客
转换算子(Transformation Operators)
类别 | 算子名称 | 简要介绍 |
---|---|---|
映射类算子 | map | 对RDD中的每个元素进行操作,返回一个新的RDD |
flatMap | 类似于map,但每个输入元素可映射到0或多个输出元素 | |
mapPartitions | 对RDD的每个分区中的元素进行操作,返回一个新的RDD | |
mapPartitionsWithIndex | 类似于mapPartitions,但提供了分区索引 | |
mapValues | 对键值对RDD中的每个值进行操作,返回一个新的键值对RDD | |
flatMapValues | 对键值对RDD中的每个值进行操作,每个值可映射到0或多个输出元素 | |
过滤类算子 | filter | 过滤出满足条件的元素,返回一个新的RDD |
distinct | 去除重复元素,返回一个新的RDD | |
分组类算子 | groupBy | 根据指定的函数对RDD中的元素进行分组,返回一个新的RDD |
groupByKey | 对键值对RDD中的值进行分组,返回一个新的键值对RDD | |
reduceByKey | 对具有相同键的值进行合并,返回一个新的键值对RDD | |
aggregateByKey | 使用指定的聚合函数和中间结果进行聚合,返回一个新的键值对RDD | |
combineByKey | 类似于aggregateByKey,但允许更灵活的聚合过程 | |
cogroup | 对两个或多个键值对RDD进行分组,返回一个新的RDD | |
排序类算子 | sortByKey | 根据键对键值对RDD进行排序,返回一个新的键值对RDD |
sortBy | 根据指定的函数对RDD进行排序,返回一个新的RDD | |
repartitionAndSortWithinPartitions | 对RDD进行重新分区并在每个分区内排序,返回一个新的RDD | |
连接类算子 | union | 合并两个RDD,返回一个新的RDD |
intersection | 计算两个RDD的交集,返回一个新的RDD | |
subtract | 从一个RDD中移除另一个RDD中的元素,返回一个新的RDD | |
cartesian | 计算两个RDD的笛卡尔积,返回一个新的RDD | |
join | 根据键连接两个键值对RDD,返回一个新的键值对RDD | |
leftOuterJoin | 左外连接两个键值对RDD,返回一个新的键值对RDD | |
rightOuterJoin | 右外连接两个键值对RDD,返回一个新的键值对RDD | |
fullOuterJoin | 全外连接两个键值对RDD,返回一个新的键值对RDD | |
分区类算子 | sample | 随机采样RDD中的元素,返回一个新的RDD |
randomSplit | 随机划分RDD中的元素,返回一个RDD数组 | |
pipe | 将RDD的每个分区作为外部进程的输入,返回一个新的RDD | |
coalesce | 减少RDD的分区数,返回一个新的RDD | |
repartition | 增加或减少RDD的分区数,并进行数据洗牌,返回一个新的RDD | |
partitionBy | 根据指定的分区器对键值对RDD进行分区,返回一个新的键值对RDD |
行动算子(Action Operators)
类别 | 算子名称 | 简要介绍 |
---|---|---|
聚合类算子 | reduce | 对RDD中的元素进行归约操作,返回一个结果 |
fold | 类似于reduce,但带有初始值 | |
aggregate | 使用指定的聚合函数和中间结果对RDD进行聚合,返回一个结果 | |
计数类算子 | count | 计算RDD中的元素数量,返回一个长整型结果 |
countByKey | 计算每个键的出现次数,返回一个键值对Map | |
countByValue | 计算每个值的出现次数,返回一个值对Map | |
收集类算子 | collect | 将RDD的所有元素收集到驱动程序,返回一个数组 |
collectAsMap | 将键值对RDD的所有元素收集到驱动程序,返回一个Map | |
take | 获取RDD的前n个元素,返回一个列表 | |
takeSample | 随机获取RDD的样本,返回一个列表 | |
takeOrdered | 获取排序后的RDD的前n个元素,返回一个列表 | |
获取类算子 | first | 获取RDD的第一个元素,返回一个元素 |
top | 获取排序后的RDD的前n个元素,返回一个列表 | |
遍历类算子 | foreach | 对RDD中的每个元素执行操作,不返回结果 |
foreachPartition | 对RDD的每个分区中的元素执行操作,不返回结果 | |
保存类算子 | saveAsTextFile | 将RDD保存到文本文件中 |
saveAsSequenceFile | 将RDD保存为序列文件 | |
saveAsObjectFile | 将RDD保存为对象文件 | |
saveAsHadoopFile | 将RDD保存为Hadoop文件 | |
saveAsNewAPIHadoopFile | 将RDD保存为新API的Hadoop文件 | |
saveAsHadoopDataset | 将RDD保存为Hadoop数据集 |
一、转换算子(Transformation Operators)
1、逐条处理
1 |
|
2、扁平化处理
1 |
|
3、分区内逐行处理
【✔ 】:以分区为单位(分区不变)逐行处理数据
map VS mapPartitions
1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器
1
2
3
4
5
6
7
8
9
>// 在同一个 mapPartitions 操作中进行过滤和转换操作,可以减少对数据的多次遍历,从而提高性能。
>mapParitions(
>// ≈ 子查询
>it.filter(...) // 谓词下缀
>)
>// 分别使用 mapPartitions 和 filter 进行转换和过滤操作,增加计算开销。
>mapParitions(...)
>fielter(...) // where
1 |
|
4、转内存数组
分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]
1 |
|
5、数据过滤
过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,不推荐用
【数据可能倾斜】某些分区的数据量远远超过了其他分区,造成数据分布不均匀
1 |
|
6、数据分组
同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜】
1 |
|
7、数据抽样
函数名:
sample
参数:withReplacement:Boolean
是否有放回抽样fraction:Double
抽样率seed:Long
随机种子,默认为当前时间戳(一般缺省)
无放回抽样:sample(false, 0.4)
=> 抽样40%的数据,40条左右
有放回抽样:sample(true, 0.4)
=> 每条数据被抽取的概率为40% (可能有重复的元素)
1 |
|
8、数据去重
采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重
1 |
|
9、数据排序
处理数据f:T=>K,升降序asc: Boolean,分区数numPartitions:Int
默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
全局排序,多分区间交换数据,效率较低。优化见 PairRDD
若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
若:K为自定义类型,则必须提供第二参数
1 |
|
sortBy
1 |
|
sortByKey
1 |
|
10、交并补差
多个类型 RDD[T]:纵向
交并差操作:数据类型一致,根据元素 equals 认定是否相同
【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
拉链操作:要求分区数和分区内的数据量一致
1 |
|
11、拉链操作
1 |
|
12、键值映射类算子
map
1 |
|
mapPartitions
1 |
|
13、键值分组聚合类
reduceByKey
1 |
|
foldByKey
1 |
|
aggregateByKey
1 |
|
1 |
|
combineByKey
1 |
|
1 |
|
groupByKey
1 |
|
14、关联聚合
groupWith
1 |
|
cogroup
1 |
|
15、关联操作
【关联操作】:1V1
横向,根据键做关联
重载:numPartitions:Int 或 partitioner:Partitioner
1 |
|
二、行动算子(Action Operators)
【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致
1 |
|
1 |
|
持久化至文本文件,重载追加压缩功能
1
2
3
>import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
>import io.airlift.compress.lzo.LzopCodec
>rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
1 |
|
三、Spark RDD 并行度控制
默认的并行度:200
分区数的体现方式:
分区数
numPartitions
|numSlices: Int
示例:
val rdd = sc.parallelize(data, numPartitions = 5)
分区逻辑
1
partitionIndex = fieldName.hashCode() % numPartitions
扩展随机字段:
0 ~ numPartitions
分区器
partitioner: Partitioner
(针对键值对 RDD)- 默认的分区器:
HashPartitioner
- 默认的分区器:
再分区算子
用于对数据进行重新分配。
- coalesce:
coalesce(numPartitions: Int, shuffle: Boolean)
- 用于减少分区数,通常在不需要洗牌时使用。
- 示例:
val coalescedRDD = rdd.coalesce(2, shuffle = false)
- repartition:
repartition(numPartitions: Int)
- 等同于
coalesce(numPartitions, true)
,用于增加或减少分区数,并进行数据洗牌。 - 示例:
val repartitionedRDD = rdd.repartition(4)
- 等同于