Flink-SQL

Flink 具有两个关系型 API:Table API 与 SQL。

Table API 是 Scala 和 Java 语言集成查询的 API,可以非常直观的方式组合来自关系算子的查询。例如, 选择、过滤和连接。

Flink SQL 支持基于实现 SQL 标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop.version>3.1.3</hadoop.version>
<flink.version>1.13.6</flink.version>
<flink.table.version>1.7.2</flink.table.version>
<flink.kafka.version>1.11.2</flink.kafka.version>
<mysql.version>5.1.48</mysql.version>
<fastjson.version>1.2.76</fastjson.version>
<play.json.version>2.9.2</play.json.version>
<kafka.version>2.0.0</kafka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

创建 TableEnvironment

方法一:静态创建

1
2
3
4
5
6
7
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings: EnvironmentSettings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tabEnv: TableEnvironment = TableEnvironment.create(settings)

方法二:通过 Stream

1
2
3
4
5
6
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val see: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
val tabEnv: TableEnvironment =
StreamTableEnvironment.create(see)

数据装载

方法一:内存数据模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
val dataType: DataType = DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("gender", DataTypes.STRING())
)

val table: Table = tabEnv.fromValues(
dataType,
row(1, "henry1", 22, "男"),
row(2, "henry2", 28, "女"),
row(3, "henry3", 31, "女"),
row(4, "henry4", 26, "男"),
row(5, "henry5", 18, "男")
)

table.printSchema()
/*
(
`id` INT,
`name` STRING,
`age` INT,
`gender` STRING
)
*/

// 已经过期
//tabEnv.registerTable("stu",table)
tabEnv.createTemporaryView("stu",table)


方法二:数据源 Connector

0.1、数据类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
-- 一、原子数据类型

--字符串类型:定长字符串,n代表字符的定长,取值范围为[1, 2147483647]。如果不指定n,则默认为1。
CHAR、👉 CHAR(n)

--可变长字符串,n代表字符的最大长度,取值范围为[1, 2147483647]。如果不指定n,则默认为1。STRING等同于VARCHAR(2147483647)。
VARCHARVARCHAR(n)、👉 STRING

--二进制字符串类型:定长二进制字符串,n代表定长,取值范围为[1, 2147483647]。如果不指定n,则默认为1。
BINARYBINARY(n):

--可变长二进制字符串,n代表字符的最大长度,取值范围为[1, 2147483647]。如果不指定n,则默认为1。BYTES等同于VARBINARY(2147483647)。
VARBINARYVARBINARY(n)、BYTES

--精确数值类型:固定长度和精度的数值类型,p代表数值位数(长度),取值范围为[1, 38];s代表小数点后的位数(精度),取值范围为[0, p]。如果不指定,p默认为10,s默认为0。
👉 DECIMAL(p, s)、NUMERIC(p, s):

--有损精度数值类型
TINYINT -- -128到127的1字节大小的有符号整数。
SMALLINT -- -32768到32767的2字节大小的有符号整数。
👉 INTINTEGER -- -2147483648到2147483647的4字节大小的有符号整数。
👉 BIGINT -- -9223372036854775808到9223372036854775807的8字节大小的有符号整数。

--浮点类型
FLOAT --4字节大小的单精度浮点数值。
DOUBLEDOUBLE PRECISION --8字节大小的双精度浮点数值。

--布尔类型
👉 BOOLEAN

