Spark数据处理

字符串函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 提取
// 1、提取 json
json_tuple(jsonCol:Column, fields:String*) // fields => field1,field2
get_json_object(jsonCol:Column, path:String) // path => $.field1[.field2]
// 与 json_tuple 和 get_json_object 对比,前者只能提取字符串,from_json 可以定制匹配类型
val schema: StructType = StructType(Seq(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("isMember", BooleanType),
StructField("salary", FloatType)
))
from_json(json:Column,schema:StructType).as(alia:String) // 将 json 转为结构体
from_json(lit("{\"name\":\"张三\",\"age\":22,\"isMember\":false,\"salary\":3566.38}"),schema).as("stu")

select($"stu.name",$"stu.age",$"stu.isMember",$"stu.salary")
// 2、提取:CSV 与 from_json 共享 schema
val options:Map[String,String] = Map("header" -> "true", "lineSep" -> "\n")
from_csv(lit("henry,22,true,3454.23"),schema,options).as("stu")
select($"stu.name",$"stu.age",$"stu.isMember",$"stu.salary")

// 3、提取:正则分组
regexp_extract(col:Column, pattern:String, groupId:Int)

// 分裂与截取
split(col:Column,pattern:String)
substring(col:Column,pos:Int,len:Int)
substring_index(col:Column,sep:String,groupId:Int)
// groupId +N 从左向右前N个
// groupId -N 从右向左前N个
// 第N个 substring_index(substring_index(COL,SEP,+N),SEP,-1)

// 子字符串在字段中的未知
locate(subStr:String,col:Column) // 有则>0,否则=0
instr(col:Column,subStr:String)

// 字符串拼接
concat(cols:Column*)
concat_ws(sep:String,cols:Column*)

// 内容长度
length(col:Column) // 字符长度
// 字节长度,未提供算子,需要通过 spark.sql(""" select octet_length(...)""") 实现

// 定长填充
lpad(col:Column,len:Int,pad:String)
rpad(col:Column,len:Int,pad:String)

// 清除两端空格
ltrim(col:Column)
rtrim(col:Column)
trim(col:Column)

// 大小写转换
initcap(col:Column) // 每个单词首字母大写
upper(col:Column) // 全大写
lower(col:Column) // 全小写

hash(col:Column) // 去哈希值
regexp_replace(col:Column,pattern:String,replace:String) // 正则替换
translate(col:Column,from:String,to:String) // 按字母转换
reverse(col:Column) // 翻转

// 转码
encode(col:Column, charSet:String)
decode(col:Column, charSet:String)

// 非对称加密
sha1(col:Column)
md5(col:Column)

日期函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
quarter(dateCol:Column)     		// 季
month(dateCol:Column) // 月
dayofweek(dateCol:Column) // 星期几 :周日~周六 1~7
weekofyear(dateCol:Column) // 年周
dayofmonth(dateCol:Column) // 月日
dayofyear(dateCol:Column) // 年日
hour(dateCol:Column) // 时
minute(dateCol:Column) // 分
second(dateCol:Column) // 秒
last_day(dateCol:Column) // 日期当月最后一天

// 【日期计算】
datediff(dateBig:Column,dateSmall:Column) // 两个日期之间天数差
months_between(dateBig:Column,dateSmall:Column) // 两个日期之间月数差
date_add(dateCol:Column,days:Int), // 日期按天计算
add_months(dateCol:Column,months:Int) // 日期按月计算
// 星期格式,英文单词前三个字母,如:MON TUE ...
next_day(dateCol:Column,weekFormat:String) // 下一个星期几(未至本周,已至下周)

current_date() // 当前日期 yyyy-MM-dd
current_timestamp() // 当前日期 yyyy-MM-dd HH:mm:ss
unix_timestamp() // 当前时间戳 1715307735 秒

// 如下操作必须进行 SparkSession 的配置
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

// 【日期转时间戳】十位整数
// 获取系统当前时间戳
// 等同于 unix_timestamp(current_timestamp(), yyyy-MM-dd HH:mm:ss)
unix_timestamp()
// 获取指定日期时间戳
// 等同于 unix_timestamp(2013-07-25 00:00:00, yyyy-MM-dd HH:mm:ss
unix_timestamp(dateCol:Column)
// 获取指定日期指定格式时间戳
// 等同于 unix_timestamp(2013-07-25 00:00:00, yyyy-MM-dd)
unix_timestamp(dateCol:Column,format:String)

