Spark Sql自定义UDF函数进行字段加密解密

Spark Sql自定义UDF函数进行字段加密解密

一、函数实现

建议将自定义函数实现,单独建对象保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import java.nio.charset.StandardCharsets
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

/**
* 提供基于AES算法的加密和解密工具类。
*/
object SparkUtil {

/**
* 根据提供的密钥字符串初始化SecretKeySpec对象。
* 密钥长度必须是16(AES-128)、24(AES-192)或32(AES-256)字节。
*
* @param secret AES加密算法的密钥字符串
* @return 初始化后的SecretKeySpec对象
* @throws RuntimeException 如果密钥长度不符合要求
*/
private def secretInit(secret: String): SecretKeySpec = {
// 定义一个数组,包含AES密钥允许的长度(以字节为单位)。
val allowNumBits: Array[Int] = Array(16, 24, 32)
// 检查提供的密钥长度是否在允许的范围内。
if (allowNumBits.contains(secret.length)) {
// 如果密钥长度有效,则使用UTF-8编码将密钥字符串转换为字节数组,并创建`SecretKeySpec`对象。
new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "AES")
} else {
// 如果密钥长度无效,则抛出一个RuntimeException异常,指出密钥长度不符合要求。
throw new RuntimeException(s"AES secret size of numBits ${secret.length} not in permitted values (${allowNumBits.mkString(",")})")
}
}

/**
* 使用AES算法加密字符串。
*
* @param src 需要加密的原始字符串
* @param secret AES加密算法的密钥字符串
* @return 加密后的Base64编码字符串
*/
def encrypt(src: String, secret: String): String = {
// 获取Cipher实例,指定使用AES算法。这里的"AES"默认使用AES/ECB/PKCS5Padding模式,但具体行为可能依赖于Java加密扩展(JCE)的实现。
val cipher: Cipher = Cipher.getInstance("AES")
// 使用提供的密钥初始化Cipher实例为加密模式。这里调用了secretInit方法来验证密钥长度并生成SecretKeySpec对象。
cipher.init(Cipher.ENCRYPT_MODE, secretInit(secret))
// 将待加密的字符串转换为UTF-8编码的字节数组。
val bytesToEncrypt: Array[Byte] = src.getBytes(StandardCharsets.UTF_8)
// 使用Cipher实例对字节数组进行加密,得到加密后的字节数组。
val encryptedBytes: Array[Byte] = cipher.doFinal(bytesToEncrypt)
// 将加密后的字节数组编码为Base64格式的字符串,以便于存储或传输。
Base64.getEncoder.encodeToString(encryptedBytes)
}

/**
* 使用AES算法解密字符串。
*
* @param dest 需要解密的Base64编码字符串
* @param secret AES加密算法的密钥字符串
* @return 解密后的原始字符串
*/
def encrypt(src: String, secret: String): String = {
// 获取Cipher实例,指定使用AES算法
val cipher: Cipher = Cipher.getInstance("AES")
// 初始化Cipher为加密模式,并传入通过secretInit方法生成的SecretKeySpec对象
cipher.init(Cipher.ENCRYPT_MODE, secretInit(secret))
// 对原始字符串进行加密,得到加密后的字节数据
val bytes: Array[Byte] = cipher.doFinal(src.getBytes(StandardCharsets.UTF_8))
// 将加密后的字节数据编码为Base64格式的字符串
Base64.getEncoder.encodeToString(bytes)
}

}

二、导入对象实现的方法,并在 SparkSession 中注册

在 Spark 环境下导入对象实现的方法,并在 SparkSession 中注册 UDF 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import core.SparkUtil.{encrypt, decrypt}  

// 注册加密UDF
spark.udf.register(
"aes_encrypt",
(src: String, secret: String) => encrypt(src, secret),
StringType
)

// 注册解密UDF
spark.udf.register(
"aes_decrypt",
(src: String, secret: String) => decrypt(src, secret),
StringType
)

三、在 Spark Sql 中调用注册函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val frm: DataFrame = spark.createDataFrame(Seq(
Test(1,Array("money","freedom"),Map("java"->85,"mysql"->67)),
Test(2,Array("beauty","beauty"),Map("java"->72,"mysql"->90)),
Test(3,Array("sports","beauty"),Map("java"->76,"html"->52))
))

val secret = "helloSparkSqlUTF"

val frmEncrypt: DataFrame = frm
.select($"id",
callUDF(
"aes_encrypt",
array_join($"hobbies", ","),
lit(secret)
).as("encrypted_hobbies")
)

frmEncrypt.show()

frmEncrypt
.select($"id",
split(
callUDF(
"aes_decrypt",
$"encrypted_hobbies",
lit(secret)
),
","
).as("hobbies")
).show()

四、结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+---+--------------------+
| id| encrypted_hobbies|
+---+--------------------+
| 1|LWvLzxr2cD8G5Yp14...|
| 2|2yv84k+T/22B2Bu5y...|
| 3|Tw3SptnDTE4sh1sgn...|
+---+--------------------+

+---+----------------+
| id| hobbies|
+---+----------------+
| 1|[money, freedom]|
| 2|[beauty, beauty]|
| 3|[sports, beauty]|
+---+----------------+

Spark Sql自定义UDF函数进行字段加密解密
https://leaf-domain.gitee.io/2025/03/22/bigdata/spark/Spark Sql自定义UDF函数进行字段加密解密/
作者
叶域
发布于
2025年3月22日
许可协议