--日期、时间类型
👉 DATE --由年-月-日组成的不带时区含义的日期类型,取值范围为[0000-01-01, 9999-12-31]。
👉 TIMETIME(p) --由小时:分钟:秒[.小数秒]组成的不带时区含义的时间数据类型,精度高达纳秒,取值范围为[00:00:00.000000000, 23:59:59.9999999]
👉TIMESTAMPTIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE -- 由年-月-日 小时:分钟:秒[.小数秒]组成的不带时区含义的时间类型,取值范围为[0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中p代表小数秒的位数,取值范围为[0, 9],如果不指定p,默认为6。
TIMESTAMP WITH TIME ZONE、TIMESTAMP§ WITH TIME ZONE --由年-月-日 小时:分钟:秒[.小数秒] 时区组成的带时区含义的时间类型,取值范围为[0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中p代表小数秒的位数,取值范围为[0, 9],如果不指定p,默认为6。
TIMESTAMP_LTZ、TIMESTAMP_LTZ§ --与TIMESTAMP WITH TIME ZONE类似,但时区信息不是携带在数据中的,而是由Flink SQL任务的全局配置决定的。
-- 二、复合数据类型
ARRAY --数组类型,类似于Java的array。
MULTISET --集合类型,类似于Java的List。
ROW --对象类型,可以包含多个字段,每个字段有自己的类型和名称,类似于Java的Object或Scala的Case Class。
MAP --映射类型,包含键值对,键和值都可以是任意类型。
1、文件 : FileSystem SQL Connector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///path/to/whatever', -- required: path to a directory
'format' = '...', -- required: file system specify a format,
-- Please refer to Table Formats
-- section for more details
'partition.default-name' = '...', -- optional: default dynamic partition
-- column value is null/empty string
-- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly reduce the number of file for filesystem sink but may lead data skew, the default value is false.
'sink.shuffle-by-partition.enable' = '...',
...
)
CSV

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>

案例

1
2
3
4
5
1,henry1,22,男
2,henry2,28,女
3,henry3,31,女
4,henry4,26,男
5,henry5,18,男
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
tabEnv.executeSql(
"""
|create table stu_csv(
| id int,
| name string,
| age int,
| gender string
|) with (
| 'connector'='filesystem',
| 'path'='file:///E:\BaiduSyncdisk\bigdata\projects\2_datawarehouse_live\livestock\livewarehouse\files\stu_csv.csv',
| 'format'='csv',
| 'csv.filed-delimiter'=',',
| 'csv.ignore-parse-errors'='true'
|)
|""".stripMargin)

tabEnv.from("stu_csv")
.select($"*")
.execute()
.print()
JSON

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>

案例

1
2
3
4
{"id":1,"name":"henry1","age":22,"gender":"男"}
{"id":2,"name":"henry2","age":18,"gender":"女"}
{"id":3,"name":"henry3","age":27,"gender":"男"}
{"id":4,"name":"henry4","age":19,"gender":"男"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
tabEnv.executeSql(
"""
|create table stu_json(
| id int,
| name string,
| age int,
| gender string
|) with (
| 'connector'='filesystem',
| 'path'='file:///E:\BaiduSyncdisk\bigdata\projects\2_datawarehouse_live\livestock\livewarehouse\files\stu_csv.csv',
| 'format'='json',
| 'json.ignore-parse-errors'='true'
|)
|""".stripMargin)

tabEnv.from("stu_json")
.select($"*")
.execute()
.print()
2、消息队列
依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
kafka : source

主要用于数据的实时传输和存储,适用于需要高效数据传输的场景‌

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tabEnv.executeSql(
"""
|create table student(
| id int,
| name string,
| age int,
| gender string
|) with (
| 'connector'='kafka',
| 'topic'='flink_kafka_student',
| 'properties.bootstrap.servers'='single:9092',
| 'properties.group.id'='fks_consume',
| 'scan.startup.mode'='earliest-offset',
| 'format'='csv',
| 'csv.field-delimiter'=',',
| 'csv.ignore-parse-errors'='true'
|)
|""".stripMargin)

tabEnv.sqlQuery(
"""
| select * from student
|""".stripMargin)
.execute()
.print()
upsert kafka : source and sink

作为source时,Upsert Kafka Connector生产changelog流,每条数据记录代表一个更新或删除事件。
作为sink时,它将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。
这个连接器特别适用于需要更新或删除操作的数据处理场景‌

依赖同 kafka

查看磁盘空间

1
2
3
4
5
# 以磁盘分区GB为单位显示所有挂载点的磁盘空间使用情况
df -h --block-size=G

# 查看特定目录所占用的磁盘空间
du -sh --block-size=G /PATH

java 数据合并

1
2
3
4
// 出现异常:java.lang.OutOfMemoryError: GC overhead limit exceeded
// 因为需要创建4000多万个对象,内存来不及回收
// 我电脑内存34G,尝试用如下方案优化
-Xms16g -Xmx16g -Xss1g -XX:-UseGCOverheadLimit

启动 kafka

1
2
zkServer.sh start
kafka-server-start.sh -daemon /opt/software/kafka_2.12-2.8.0/config/server.properties

flume 采集数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# hdfs sink
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_ebs/act_log_extract/flume_config/position-file/transaction_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/ebs_act_log/transaction_log/part-.*
a1.sources.r1.fileHeader = false

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_ebs/act_log_extract/flume_config/channel-checkpoint
a1.channels.c1.dataDirs = /root/project_ebs/act_log_extract/flume_config/channel-data

a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.path = hdfs://single:9000/external_ebs/transaction
a1.sinks.s1.hdfs.filePrefix = event-
a1.sinks.s1.hdfs.fileSuffix = .json
a1.sinks.s1.hdfs.rollInterval = 180
a1.sinks.s1.hdfs.rollSize = 134217728
a1.sinks.s1.hdfs.rollCount = 0

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1


# hive sink
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/store/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = store

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data

a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://master01:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_store_review
#a1.sinks.s1.hive.partition = 2018
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = ","
a1.sinks.s1.serializer.fieldnames = transaction_id,store_id,review_score

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
3、数据库
HBase

依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

案例

虚拟机:single
数据表:yb12211:for_mysql_import
列族:base
字段:rowkey, name, age, gender, phone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tabEnv.executeSql(
"""
|create table student(
| id int,
| base ROW<test_age int, test_gender string, test_name string, test_phone string>,
| primary key(id) not enforced
|) with (
| 'connector'='hbase-2.2',
| 'table-name'='yb12211:for_mysql_import',
| 'zookeeper.quorum'='single:2181'
|)
|""".stripMargin)

tabEnv.sqlQuery(
"""
| select
| id,
| base.test_age,
| base.test_gender,
| base.test_name,
| base.test_phone
| from student
|""".stripMargin)
.execute()
.print()
JDBC

依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>

案例

虚拟机:single
数据库:test_only
数据表:customer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
tabEnv.executeSql(
"""
| create table student(
| customer_id int,
| gender string,
| customer_name string,
| email string,
| address string,
| country string,
| `language` string,
| job_title string,
| job string,
| credit_type string,
| credit_no string,
| primary key(customer_id) not enforced
|) with (
| 'connector'='jdbc',
| 'url'='jdbc:mysql://single:3306/test_only',
| 'driver'='com.mysql.cj.jdbc.Driver',
| 'table-name'='customer',
| 'username'='henry',
| 'password'='ybremote'
|)
|""".stripMargin)

tabEnv.sqlQuery(
"""
| select * from student
|""".stripMargin)
.execute()
.print()
4、数据生成:用于性能测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
tabEnv.executeSql(
"""
|create table stu_csv(
| id int,
| name string,
| age int,
| gender int
|) with (
| 'connector'='datagen',
| 'number-of-rows'='100',
| 'fields.id.start'='1',
| 'fields.id.end'='100',
| 'fields.id.kind'='sequence',
| 'fields.name.length'='5',
| 'fields.name.kind'='random',
| 'fields.age.min'='18',
| 'fields.age.max'='24',
| 'fields.age.kind'='random',
| 'fields.gender.min'='0',
| 'fields.gender.max'='1',
| 'fields.gender.kind'='random'
|)
|""".stripMargin)

tabEnv.from("stu_csv")
.select($"*")
.execute()
.print()

窗口函数

流数据是持续实时产生的,不可能等所有数据收集齐才计算,通常也不会每条都计算。可以通过水位线和窗口的方式将数据按时间分段,逐段处理。

建表时指定水位线

1
2
3
4
5
6
7
8
CREATE TABLE table_name(
...,
time_field timestamp(3),
WATERMARK for time_field as time_field - 'N' seconds|minutes|...
) WITH (
connector='kafka',
...
)

查询时开窗

group window

在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组 “分组窗口”(Group Window)函数。

滚动窗口:TUMBLE(time_attr, interval)

​ 窗口上边界:TUMBLE_START(time_field, INTERVAL ‘N’ seconds|minutes|…)
​ 窗口下边界:TUMBLE_END(time_field, INTERVAL ‘N’ seconds|minutes|…)
​ EventTime窗口结束时间(不含边界):TUMBLE_ROWTIME
​ ProcessTime窗口结束时间(不含边界):TUMBLE_PROCTIME

滑动窗口:HOP(time_attr, interval, interval)
会话窗口:SESSION(time_attr, interval)

1
2
3
4
5
select
...,
TUMBLE_START(time_field, INTERVAL 'N' seconds|minutes|...) as window_start
from table_name
group by ..., TUMBLE(time_field, INTERVAL 'N' seconds|minutes|...)
tvf window

从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数。

目前 Flink 提供了以下几个窗口 TVF:

Tumble Windows
Hop Windows
Cumulate Windows
Session Windows

窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window join)等等。

在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:

“窗口起始点”(window_start)
“窗口结束点”(window_end)
“窗口时间”(window_time) <=> window_end - 1ms

在 SQL 中的声明方式,与以前的分组窗口是类似的,直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。

滚动窗口

与DataStream API 中的定义一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

1
2
3
SELECT * FROM TABLE(
TUMBLE(TABLE table_name, DESCRIPTOR(time_field), INTERVAL 'N' SECONDS|MINUTES|...)
);
滑动窗口

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。

HOP(TABLE data, DESCRIPTOR(timecol), size, step)

1
2
3
4
5
6
7
8
SELECT * FROM TABLE(
HOP(
TABLE table_name,
DESCRIPTOR(time_field),
INTERVAL 'N' SECONDS|MINUTES|...,
INTERVAL 'n' SECONDS|MINUTES|...
)
);
累计窗口

滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。

不过在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。

例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。

时间窗口内的分段累计:CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

cumulate window 维护了 slice state 和 merged state,每个 slice state 结束后会合并到 merged state中。

1
2
3
4
5
6
7
8
SELECT * FROM TABLE(
CUMULATE(
TABLE table_name,
DESCRIPTOR(time_field),
INTERVAL 'N' SECONDS|MINUTES|..., -- 输出划分单位
INTERVAL 'n' SECONDS|MINUTES|... -- 窗口大小
)
);

常用函数

比较函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- 关系运算符:select null=null; 返回为null
= <> > >= < <=

--非空判断
IS [NOT] NULL

👉--不同于
v1 IS [NOT] DISTINCT FROM v2

👉--双区间
v1 [not] BETWEEN [ ASYMMETRIC(开区间) | SYMMETRIC(闭区间) ] v2 AND v3

--模糊:
s1 [not] LIKE string2

👉--正则
s1 [not] SIMILAR TO s2

--列举:要求 v1 和列表中的所有值类型一致
v1 [not] IN (v2,...,vn)

--子查询
[not] EXISTS (sub-query) --子查询至少返回一行时返回true
value [not] IN (sub-query)

比较逻辑

1
2
3
4
5
6
--逻辑运算符
and or not

--逻辑判断
b is [NOT] TRUE|FALSE
👉b is [NOT] UNKNOWN

算术函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
--正负号
+ numeric
- numeric

--算术运算符
+ - * /

--取余
%MOD(numeric1, numeric2)

--幂运算
POWER(numeric1, numeric2)
ABS(numeric)
CEIL(numeric)
FLOOR(numeric)
ROUND(numeric, integer)
TRUNCATE(numeric1, integer2)

--随机数
RAND() --介于 0.0(含)和 1.0(不含)之间
RAND(integer) --介于 0.0(含)和 1.0(不含)之间的伪随机双精度值
👉RAND_INTEGER(integer) --介于 0(含)和整数(不含)之间

--随机序列
UUID()

--以二进制格式返回整数的字符串表示形式。如果整数为 NULL,则返回NULL。
BIN(integer)

--以十六进制格式返回整数数值或字符串的字符串表示形式
HEX(numeric 或 string)

字符串函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
👉--拼接
string1 || string2
CONCAT(string1, string2,...) --任何参数为 NULL则返回 NULL。
CONCAT_WS(string1, string2, string3,...) --第一个字符拼接符不为null则不为null

--字符个数
CHAR_LENGTH(string)

--转大写
UPPER(string)

--转小写
LOWER(string)

👉--字串位置
POSITION(sub IN src)

👉--从参数2去除第一个参数1,默认为两边,通过参数可以只去除头或尾
TRIM([ BOTH | LEADING | TRAILING ] string1 FROM string2) --
LTRIM(string) --去除左边的空格
RTRIM(string) --去除右边的空格

--重复字符串整数次
REPEAT(string, integer)

--正则匹配
REGEXP(src, regex)

--正则替换
REGEXP_REPLACE(src, from, to)

--正则分组提取:小括号表示分组,groupId分组编号,0表示整个字符串
REGEXP_EXTRACT(src, regex[, goupId])

--替换
REPLACE(src, from, to)

👉--替换指定位置字符串
OVERLAY(src PLACING replace FROM intPos(不含) [ FOR size ])

--从指定位置截取指定长度的字串
SUBSTRING(src FROM intPos(包含) [ FOR size ])
--截取src从from开始长度为size的子字符串
SUBSTR(src[, from[, size]])

--首字母大写
INITCAP(string)

--填充
LPAD(string1, integer, string2)
RPAD(string1, integer, string2)

--返回 base64 解码的结果,解码
FROM_BASE64(string)
--返回 base64 编码的结果,编码
TO_BASE64(string)

--返回string的第一个字符的阿斯克码数值
ASCII(string)
--返回阿斯克码对应的字符
CHR(integer)

--字符集解码
DECODE(binary, string)
--字符集编码
ENCODE(string1, string2)

--返回s2在s1中第一次出现的位置
INSTR(s1, s2)
LOCATE(s1, s2[, pos]) --s2在s1的pos之后第一次出现的位置

--字符串截取
LEFT(string, integer) --从左边截取
RIGHT(string, integer) --从右边截取

--URL解析:protocol, host, path, query
PARSE_URL(url, PART_NAME1[, PART_NAME2])

--反转
REVERSE(string)

👉--分隔后取第N(0~len-1)值
SPLIT_INDEX(src, splitor, integer1) --用分割符分割后取第n个字串(从0开始)

--转map
--一个参数('k1=v1,...,kn=vn')为标准格式
--三个参数(src, pairSplit, kvSplit):值为 k1:v1;...;kn:vn
STR_TO_MAP(src[, pairSplit, kvSplit]])

