ClickHouse Theory

[TOC]

ClickHouse Theory

特点(快)

C++开发,列式数据库,在线分析处理,SQL查询,实时分析

表引擎四大类,20多种引擎,如:MergeTree

高速读写:类 LSM Tree结构。类似 HBase Memcache写入,版本与时间戳,后台定期 Compact,Merge Sort后落盘,Major Compact 后历史版本删除。顺序写入 append,且写入后只读

数据分区,线程级并行:单查询满CPU,QPS弱,CPU开销大。

命令行

1
2
clickhouse-client -password YOUR_PASSWORD --query "select ..."
clickhouse-client -m --password

数据类型

整数

有符号

Int8 , Int16 , Int32 , Int64

无符号

UInt8 , UInt16 , UInt32 , UInt64

浮点型

👉 注意:尽量用整数,将小数放大

Float32 , Float64

Decimal(s) s表示精度

Decimal32(s) 1-9 Decimal(9-s,s)

Decimal64(s) 1-18 Decimal(18-s,s)

Decimal128(s) 1-38 Decimal(38-s,s)

布尔型

没有单独的类型来存储布尔值。可以使用 UInt8 类型,取值限制为 0 或 1。

字符串

String varchar

FixedString(N) char(N)

枚举类型

本质:字符串的语义映射便于存储的整数

语法:“String”=Int

种类:Enum8,Enum16

案例:Enum8(“Male”=1, “Female”=2)

特殊:通过 cast(枚举字段, Int8) 函数转为整数

插入时:可以数字可以字符 INSERT INTO tablename VALUES (‘Male’),(‘Female’),(1);

时间类型

Date yyyy-MM-dd

Datetime yyyy-MM-dd HH:mm:ss

Datetime64 yyyy-MM-dd HH:mm:ss.SSS

  • Datetime64([3,6,9], timeZone)
  • Datetime64(3, ‘Asia/Shanghai’)

数组

标准声明:Array(T)

  • T:类似泛型,但不建议 T 为数组,尤其是 MergeTree 引擎

使用语法:array(1,2,3)

简易语法:[1,2,3]

扩展:toTypeName(字段名) 获取字段类型

元组

Tuple(T1, T2, …)

键值对

Map(K,V)

允许空值

Nullable(T)

不推荐 👆,用各种类型允许的非业务合法值

引擎

和 MySql 一样存在多个引擎

数据库引擎

表引擎

  1. 决定数据的存储方式和位置
  2. 支持的查询类型
  3. 是否支持数据并发访问
  4. 是否支持多线程
  5. 如果使用索引
  6. 数据复制参数

👉 注意:引擎名称大小写敏感,不能写错

合并树家族
  • 👉 MergeTree

    • 支持索引和分区
  • ReplacingMergeTree

    • 去重
    1
    2
    3
    4
    5
    ➢ 实际上是使用 order by 字段作为唯一键
    ➢ 去重不能跨分区
    ➢ 只有同一批插入(新版本)或合并分区时才会进行去重
    ➢ 认定重复的数据保留,版本字段值最大的
    ➢ 如果版本字段相同则按插入顺序保留最后一笔
  • SummingMergeTree

    • 求和
    1
    2
    3
    4
    5
    6
    7
    ➢ 以 SummingMergeTree()中指定的列作为汇总数据列
    ➢ 可以填写多列必须数字列,如果不填,以所有非维度列且为数字列的字段为汇总数
    据列
    ➢ 以 order by 的列为准,作为维度列
    ➢ 其他的列按插入顺序保留第一行
    ➢ 不在一个分区的数据不会被聚合
    ➢ 只有在同一批次插入(新版本)或分片合并时才会进行聚合
  • AggregatingMergeTree

    • 聚合
  • CollapsingMergeTree

  • VersionedCollapsingMergeTree

  • GraphiteMergeTree

