Flink SQL

Flink has two relational APIs: the Table API and SQL.

The Table API is a language-integrated query API for Scala and Java that lets you compose queries from relational operators such as selection, filter, and join in a very intuitive way.

Flink's SQL support is based on Apache Calcite, which implements the SQL standard. A query written in either API has the same semantics and produces the same result regardless of whether the input is batch (DataSet) or streaming (DataStream).
Dependencies

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <hadoop.version>3.1.3</hadoop.version>
    <flink.version>1.13.6</flink.version>
    <flink.table.version>1.7.2</flink.table.version>
    <flink.kafka.version>1.11.2</flink.kafka.version>
    <mysql.version>5.1.48</mysql.version>
    <fastjson.version>1.2.76</fastjson.version>
    <play.json.version>2.9.2</play.json.version>
    <kafka.version>2.0.0</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
Creating a TableEnvironment

Approach 1: static creation

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build()
val tabEnv: TableEnvironment = TableEnvironment.create(settings)
```
Approach 2: from a StreamExecutionEnvironment

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)
```
Loading data

Approach 1: mock data in memory

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.types.DataType

val dataType: DataType = DataTypes.ROW(
  DataTypes.FIELD("id", DataTypes.INT()),
  DataTypes.FIELD("name", DataTypes.STRING()),
  DataTypes.FIELD("age", DataTypes.INT()),
  DataTypes.FIELD("gender", DataTypes.STRING())
)
val table: Table = tabEnv.fromValues(
  dataType,
  row(1, "henry1", 22, "男"),
  row(2, "henry2", 28, "女"),
  row(3, "henry3", 31, "女"),
  row(4, "henry4", 26, "男"),
  row(5, "henry5", 18, "男")
)
table.printSchema()
tabEnv.createTemporaryView("stu", table)
```
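To see the two relational APIs side by side, the sketch below queries the data registered above once through the Table API and once through SQL against the `stu` view. It assumes the `table` and `tabEnv` objects from the previous snippet; both queries should produce the same result.

```scala
// Table API: compose relational operators directly on the Table object
table
  .filter($"age" > 20)
  .select($"id", $"name", $"age")
  .execute()
  .print()

// SQL: the equivalent query against the registered view "stu"
tabEnv.sqlQuery(
  """
    |select id, name, age
    |from stu
    |where age > 20
    |""".stripMargin)
  .execute()
  .print()
```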
Approach 2: source connectors

0.1 Data types

```
CHAR, 👉 CHAR(n)
VARCHAR, VARCHAR(n), 👉 STRING
BINARY, BINARY(n)
VARBINARY, VARBINARY(n), BYTES
👉 DECIMAL(p, s), NUMERIC(p, s)
TINYINT
SMALLINT
👉 INT, INTEGER
👉 BIGINT
FLOAT
DOUBLE, DOUBLE PRECISION
👉 BOOLEAN
👉 DATE
👉 TIME, TIME(p)
👉 TIMESTAMP, TIMESTAMP(p), TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP(p) WITHOUT TIME ZONE
TIMESTAMP WITH TIME ZONE, TIMESTAMP(p) WITH TIME ZONE
TIMESTAMP_LTZ, TIMESTAMP_LTZ(p)
ARRAY
MULTISET
ROW
MAP
```
1. Files: FileSystem SQL Connector

```sql
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = '...',
  'partition.default-name' = '...',
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
```
CSV

Dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Example
```
1,henry1,22,男
2,henry2,28,女
3,henry3,31,女
4,henry4,26,男
5,henry5,18,男
```
```scala
tabEnv.executeSql(
  """
    |create table stu_csv(
    |  id int,
    |  name string,
    |  age int,
    |  gender string
    |) with (
    |  'connector'='filesystem',
    |  'path'='file:///E:\BaiduSyncdisk\bigdata\projects\2_datawarehouse_live\livestock\livewarehouse\files\stu_csv.csv',
    |  'format'='csv',
    |  'csv.field-delimiter'=',',
    |  'csv.ignore-parse-errors'='true'
    |)
    |""".stripMargin)
tabEnv.from("stu_csv")
  .select($"*")
  .execute()
  .print()
```
JSON

Dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Example
```json
{"id":1,"name":"henry1","age":22,"gender":"男"}
{"id":2,"name":"henry2","age":18,"gender":"女"}
{"id":3,"name":"henry3","age":27,"gender":"男"}
{"id":4,"name":"henry4","age":19,"gender":"男"}
```
```scala
tabEnv.executeSql(
  """
    |create table stu_json(
    |  id int,
    |  name string,
    |  age int,
    |  gender string
    |) with (
    |  'connector'='filesystem',
    |  'path'='file:///E:\BaiduSyncdisk\bigdata\projects\2_datawarehouse_live\livestock\livewarehouse\files\stu_csv.csv',
    |  'format'='json',
    |  'json.ignore-parse-errors'='true'
    |)
    |""".stripMargin)
tabEnv.from("stu_json")
  .select($"*")
  .execute()
  .print()
```
2. Message queues

Dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Kafka: source

Kafka is mainly used for real-time data transport and storage; it suits scenarios that need efficient data transfer.
Example
```scala
tabEnv.executeSql(
  """
    |create table student(
    |  id int,
    |  name string,
    |  age int,
    |  gender string
    |) with (
    |  'connector'='kafka',
    |  'topic'='flink_kafka_student',
    |  'properties.bootstrap.servers'='single:9092',
    |  'properties.group.id'='fks_consume',
    |  'scan.startup.mode'='earliest-offset',
    |  'format'='csv',
    |  'csv.field-delimiter'=',',
    |  'csv.ignore-parse-errors'='true'
    |)
    |""".stripMargin)
tabEnv.sqlQuery(
  """
    | select * from student
    |""".stripMargin)
  .execute()
  .print()
```
Upsert Kafka: source and sink

As a source, the Upsert Kafka connector produces a changelog stream in which each record represents an update or delete event. As a sink, it writes INSERT/UPDATE_AFTER rows as normal Kafka messages and writes DELETE rows as Kafka messages with a null value (tombstones indicating that the message for that key has been deleted). This connector is particularly suited to processing scenarios that require update or delete operations.
The dependency is the same as for Kafka.
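A minimal sketch of an upsert-kafka table definition, assuming the same broker as in the Kafka example; the topic name, key/value formats, and columns are illustrative rather than taken from the original notes. The primary key is mandatory and determines the Kafka message key.

```scala
tabEnv.executeSql(
  """
    |create table student_upsert(
    |  id int,
    |  name string,
    |  age int,
    |  gender string,
    |  primary key (id) not enforced   -- required: the Kafka message key is derived from the primary key
    |) with (
    |  'connector'='upsert-kafka',
    |  'topic'='flink_upsert_student',
    |  'properties.bootstrap.servers'='single:9092',
    |  'key.format'='csv',
    |  'value.format'='csv'
    |)
    |""".stripMargin)
```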
Check disk space
```shell
df -h --block-size=G
du -sh --block-size=G /PATH
```
JVM options for the Java data-merge job
```
-Xms16g -Xmx16g -Xss1g -XX:-UseGCOverheadLimit
```
Start Kafka
```shell
zkServer.sh start
kafka-server-start.sh -daemon /opt/software/kafka_2.12-2.8.0/config/server.properties
```
Collect data with Flume
```properties
# Agent 1: tail the transaction logs with TAILDIR and sink them to HDFS as JSON files
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_ebs/act_log_extract/flume_config/position-file/transaction_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/ebs_act_log/transaction_log/part-.*
a1.sources.r1.fileHeader = false

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_ebs/act_log_extract/flume_config/channel-checkpoint
a1.channels.c1.dataDirs = /root/project_ebs/act_log_extract/flume_config/channel-data

a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.path = hdfs://single:9000/external_ebs/transaction
a1.sinks.s1.hdfs.filePrefix = event-
a1.sinks.s1.hdfs.fileSuffix = .json
a1.sinks.s1.hdfs.rollInterval = 180
a1.sinks.s1.hdfs.rollSize = 134217728
a1.sinks.s1.hdfs.rollCount = 0

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
```

```properties
# Agent 2: tail store-review CSV files and sink them into the Hive table ods_eb.ods_store_review
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/store/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = store

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data

a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://master01:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_store_review
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = ","
a1.sinks.s1.serializer.fieldnames = transaction_id,store_id,review_score

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
```
3. Databases

HBase

Dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
```
Example
VM: single; HBase table: yb12211:for_mysql_import; column family: base; fields: rowkey, name, age, gender, phone
```scala
tabEnv.executeSql(
  """
    |create table student(
    |  id int,
    |  base ROW<test_age int, test_gender string, test_name string, test_phone string>,
    |  primary key(id) not enforced
    |) with (
    |  'connector'='hbase-2.2',
    |  'table-name'='yb12211:for_mysql_import',
    |  'zookeeper.quorum'='single:2181'
    |)
    |""".stripMargin)
tabEnv.sqlQuery(
  """
    | select
    |   id,
    |   base.test_age,
    |   base.test_gender,
    |   base.test_name,
    |   base.test_phone
    | from student
    |""".stripMargin)
  .execute()
  .print()
```
JDBC

Dependency
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>
```
Example
VM: single; database: test_only; table: customer
```scala
tabEnv.executeSql(
  """
    |create table student(
    |  customer_id int,
    |  gender string,
    |  customer_name string,
    |  email string,
    |  address string,
    |  country string,
    |  `language` string,
    |  job_title string,
    |  job string,
    |  credit_type string,
    |  credit_no string,
    |  primary key(customer_id) not enforced
    |) with (
    |  'connector'='jdbc',
    |  'url'='jdbc:mysql://single:3306/test_only',
    |  'driver'='com.mysql.cj.jdbc.Driver',
    |  'table-name'='customer',
    |  'username'='henry',
    |  'password'='ybremote'
    |)
    |""".stripMargin)
tabEnv.sqlQuery(
  """
    | select * from student
    |""".stripMargin)
  .execute()
  .print()
```
4. Data generation: the datagen connector, for performance testing

```scala
tabEnv.executeSql(
  """
    |create table stu_csv(
    |  id int,
    |  name string,
    |  age int,
    |  gender int
    |) with (
    |  'connector'='datagen',
    |  'number-of-rows'='100',
    |  'fields.id.start'='1',
    |  'fields.id.end'='100',
    |  'fields.id.kind'='sequence',
    |  'fields.name.length'='5',
    |  'fields.name.kind'='random',
    |  'fields.age.min'='18',
    |  'fields.age.max'='24',
    |  'fields.age.kind'='random',
    |  'fields.gender.min'='0',
    |  'fields.gender.max'='1',
    |  'fields.gender.kind'='random'
    |)
    |""".stripMargin)
tabEnv.from("stu_csv")
  .select($"*")
  .execute()
  .print()
```
Window functions

Stream data is produced continuously in real time. You cannot wait until all data has been collected before computing, and you normally do not compute per record either; watermarks and windows let you split the data into time segments and process it segment by segment.
Declaring a watermark in the table DDL

```sql
CREATE TABLE table_name(
  ...,
  time_field TIMESTAMP(3),
  WATERMARK FOR time_field AS time_field - INTERVAL 'N' SECOND|MINUTE|...
) WITH (
  'connector' = 'kafka',
  ...
)
```
Windowing in queries

Group windows

In Flink versions before 1.13, the Table API and SQL provided a set of "group window" functions.
Tumbling window: TUMBLE(time_attr, interval)

- Window start: TUMBLE_START(time_field, INTERVAL 'N' SECOND|MINUTE|...)
- Window end: TUMBLE_END(time_field, INTERVAL 'N' SECOND|MINUTE|...)
- Event-time window end excluding the boundary (end - 1 ms, usable as a rowtime attribute): TUMBLE_ROWTIME
- Processing-time counterpart: TUMBLE_PROCTIME

Sliding window: HOP(time_attr, slide_interval, size_interval)
Session window: SESSION(time_attr, gap_interval)
```sql
select
  ...,
  TUMBLE_START(time_field, INTERVAL 'N' SECOND|MINUTE|...) as window_start
from table_name
group by
  ...,
  TUMBLE(time_field, INTERVAL 'N' SECOND|MINUTE|...)
```
TVF windows

Starting with version 1.13, Flink defines windows with windowing table-valued functions (windowing TVFs). A windowing TVF is a polymorphic table function (PTF) defined by Flink that expands a table and returns the result. A table function can be thought of as a function that returns a table.

Flink currently provides the following window TVFs:

- Tumble windows
- Hop windows
- Cumulate windows
- Session windows
Windowing TVFs can fully replace the legacy group window functions. They are closer to the SQL standard, better optimized, and more powerful: they support complex window-based computations such as window top-N and window joins.

Besides all columns of the original table, the result of a windowing TVF adds three extra columns that describe the window:

- window_start: the window start
- window_end: the window end
- window_time: the window's time attribute, equal to window_end - 1 ms

The SQL declaration is similar to the old group windows: calling TUMBLE(), HOP(), or CUMULATE() directly gives you tumbling, hopping, or cumulative windows, although the parameters passed in differ.
Tumbling windows

As in the DataStream API, a tumbling window has a fixed length, is aligned to the time axis, and does not overlap; it is typically used for periodic statistics.

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
```sql
SELECT * FROM TABLE(
  TUMBLE(TABLE table_name, DESCRIPTOR(time_field), INTERVAL 'N' SECOND|MINUTE|...)
);
```
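Beyond `SELECT *`, the typical pattern is to aggregate per window by grouping on the added window_start/window_end columns. A sketch, assuming a table `student` whose time attribute `ts` has a watermark declared as in the DDL shown earlier:

```scala
// Count rows per 10-second tumbling window; the table and column names are assumptions.
tabEnv.sqlQuery(
  """
    |select
    |  window_start,
    |  window_end,
    |  count(*) as cnt
    |from table(
    |  tumble(table student, descriptor(ts), interval '10' second)
    |)
    |group by window_start, window_end
    |""".stripMargin)
  .execute()
  .print()
```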
Sliding (hop) windows

Sliding windows are used much like tumbling windows; the slide step controls how frequently results are emitted. In SQL they are declared with HOP(): besides the table name and time attribute, you also pass the window size (size) and the slide step (slide). Note that in the TVF signature the slide comes before the size.

HOP(TABLE data, DESCRIPTOR(timecol), slide, size)
```sql
SELECT * FROM TABLE(
  HOP(
    TABLE table_name,
    DESCRIPTOR(time_field),
    INTERVAL 'n' SECOND|MINUTE|...,   -- slide
    INTERVAL 'N' SECOND|MINUTE|...    -- size
  )
);
```
Cumulative windows

Tumbling and sliding windows cover most periodic statistics.

In practice, however, there is another kind of requirement: the statistics period may be long, and we want to emit the current value at intermediate intervals. Unlike a sliding window, the values emitted several times within one statistics period should keep accumulating.

For example, suppose we count a site's PV (page views) per day. With a one-day tumbling window, the result is computed only once at midnight, which is far too infrequent. With a sliding window we can compute more often, but the statistic then becomes "PV over the past 24 hours". What we really want is to count PV per calendar day while emitting, once every hour, the PV accumulated so far that day. This special kind of window is called a cumulative window (Cumulate Window).
Segmented accumulation within a time window: CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

A cumulate window maintains a slice state and a merged state; when each slice ends, its slice state is merged into the merged state.
```sql
SELECT * FROM TABLE(
  CUMULATE(
    TABLE table_name,
    DESCRIPTOR(time_field),
    INTERVAL 'n' SECOND|MINUTE|...,   -- step
    INTERVAL 'N' SECOND|MINUTE|...    -- size
  )
);
```
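A concrete sketch of the daily-PV scenario above: a one-day cumulative window with a one-hour step emits, once per hour, the PV accumulated since the start of the day. The table `page_view` and its time attribute `ts` are assumptions for illustration.

```scala
tabEnv.sqlQuery(
  """
    |select
    |  window_start,
    |  window_end,
    |  count(*) as pv   -- PV accumulated from window_start up to window_end
    |from table(
    |  cumulate(table page_view, descriptor(ts), interval '1' hour, interval '1' day)
    |)
    |group by window_start, window_end
    |""".stripMargin)
  .execute()
  .print()
```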
Common functions

Comparison functions

```
=   <>   >   >=   <   <=
value IS [NOT] NULL
👉 v1 IS [NOT] DISTINCT FROM v2
👉 v1 [NOT] BETWEEN [ASYMMETRIC (bounds in the given order, the default) | SYMMETRIC (bounds in either order)] v2 AND v3
s1 [NOT] LIKE s2
👉 s1 [NOT] SIMILAR TO s2
v1 [NOT] IN (v2, ..., vn)
[NOT] EXISTS (sub-query)
value [NOT] IN (sub-query)
```
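A small sketch exercising a few of the less common predicates above (SYMMETRIC bounds, the NULL-safe IS DISTINCT FROM, and SIMILAR TO); it reads constant rows from a VALUES clause so no source table is needed, and the column names are arbitrary.

```scala
tabEnv.sqlQuery(
  """
    |select
    |  a between symmetric 5 and 1  as in_range,  -- SYMMETRIC: the bounds may be given in either order
    |  b is distinct from 1         as dist,      -- NULL-safe comparison: TRUE here, not NULL
    |  s similar to 'a(b|c)*'       as sim        -- SQL regular expression, must match the whole string
    |from (values (3, cast(null as int), 'abc')) as t(a, b, s)
    |""".stripMargin)
  .execute()
  .print()
```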
Logical functions

```
AND   OR   NOT
b IS [NOT] TRUE | FALSE
👉 b IS [NOT] UNKNOWN
```
Arithmetic functions

```
+ numeric, - numeric (unary sign)
+   -   *   /
%, MOD(numeric1, numeric2)
POWER(numeric1, numeric2)
ABS(numeric)
CEIL(numeric)
FLOOR(numeric)
ROUND(numeric, integer)
TRUNCATE(numeric1, integer2)
RAND()
RAND(integer)
👉 RAND_INTEGER(integer)
UUID()
BIN(integer)
HEX(numeric or string)
```
String functions

```
👉 string1 || string2
CONCAT(string1, string2, ...)
CONCAT_WS(separator, string2, string3, ...)
CHAR_LENGTH(string)
UPPER(string)
LOWER(string)
👉 POSITION(sub IN src)
👉 TRIM([BOTH | LEADING | TRAILING] string1 FROM string2)
LTRIM(string)
RTRIM(string)
REPEAT(string, integer)
REGEXP(src, regex)
REGEXP_REPLACE(src, from, to)
REGEXP_EXTRACT(src, regex[, groupId])
REPLACE(src, from, to)
👉 OVERLAY(src PLACING replace FROM intPos [FOR size])
SUBSTRING(src FROM intPos (1-based, inclusive) [FOR size])
SUBSTR(src[, from[, size]])
INITCAP(string)
LPAD(string1, integer, string2)
RPAD(string1, integer, string2)
FROM_BASE64(string)
TO_BASE64(string)
ASCII(string)
CHR(integer)
DECODE(binary, string)
ENCODE(string1, string2)
INSTR(s1, s2)
LOCATE(s1, s2[, pos])
LEFT(string, integer)
RIGHT(string, integer)
PARSE_URL(url, PART_NAME1[, PART_NAME2])
REVERSE(string)
👉 SPLIT_INDEX(src, separator, integer1)
STR_TO_MAP(src[, pairSeparator, kvSeparator])
```
Date and time functions

```
Time units: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, YEAR
DATE_FORMAT(timestamp, format)
CURRENT_DATE                 -- current system date
CURRENT_TIME                 -- current system time
CURRENT_TIMESTAMP
LOCALTIMESTAMP
LOCALTIME
EXTRACT(timeintervalunit FROM temporal)
YEAR(date)
QUARTER(date)
MONTH(date)
WEEK(date)
DAYOFYEAR(date)
DAYOFMONTH(date)
DAYOFWEEK(date)
HOUR(timestamp)
MINUTE(timestamp)
SECOND(timestamp)
FLOOR(timepoint TO timeintervalunit)
CEIL(timepoint TO timeintervalunit)
(timepoint1, temporal1) OVERLAPS (timepoint2, temporal2)
DATE string                  -- converts a date string (yyyy-MM-dd) to DATE
TIME string                  -- converts a time string (HH:mm:ss) to TIME
TIMESTAMP string             -- converts a timestamp string to TIMESTAMP
TO_DATE(dt[, format])
TO_TIMESTAMP(dt[, format])
NOW()
UNIX_TIMESTAMP()
UNIX_TIMESTAMP(dt[, format])
FROM_UNIXTIME(numeric[, format])
TIMESTAMPDIFF(UNIT, dt1, dt2)
TIMESTAMPADD(UNIT, int, dt)
👉 CONVERT_TZ(dt, tz1, tz2)  -- converts a datetime string from time zone tz1 to time zone tz2
```
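A quick sketch exercising a few of the date and time functions listed above; the literal values are arbitrary, and the query reads a single constant row from VALUES so that no source table is needed.

```scala
tabEnv.sqlQuery(
  """
    |select
    |  date_format(timestamp '2024-05-01 10:30:00', 'yyyy-MM-dd HH:mm') as formatted,
    |  extract(day from date '2024-05-01')                              as day_part,
    |  timestampdiff(day, timestamp '2024-05-01 00:00:00',
    |                     timestamp '2024-05-11 00:00:00')              as diff_days,
    |  to_timestamp('2024-05-01 10:30:00')                              as parsed,
    |  unix_timestamp('2024-05-01 10:30:00')                            as epoch_seconds
    |from (values (1)) as t(x)
    |""".stripMargin)
  .execute()
  .print()
```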
Conditional functions

```
CASE value WHEN value1 THEN result1 ... ELSE resultZ END
CASE WHEN condition1 THEN result1 ... ELSE resultZ END   -- returns the result of the first condition that holds
NULLIF(value1, value2)
COALESCE(value1, value2[, value3]*)
IF(condition, true_value, false_value)
IS_ALPHA(string)    -- TRUE if all characters in the string are letters
IS_DECIMAL(string)  -- TRUE if the string can be parsed as a valid number
IS_DIGIT(string)    -- TRUE if all characters in the string are digits
```

Type conversion functions

```
CAST(value AS type)
```
Collection functions

```
ARRAY[value1, value2, ...]
MAP[key1, value1, key2, value2, ...]
👉 CARDINALITY(map)   -- number of entries
👉 ELEMENT(array)     -- the sole element of a single-element array
```
👉 Value access functions

```
tableName.compositeType.field
tableName.compositeType.*
```
Grouping functions

```
GROUP_ID()
GROUPING(expression1[, expression2]*)
GROUPING_ID(expression1[, expression2]*)
```
Hash functions

```
MD5(string)
SHA1(string)
SHA224(string)
SHA256(string)
SHA384(string)
SHA512(string)
SHA2(string, hashLength)
```
Aggregate functions

```
COUNT([ALL] expression | DISTINCT expression1[, expression2]*)
COUNT(*), COUNT(1)
AVG([ALL | DISTINCT] expression)
SUM([ALL | DISTINCT] expression)
MAX([ALL | DISTINCT] expression)
MIN([ALL | DISTINCT] expression)
STDDEV_POP([ALL | DISTINCT] expression)
STDDEV_SAMP([ALL | DISTINCT] expression)
VAR_POP([ALL | DISTINCT] expression)
VAR_SAMP([ALL | DISTINCT] expression)
COLLECT([ALL | DISTINCT] expression)
DENSE_RANK()
ROW_NUMBER()
LEAD(expression[, offset][, default])
LAG(expression[, offset][, default])
FIRST_VALUE(expression)
LAST_VALUE(expression)
👉 LISTAGG(expression[, separator])
```
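Several of the functions at the end of this list (ROW_NUMBER, LEAD, LAG, FIRST_VALUE, LAST_VALUE) are used as OVER-window functions. Below is a sketch of the classic Flink SQL top-N pattern built on ROW_NUMBER, assuming the `stu` view registered in the in-memory example earlier.

```scala
// Top 2 oldest students per gender, using the "stu" view from the in-memory example.
tabEnv.sqlQuery(
  """
    |select id, name, age, gender
    |from (
    |  select *,
    |         row_number() over (partition by gender order by age desc) as rn
    |  from stu
    |)
    |where rn <= 2
    |""".stripMargin)
  .execute()
  .print()
```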