时间函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
--时间单位:
SECONDMINUTEHOURDAY,WEEK,MONTH,QUARTER,YEAR

--日期格式化
DATE_FORMAT(timestamp, format)

CURRENT_DATE UTC 时区 系统日期
CURRENT_TIME UTC 时区 系统时间
CURRENT_TIMESTAMP
LOCALTIMESTAMP
LOCALTIME
EXTRACT(timeinterval unit FROM temporal)

YEAR(date)
QUARTER(date)
MONTH(date)
WEEK(date) --年周
DAYOFYEAR(date)
DAYOFMONTH(date)
DAYOFWEEK(date) --1~7
HOUR(timestamp)
MINUTE(timestamp)
SECOND(timestamp)

FLOOR(timepoint TO timeintervalunit)
CEIL(timepoint TO timeintervalunit)
(timepoint1, temporal1) OVERLAPS (timepoint2, temporal2) --时间间隔重叠
--select (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR); 返回true
--select (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR); 返回false

DATE string 日期字符串(yyyy-MM-dd)转为 DATE 类型
--select Date '2021-09-08'; 返回 2021-09-08
TIME string 时间字符串转(HH:mm:ss)为 TIME 类型
--select TIME '08:45:17'; 返回 08:45:17


--日期按格式转为 Data 类型
TO_DATE(dt[, format])