日志家族
集成家族
  • ODBC

  • JDBC

  • MySQL

  • MongoDB

  • Redis

  • HDFS

  • S3

  • 👉 Kafka

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
    ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

    CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
    ) ENGINE = Kafka SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4;

    CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
    ) ENGINE = SummingMergeTree(day, (day, level), 8192);

    CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

    SELECT level, sum(total) FROM daily GROUP BY level;

    必选参数

    • kafka_broker_list

      Kafka代理(Broker)列表(如:master01:9092,master02:9092,worker01:9092)

    • kafka_topic_list

      Kafka主题的列表,每个主题都是Kafka中用于存储消息的逻辑分区。

    • kafka_group_name

      Kafka消费者的组名。Kafka的消费者组用于跟踪每个消费者的读取进度,确保消息在组内的消费者之间均衡分配,且每条消息只被组内的一个消费者读取。如果你不希望消息在集群中被重复读取,那么应该在所有地方使用相同的组名。

    • kafka_format

      Kafka消息格式。这个参数使用与SQL的FORMAT函数相同的表示法。
      指定了Kafka消息的序列化和反序列化格式。
      不同的格式支持不同的数据表示方式,如JSONEachRow、CSV等。
      选择合适的格式取决于你的具体需求和数据结构。

    可选参数

    • kafka_schema

      如果格式要求定义模式(schema),则必须使用的参数。这个参数确保了在序列化和反序列化消息时,数据符合预定的数据结构。

    • kafka_num_consumers

      每个表的消费者数量。如果单个消费者的吞吐量不足,可以指定更多的消费者。但消费者总数不应超过主题中的分区数,因为每个分区只能分配给一个消费者,同时也不应大于部署ClickHouse的服务器上的物理核心数。默认值:1。

    • kafka_max_block_size

      poll操作的最大批次大小(以消息数计)。这有助于控制一次性从Kafka读取和处理的数据量。默认值:max_insert_block_size

    • kafka_skip_broken_messages

      Kafka消息解析器对每个块中模式不兼容消息的容忍度。如果kafka_skip_broken_messages = N,则引擎会跳过N个无法解析的Kafka消息(一个消息等于一行数据)。这有助于跳过因模式变更等原因导致的不兼容消息。默认值:0。

    • kafka_commit_every_batch

      是否每消费并处理一个批次就提交一次,而不是在写入整个块后才进行单次提交。这有助于更频繁地更新消费者的偏移量,但可能增加写入的开销。默认值:0(不每批次提交)。

    • kafka_client_id

      客户端标识符。默认情况下为空。这有助于在Kafka的日志和监控中标识来自不同来源的客户端请求。

    • kafka_poll_timeout_ms

      从Kafka进行单次poll操作的超时时间。这定义了等待从Kafka服务器接收数据的时间长度。默认值:stream_poll_timeout_ms

    • kafka_poll_max_batch_size

      单次Kafka poll操作中要拉取的最大消息数量。这有助于控制从Kafka一次性读取的数据量。默认值:max_block_size

    • kafka_flush_interval_ms

      从Kafka刷新数据的超时时间。这定义了将数据从Kafka写入到ClickHouse(或其他存储系统)的时间间隔。默认值:stream_flush_interval_ms

    • kafka_thread_per_consumer

      是否为每个消费者提供独立的线程。启用时,每个消费者将独立并行地刷新数据(否则,来自多个消费者的行将被压缩以形成一个块)。这可以增加并行性,但也可能增加资源消耗。默认值:0(不使用独立线程)。

    • kafka_handle_error_mode

      Kafka引擎处理错误的方式。可能的值包括:default(如果无法解析消息,则抛出异常),stream(异常消息和原始消息将被保存在虚拟列_error_raw_message中)。

    • kafka_commit_on_select

      在执行选择查询时是否提交消息。这通常不是推荐的做法,因为它可能会导致消息被重复处理,但如果出于某些特殊需求需要这样做,可以启用此选项。默认值:false

    • kafka_max_rows_per_message

      对于基于行的格式,每个Kafka消息中写入的最大行数。这有助于控制消息的大小和处理的复杂性。默认值:1(每个消息只写一行)。

  • EmbeddedRocksDB

  • RabbitMQ

  • PostgreSQL

  • S3Queue

其他特殊
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
create table tran_kafka.smt_store_hour_sum_price(
store_id ,
tran_year ,
tran_month ,
tran_day ,
tran_hour,
tran_minute,
minute_sum_amount
)
engine = SumingMergerTree(minute_sum_amount)

optimize table tran_kafka.smt_store_hour_sum_price order by store_id, tran_year, tran_month, tran_day, tran_hour, tran_minute;

order by (store_id, tran_year, tran_month, tran_day, tran_hour)
partition by (tran_year, tran_month, tran_day)


create materialized view v_tran_to_sum
to tran_kafka.smt_store_minute_sum_price
as
with tran_time_dim as (
select
store_id,
toYear(create_time) as tran_year,
toMonth(create_time) as tran_month,
toDate(create_time) as tran_day,
toHour(create_time) as tran_hour,
toMinute(create_time) as tran_minute,
price
from tran_kafka.obs_tran_kafka
)
select
store_id, tran_year, tran_month, tran_day, tran_hour, tran_minute,
sum(price) as minute_sum_amount
from tran_time_dim
group by store_id, tran_year, tran_month, tran_day, tran_hour, tran_minute;


1
2
3
4
insert into t_order_mt3 values
(106,'sku_001',1000.00,'2025-03-09 17:41:30'),
(107,'sku_002',2000.00,'2025-03-09 17:41:30'),
(110,'sku_003',600.00,'2025-03-09 17:41:30');

ClickHouse Theory
https://leaf-domain.gitee.io/2025/03/22/databases/clickhouse/clickhouse_2_theory/
作者
叶域
发布于
2025年3月22日
许可协议