// 【日期转时区】
to_utc_timestamp(lit("2013-07-25 00:00:00"),"GMT+8")

// 【时间戳转日期】
// 将指定时间戳转日期:from_unixtime(lit(1715307735L))
from_unixtime(numCol:Column)
// 将指定时间戳按指定格式转日期:from_unixtime(lit(1715307735L),"yyyy-MM-dd")
from_unixtime(numCol:Column, format:String)

// 进保留日期 yyyy-MM-dd
to_date(dateCol:Column)

数学函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
abs(col:Column)							// 绝对值
degrees(col:Column) // 弧度转角度
radians(col:Column) // 角度转弧度
sin(col:Column) // 正弦
cos(col:Column) // 余弦
tan(col:Column) // 正切
asin(col:Column) // 反正弦
acos(col:Column) // 反余弦
atan(col:Column) // 反正切
round(col:Column,n:Int) // 四舍五入
ceil(col:Column) // 向上取整
floor(col:Column) // 向下取整
format_number(col:Column,n:Int) // 数值格式化
pow(col:Column,n:Int) // 幂
log(n:Int,col:Column) // 对数
rand() // 随机数
greatest(col:Column*) // 多列最大值
least(col:Column*) // 多列最小值

聚合函数

1
2
3
4
5
6
7
8
9
count(col:Column)							// 计数
countDistinct(col:Column,cols:Column*) // 去重计数
sum(numCol:Column) // 求和
sumDistinct(numCol:Column) // 去重求和
avg(numCol:Column) // 求均值
max(numCol:Column) // 求最大值
min(numCol:Column) // 求最小值
collect_set(col:Column) // 去重收集
collect_list(col:Column) // 列表收集

集合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
array
size(collectCol:Column)
array(cols:Column*) // 一行中的多列转为单列数组类型
array_sort(arrayCol:Column)
array_contains(arrayCol:Column,value:Any)
array_distinct(arrayCol:Column)
array_join(arrayCol:Column,sep:String,nullReplacement:String)

array_except(arrayCol:Column)
array_intersect(arrayCol:Column)
array_union(arrayCol:Column)
map
map_keys(mapCol:Column)
map_values(mapCol:Column)
map_entries(mapCol:Column)

spark 不支持 grouping sets API

1
2
3
4
5
6
7
8
9
10
11
12
13
>// 可以借助 spark.sql(sql:String) 来实现
spark.sql("""
select grouping__id,a,b,c,count(*) as cnt from T
group by a,b,c grouping sets(a,b,c,(a,c))
""")