--将时间字符串按格式转为 TIMESTAMP 类型
TO_TIMESTAMP(dt[, format])
--将时间字符串“yyyy-MM-dd HH:mm:ss[.SSS]”转为 TIMESTAMP 类型
TIMESTAMP string

--返回系统当前时间
NOW()

--返回当前系统的时间戳
UNIX_TIMESTAMP()
--按format返回dt对应的整数秒
UNIX_TIMESTAMP(dt[, format])

--整数(按格式)转时间
FROM_UNIXTIME(numeric[, format])

--时间差
TIMESTAMPDIFF(UNIT, dt1, dt2)
--时间前后移动
TIMESTAMPADD(UNIT, int, dt)

👉--转时区:UTC, Asia/Shanghai, America/Los_Angeles
CONVERT_TZ(dt, tz1, tz2) 将日期时间格式字符串 从时区string2转换为时区string3

条件函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
--等值多重条件
CASE value WHEN value1_1 THEN result1 ELSE resultZ END
--区间多重条件
CASE WHEN condition1 THEN result1 ELSE resultZ END 满足某一个条件时

--等值判断:如果value1为NULL返回value2,否则返回value1。
NULLIF(value1, value2)

--返回第一个不为 NULL 的值
COALESCE(value1, value2 [, value3 ]* )

--双重条件
IF(condition, true_value, false_value)

--都是字母返回true,否则返回false
IS_ALPHA(string) 字符串中的所有字符都是字母,则返回真,否则返回假。

--能够转化为数值(包含整数和小数)返回true,否则返回false
IS_DECIMAL(string)

--都为数值返回true,否则返回false
IS_DIGIT(string) 如果字符串中的所有字符都是数字,则返回真,否则返回假。s

类型转换函数

1
cast(value as TYPE)

集合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--构建数组:数组名[n]表示取数组的第n个元素,索引从 1 开始
array [ value1,value2... ]
--select arr[2] from (select array [1,4,5] arr)t; 返回 4
--select array ['a','b','c']; 返回 [a, b, c]

--构建map集合:集合名[key值]表示取key对应的value
map [key1,value1,key2,value2]
--select mp[2] from (select map [1,'a',2,'b'] mp)t; 返回 b
--select map [1,'a',2,'b']; 返回 {1=a, 2=b}

👉--返回map中键值对或数组中元素数量
CARDINALITY(map)
--select CARDINALITY(map [1,'a',2,'b']); 返回 2
--select CARDINALITY(array [1,2,3,0,0]); 返回5

👉--返回数组的唯一元素(数组长度1);如果数组为空,则返回 NULL;如果数组有多个元素,则抛出异常。
ELEMENT(array)
--select ELEMENT(array [1,2,3,0,0]) ; --执行失败
--select ELEMENT(array [2]) ; 返回2