spark.
//.cube($"gender",$"birthYear")
.rollup($"gender",$"birthYear")
.agg(
grouping_id().as("gid"), // grouping_id() 作为不同分组的识别号
count("*").as("cnt")
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import spark.implicits._

val dfCustomer =
spark.sparkContext.textFile("hdfs://single:9000/spark/practice_01/customers", 4)
.mapPartitions(_.map(_.toCustomer))
.toDF()

/*
【基本查询】
select
col,cols*,agg*
where
conditionCols
group by
col,cols*
having
condition
order by
col asc|desc
limit
n
*/
// 查询 select :$"colName" = col("colName"),支持各种函数对字段处理
val dfSel: DataFrame = df.select(cols:Column*)
val dfSel: DataFrame = dfCustomer.select($"cus_lname", $"cus_city", $"cus_state") // ✔

// 条件筛选 where
/*
newCol:Column = $"cus_state".isNull
newCol:Column = $"cus_state".isNaN
newCol:Column = $"cus_state".isNotNull

newCol:Column = $"cus_state".gt(10) <=> $"cus_state">10
newCol:Column = $"cus_state".geq(10) <=> $"cus_state">=10
newCol:Column = $"cus_state".lt(10) <=> $"cus_state"<10
newCol:Column = $"cus_state".leq(10) <=> $"cus_state"<=10
newCol:Column = $"cus_state".equalTo(10) <=> $"cus_state"===10
newCol:Column = $"cus_state".notEqual(10) <=> $"cus_state"=!=10
newCol:Column = $"cus_state".between(10,20)

newCol:Column = $"cus_state".like("张%")
newCol:Column = $"cus_state".rlike("\\d+")

newCol:Column = $"cus_state".isin(list:Any*)
newCol:Column = $"cus_state".isInCollection(values:Itrable[_])

多条件:
newCol:Column = ColOne and ColTwo 与
newCol:Column = ColOne or ColTwo 或
newCol:Column = not(Column) 非

hive 中的所有函数如:
newCol:Column = length($"cus_lname").gt(6)
*/
val dfWhere: DataFrame = df.where(condition:Column)
val dfWhere: DataFrame = dfSel.where(length($"cus_lname").gt(6)) // ✔
val rddWhere: Dataset[Row] = dfCustomer.where( // ✔
$"cus_state".isin("TX", "PR") and
length($"cus_lname").gt(6)
)

// grouping sets & rollup & cube 为了数仓设计时减少表的数量,将单表或视图中多个维度聚合的结果存放在一张表中,建议做 grouping__id与聚合维度的字典,并根据 grouping__id 做分区。
// hive 中语法形式 select ... from ... group by f1,...,fn grouping_sets(g1,...,gn),注意多个字段独立分组,需要添加 (f1,f2,f3),自定义多分组时 grouping__id 并不连号。
// hive 中语法形式 select ... from ... group by rollup/cube(f1,...,fn),注意多个字段独立分组,需要添加 (f1,f2,f3)
// 分组 group by
val dfGroupBy: RelationalGroupedDataset = df.groupBy(cols:Column*)
val dfGroupBy: RelationalGroupedDataset = df.rollup(cols:Column*)
val dfGroupBy: RelationalGroupedDataset = df.cube(cols:Column*)
val dfGroupBy: RelationalGroupedDataset = dfWhere.groupBy($"cus_state") // ✔

// 聚合 aggregate
val dfAgg: Dataset[Row] = dfGroupBy.agg(grouping_id().as("gid"),...);
val dfAgg: Dataset[Row] = df.agg(expr:Column,exprs:Column*)
val dfAgg: Dataset[Row] = dfGroupBy
.agg(count($"cus_id").as("cus_count")) // ✔

// 二次筛选 having = where
val dfHaving: Dataset[Row] = dfAgg
.groupBy($"cus_state")
.agg(count($"cus_id").as("cus_count"))
.where($"cus_count".geq(100)) // ✔

// 排序
/*
原始字段或聚合字段:
$"field".asc
$"field".asc
原始字段存在空值情况
$"field".asc_nulls_first
$"field".asc_nulls_last
$"field".desc_nulls_first
$"field".desc_nulls_last
*/
val dfOrderBy: Dataset[Row] = df.orderBy(cols:Column*)
val dfOrderBy: Dataset[Row] = dfHaving.orderBy($"cus_count".asc)

// 限制 limit
val dfOrderBy: Dataset[Row] = df.limit(n:Int)
val dfOrderBy: Dataset[Row] = dfOrderBy.limit(10)

/*
【关联查询】
指定连接类型:JoinType:
inner 内部连接:【交集】 缺省默认 ✔
outer = full = fullouter 全外连接:【双全集】
left = leftouter 左外连接:【左全集】:先返回两表笛卡尔积,再进行ON条件筛选
若右表的数据也需要提取,则只能使用left
♥ semi = leftsemi 左外连接:【左交集】:右表只做存在性检查,代替子查询in
只返回左表在右表中有匹配记录的数据,不提取右表数据
♥ anti = leftanti 左外连接:【左差集】:右表只做存在性检查,代替子查询not in
right = rightouter 右外连接:【右全集】
♥ cross 默认:交叉连接:【笛卡尔积】

♥ 潜在的【using】:若join操作采用如下语法【不提供joinType:inner】且【关联字段在两张表中同名】
join(dataset:DataSet[_],oneCol:String)
join(dataset:DataSet[_],cols:Seq[String])
实际的运行结果,两张表中会执行【同名列去重】
*/
val dfJoin: DataSet[Row] = df.join(other:DataSet[_],joinExprs:Column, joinType:String)
val df1: DataFrame = spark.createDataFrame(Seq((1, "henry"), (2, "ariel")))
.toDF("id", "name")
val df2: DataFrame = spark.createDataFrame(Seq((1, 2600), (3, 8800)))
.toDF("id", "salary")

df1.join(df2,Seq("id"),"left|semi|anti|right|full|cross")
.show()

// left
+---+-----+------+
| id| name|salary|
+---+-----+------+
| 1|henry| 2600|
| 2|ariel| null|
+---+-----+------+

// semi
+---+-----+
| id| name|
+---+-----+
| 1|henry|
+---+-----+

// anti
+---+-----+
| id| name|
+---+-----+
| 2|ariel|
+---+-----+

// right
+---+-----+------+
| id| name|salary|
+---+-----+------+
| 1|henry| 2600|
| 3| null| 8800|
+---+-----+------+

// full
+---+-----+------+
| id| name|salary|
+---+-----+------+
| 1|henry| 2600|
| 3| null| 8800|
| 2|ariel| null|
+---+-----+------+

// cross : 不能有关联字段
df1
.crossJoin(df2) // ✔ 方法一
.join(df2,Seq(),"cross") // ✔ 方法二
.join(df2) // ✔ 方法三
.show()
// ✔ 方法三
df1.createTempView("ta")
df2.createTempView("tb")
spark.sql(
"""
|select ta.id,ta.name,tb.salary
|from ta
|cross join tb
|""".stripMargin).show()
+---+-----+---+------+
| id| name| id|salary|
+---+-----+---+------+
| 1|henry| 1| 2600|
| 1|henry| 3| 8800|
| 2|ariel| 1| 2600|
| 2|ariel| 3| 8800|
+---+-----+---+------+

val dfJoin: DataFrame = dfCustomer
.select($"cus_state", $"cus_city", $"cus_id").as("C")
.join(
dfOrder.select($"or_customer_id").as("O"),
$"C.cus_id" === $"O.or_customer_id",
"left"
)
+---------+--------+------+--------------+-----+
|cus_state|cus_city|cus_id|or_customer_id|or_id|
+---------+--------+------+--------------+-----+
| PR| Caguas| 148| 148|15061|
| PR| Caguas| 471| 471|23614|
| PR| Caguas| 471| 471|55431|
| PR| Caguas| 471| 471|64885|
| MA| Revere| 496| 496| 8745|
| MA| Revere| 496| 496|22708|
| IA| Dubuque| 833| 833| 857|
+---------+--------+------+--------------+-----+

val dfJoin: DataFrame = dfCustomer
.select($"cus_state", $"cus_city", $"cus_id").as("C")
.join(
dfOrder.select($"or_customer_id".as("cus_id"),$"or_id").as("O"),
"cus_id" // 默认连接类型:inner,采用 using 优化, 相同字段只显示一个
)
+------+---------+--------+-----+
|cus_id|cus_state|cus_city|or_id|
+------+---------+--------+-----+
| 148| PR| Caguas|15061|
| 148| PR| Caguas|59569|
| 471| PR| Caguas|64885|
| 496| MA| Revere| 8745|
| 496| MA| Revere|61261|
| 833| IA| Dubuque| 857|
+------+---------+--------+-----+

val dfJoinAgg: DataFrame = dfJOin
.groupBy($"C.cus_state", $"C.cus_city")
.agg(count($"C.cus_id").as("order_count"))
.orderBy($"order_count".asc)
+---------+-------------+-----------+
|cus_state| cus_city|order_count|
+---------+-------------+-----------+
| PR| Ponce| 7|
| MA| Malden| 9|
| NJ| Freehold| 10|
| WA| Sumner| 10|
| CA|National City| 10|
| NV| Reno| 11|
| MA| Taunton| 11|
| IL| Bartlett| 12|
| MD| Gwynn Oak| 12|
| AL| Birmingham| 13|
+---------+-------------+-----------+

val dfJoinAgg: DataFrame = dfJOin
.rollup($"C.cus_state", $"C.cus_city")
.agg(count($"C.cus_id").as("order_count"))
+---------+---------------+-----------+
|cus_state| cus_city|order_count|
+---------+---------------+-----------+
| null| null| 68883|
| AZ| null| 1156|
| AL| null| 13|
| PA| null| 1458|
| IL| Carol Stream| 66|
| HI| Mililani| 71|
| NC| Fayetteville| 72|
+---------+---------------+-----------+

val dfJoinAgg: DataFrame = dfJOin
.cube($"C.cus_state", $"C.cus_city")
.agg(count($"C.cus_id").as("order_count"))
+---------+--------------+------------+
|cus_state| cus_city| order_count|
+---------+--------------+------------+
| null| null| 68883|
| null|Mount Prospect| 50|
| null| San Marcos| 89|
| null| Rochester| 63|
| AZ| null| 1156|
| AL| null| 13|
| PA| null| 1458|
| IL| Carol Stream| 66|
| HI| Mililani| 71|
| NC| Fayetteville| 72|
+---------+--------------+------------+

// 数据提取
DataSet[Row] dataSet = df.limit(n:Int)
Array[Row] rows = df.collect()
Array[Row] rows = df.tail(n:Int)
Array[Row] rows = df.take(n:Int)
// 数据展示
df.show([n:Int])
SQL函数:返回Column
常用函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$"NAME" = col("NAME")						// 取列值
as("ALIAS_NAME") // 别名
as(alias:Seq[String]) // 多个别名 pos_explode(Array), explode(Map)
when(CONDITION,V1)....otherwise(VN) // 条件
when($"score"<60,"D")
.when($"score"<80,"C")
.when($"score"<90,"B")
.otherwise("A")
.as("score_level")
lit(VALUE) // 常量列
5 as level
lit(5).as("new_const_col")
withColumn(colName:String, col:Column) // 扩展列
// 通常用于使用【窗口函数】扩展表的列
cast(DataType) // 类型转换
常用函数案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
val frm: DataFrame = spark.createDataFrame(Seq(
Test(1, Array("money", "freedom"), Map("java"->85,"mysql"->67)),
Test(2, Array("beauty", "writing"), Map("java"->72,"mysql"->90)),
Test(3, Array("sports"), Map("java"->76, "html"->52))
))

// 👇 一个select子句中只能出现一个 explode 或 pos_explode
val frmHob = frm: DataFrame
.select($"id",explode($"hobbies").as("hobby"),$"scores")


frmHob
.select($"id",$"hobby",explode($"scores").as(Seq("subject","score")))
+---+-------+-------+-----+
| id| hobby|subject|score|
+---+-------+-------+-----+
| 1| money| java| 85|
| 1| money| mysql| 67|
| 1|freedom| java| 85|
| 1|freedom| mysql| 67|
| 2| beauty| java| 72|
| 2| beauty| mysql| 90|
| 2|writing| java| 72|
| 2|writing| mysql| 90|
| 3| sports| java| 76|
| 3| sports| html| 52|
+---+-------+-------+-----+
分析函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 分析函数
// 方差:每个样本值与全体样本值的平均数之差的平方值的平均数,反应数据的偏离程度
var_samp(numCol:Column) // 样本方差
var_pop(numCol:Column) // 方差
// 标准差:方差的算术平方根,统计分布程度上的测量依据,也能反应数据的偏离程度
stddev_samp(numCol:Column) // 标准样本差
stddev_pop(numCol:Column) // 标准差
// 协方差:两个变量之间的线性关系,两个变量在方向和幅度上的一致性
/*
如果两个变量的变化趋势一致:
两者都大于自身的期望值时,协方差就是正值
一个大于自身期望值,另一个小于自身期望值,协方差就是负值
如果协方差为正,则表明X和Y同向变化,反之,协方差为负则表明反向变化;
协方差的绝对值越大,表示同向或反向的程度越深
*/
covar_pop(numCol1:Column,numCol2:Column) // 协方差
covar_samp(numCol1:Column,numCol2:Column) // 样本协方差
窗口函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// over 从句,配合窗口函数或聚合函数使用
val specFull: WindowSpec = Window // 全局窗口:unbounded preceding ~ following
.partitionBy(col:Column)
val specTop2Curr: WindowSpec = specFull // 排序窗口: unbounded preceding ~ current
.orderBy(col:Column)
val specThree: WindowSpec = specTop2Curr // 粒度窗口:start ~ current ~ stop
.rowsBetween(start:Int, stop:Int) // rowsBetween 物理便宜 rangeBetween 逻辑偏移

// 窗口函数
first(col:Column).over(specFull:WindowSpec) // 窗口内首行
lag(col:Column,offset:Int).over(specOrderBy:WindowSpec) // curr 上 offset 行
lead(col:Column,offset:Int).over(specOrderBy:WindowSpec) // curr 下 offset 行
last(col:Column).over(specFull) // 窗口内末行
nth_value(col:Column,nth:Int).over(specOrderBy:WindowSpec) // 窗口第 n 行
dense_rank().over(specOrderBy:WindowSpec) // 排名
percent_rank().over(specOrderBy:WindowSpec) // 百分比排名
row_number().over(specOrderBy:WindowSpec) // 行号
ntile(n:Int).over(specOrderBy:WindowSpec) // 切片(抽样)
cume_dist().over(specOrderBy:WindowSpec) // 分布

Spark数据处理
https://leaf-domain.gitee.io/2025/03/22/bigdata/spark/Spark数据处理/
作者
叶域
发布于
2025年3月22日
许可协议