👉值访问函数

1
2
3
4
5
--按名称从 Flink 复合类型(例如Tuple、POJO)返回字段的值
tableName.compositeType.field

--返回 Flink 复合类型(例如,Tuple、POJO)的平面表示,将其每个直接子类型转换为单独的字段。在大多数情况下,平面表示的字段与原始字段的命名类似,但使用美元分隔符(例如,mypojo$mytuple$f0)。
tableName.compositeType.*

分组功能函数

1
2
3
4
5
6
--返回唯一标识分组键组合的整数
GROUP_ID()

--返回给定分组表达式的位向量
GROUPING(expression1 [, expression2]* )
GROUPING_ID(expression1 [, expression2]* )

哈希函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
--以 32 个十六进制数字的字符串形式返回string的 MD5 哈希值;如果字符串为 NULL,则返回NULL。
MD5(string)
--以 40 个十六进制数字的字符串形式返回string的 SHA-1 哈希值;如果字符串为 NULL,则返回NULL。
SHA1(string)
--以 56 个十六进制数字的字符串形式返回string的 SHA-224 哈希值;如果字符串为 NULL,则返回NULL。
SHA224(string)
--以 64 个十六进制数字的字符串形式返回string的 SHA-256 哈希值;如果字符串为 NULL,则返回NULL。
SHA256(string)
--以 96 个十六进制数字的字符串形式返回string的 SHA-384 哈希值;如果字符串为 NULL,则返回NULL。
SHA384(string)
--以 128 个十六进制数字的字符串形式返回string的 SHA-512 哈希值;如果字符串为 NULL,则返回NULL。
SHA512(string)
--使用 SHA-2 系列散列函数(SHA-224、SHA-256、SHA-384 或 SHA-512)返回散列。第一个参数字符串是要散列的字符串,第二个参数hashLength是结果的位长(224、256、384或 512)。如果string或hashLength为 NULL,则返回NULL
SHA2(string, hashLength)

聚合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
--默认情况下或使用 ALL,返回表达式不为 NULL的输入行数。去重使用 DISTINCT。
COUNT([ ALL ] expression | DISTINCT expression1 [, expression2]*)
--返回输入行数
COUNT(*) COUNT(1)

--默认情况下或使用关键字 ALL,返回所有输入行中表达式的平均值(算术平均值)。去重使用 DISTINCT。
AVG([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的平均值(算术和)。去重使用 DISTINCT。
SUM([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的最大值。
MAX([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的最小值。
MIN([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的总体标准偏差。去重使用 DISTINCT。
STDDEV_POP([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的样本标准偏差。去重使用 DISTINCT。
STDDEV_SAMP([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的总体方差(标准差的平方)。去重使用 DISTINCT。
VAR_POP([ ALL | DISTINCT ] expression)
--默认情况下或使用关键字 ALL,返回所有输入行中表达式的样本方差(样本标准差的平方)。去重使用 DISTINCT。
VAR_SAMP([ ALL | DISTINCT ] expression)

--默认情况下或使用关键字 ALL,返回所有输入行的多组表达式。NULL 值将被忽略。去重使用 DISTINCT。
COLLECT([ ALL | DISTINCT ] expression)

--窗口函数

--返回值在一组值中的排名。结果是一加先前分配的等级值。与函数 rank 不同,dense_rank 不会在排名序列中产生间隙。仅在眨眼规划器中受支持。
DENSE_RANK()
--根据窗口分区内行的顺序,为每一行分配一个唯一的序列号,从一开始。
ROW_NUMBER()
--返回窗口中当前行之前偏移第 th 行处的expression值。的默认值的偏移是1和默认值默认为NULL。仅在眨眼规划器中受支持。
LEAD(expression [, offset] [, default] )
--返回窗口中当前行后偏移第 th 行处的expression值。的默认值的偏移是1和默认值默认为NULL。 仅在眨眼规划器中受支持。
LAG(expression [, offset] [, default])
--返回一组有序值中的第一个值。 仅在眨眼规划器中受支持。
FIRST_VALUE(expression)
--返回一组有序值中的最后一个值。仅在眨眼规划器中受支持。
LAST_VALUE(expression)

👉--连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符。分隔符的默认值为“,”。仅在眨眼规划器中受支持。
LISTAGG(expression [, separator])

Flink-SQL
https://leaf-domain.gitee.io/2024/10/21/bigdata/flink/flink_04_sql/
作者
叶域
发布于
2024年10月21日
许可协议