CHEATSHEET March 19, 2024

ClickHouse CheatSheet

Words count 27k Reading time 24 mins. Read count 0

启动

默认绑定端口9000 与hdfs冲突,修改tcp_port
默认配置文件/etc/clickhouse-server/config.xml
自定义配置文件目录 /etc/clickhouse-server/config.d/

clickhouse start
clickhouse stop
clickhouse-client

docker

1
2
docker pull clickhouse/clickhouse-server:22.8.14.53
docker run -di --name=clickhouse-server -p 8123:8123 clickhouse/clickhouse-server:22.8.14.53

概念

数据类型

枚举值
低基数string

数据库Engine

  • Atomic(默认)
    • 数据存储在目录:/clickhouse_path/store/xxx/UUID/
    • 特别命令:exchange tables t1 and t2
  • Replicated(整库复制)
    • 基于Atomic,将DDL日志写入ZooKeeper并在给定数据库的所有副本上执行的元数据复制。
  • Lazy
    • 只在内存中保存上次访问后expiration_time_in_seconds秒,常用于Log
  • SQLite(映射访问)
  • MySQL(映射访问)
  • MaterializeMySQL(副本)
    • ClickHouse服务器作为MySQL副本工作,创建ClickHouse数据库,包含MySQL中所有的表,以及这些表中的所有数据。它读取binlog并执行DDL和DML查询。
  • PostgreSQL(映射访问)
  • MaterializePostgreSQL(副本)
    • ClickHouse服务器作为PostgreSQL副本工作。

表Engine

  • MergeTree(使用replicated引擎)
    • MergeTree
      • 快速插入,后台合并
      • 主键 或 order by排序
      • 支持分区/副本/采样sample by/TTL
    • ReplacingMergeTree
      • 删除排序键值相同的重复项
    • SummingMergeTree
      • ClickHouse可能不会完整的汇总所有行,会按片段把所有具有相同主键的行合并为一行,以至于不同的数据片段中会包含具有相同主键的行,即单个汇总片段将会是不完整的。
    • AggregatingMergeTree
      • 插入数据,需使用带有 -State- 聚合函数
    • CollapsingMergeTree
    • VersionedCollapsingMergeTree
    • GraphiteMergeTree
      • 时序数据
  • 日志(适合小表≤100w)
    • TinyLog
    • StripeLog
    • Log
  • 集成
    • Kafka
    • MySQL
    • ODBC
    • JDBC
    • HDFS
  • 其他
    • Distributed
    • MaterializedView
    • Dictionary
    • Merge
    • File
    • Null
    • Set
    • Join
    • URL
    • View
    • Memory
    • Buffer

函数

bar(price, 0, 1000000, 80 )

Projection加速查询 (类似MV)

允许我们通过存储任意格式的预先聚合的数据来提高查询速度。在执行时,如果 ClickHouse 认为 Projection 可以提高查询的性能,它将使用 Projection(何时使用由 ClickHouse 决定)。

分布式表

实际上是一种view(像HDFS联邦),映射到ClickHouse集群的本地表。 从分布式表中执行SELECT查询会使用集群所有分片的资源。 您可以为多个集群指定configs,并创建多个分布式表,为不同的集群提供视图。

ClickHouse负责所有副本的数据一致性,并在失败后自动运行恢复过程。建议将ZooKeeper集群部署在单独的服务器上(其中没有其他进程,包括运行的ClickHouse)

数据格式操作

1
2
3
4
5
6
7
8
9
10
# CSV
$ clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMAT CSV" < data.csv

# Parquet
$ cat {filename} | clickhouse-client --query="INSERT INTO {some_table} FORMAT Parquet"
$ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Parquet" > {some_file.pq}

# ORC
$ cat filename.orc | clickhouse-client --query="INSERT INTO some_table FORMAT ORC"
$ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.orc}

主键

primary keys: not unique, determines how the data is sorted => 理解为carbondata的sort columns

Every 8,192 rows or 10MB of data (referred to as the index granularity) creates an entry in the primary key index file. This granularity concept creates a sparse index that can easily fit in memory, and the granules represent a stripe of the smallest amount of column data that gets processed during SELECT queries.

The primary key can be defined using the PRIMARY KEY parameter.
If you define a table without a PRIMARY KEY specified, then the key becomes the tuple specified in the ORDER BY clause.
If you specify both a PRIMARY KEY and an ORDER BY, the primary key must be a subset of the sort order.

原理

OLAP毫秒级

prototype: to do just a single task well: to filter and aggregate data as fast as possible
列存
索引:内存只存储必要列的信息
压缩
向量查询 / 列式处理
可扩展性
关注底层细节: 根据每个查询的特点从多种变式中选择最合适的数据结构实现

Share Nothing 架构

shard + replica; shard间隔离

Table -> Part(类似segment 目录隔离) -> 颗粒级别的列 (单独的bin数据文件和mrk元信息文件)

数据分布

本地存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
access 用户、角色、额度、策略等.list文件
cores 空
data 数据目录
dictionaries_lib 空
flags 空
format_schemas 空
metadata 建库建表语句
metadata_dropped 空
named_collections 空
preprocessed_configs 运行中的config文件
status 文件记录了server PID,启动时间,版本信息
store 系统表相关数据文件目录
tmp 用于处理大查询时生成的临时文件的存储路径
user_defined 空
user_files 空
user_scripts 空
uuid

1
2
3
4
5
6
7
8
9
10
11
12
id_test 表名
├── all_1_1_0 分区
│ ├── ID.bin 默认 lz4 压缩
│ ├── ID.mrk2 记录数据块在 bin 文件中的偏移量
│ ├── StringID.bin
│ ├── StringID.mrk2
│ ├── checksums.txt 校验文件
│ ├── columns.txt 列名以及数据类型
│ ├── count.txt 数据的总行数
│ └── primary.idx 主键索引文件
├── detached
└── format_version.txt

ParitionID_MinBlockNum_MaxBlockNum_level,示例202103_1_1_0,不代表连续的block

ParitionID: 分区字段的值
MinBlockNum: 最小块序号
MaxBlockNum: 最大块序号
Level: 参与合并的次数

BlockNum是表内全局累加,每次创建一个新的分区目录,就会累加1。

加载过程

Input -> granularity=8192 (类似blocklet) -> blocksize (类似block) -> compress -> block落盘

Index:

  • 主键:以index_granularity为单位
  • MinMax:以index_granularity为单位
  • Set (distinct值):以index_granularity为单位
  • BloomFilter

压缩块

数据存储文件column.bin中存储的是一列数据。
一个压缩数据块由头信息和压缩数据两部分组成,头信息固定使用9位字节表示,具体由1个UInt8(1字节)整型和2个UInt32(4字节)整型组成,
分别代表使用的压缩算法类型、压缩后的数据大小和压缩前的数据大小。
每个压缩数据块的体积,按照其压缩前的数据字节大小,都被严格控制在64KB ~ 1MB,
其上下限分别由 min_compress-block_size(默认65535=64KB)与max_compress_block_size(默认1MB)参数指定。

每8192条记录,其实就是一条一级索引,一个索引区间压缩成一个数据块.
新版本的Clickhouse中,会默认开启自适应granularity,新增配置项index_granularity_bytes来使得一个granule的数据大小不仅取决于行数,也取决于数据大小

数据标记文件数据存储文件column.mrk与数据存储文件column.bin文件一一对应,是一级索引与数据块之间关系的数据

分布式 副本

创建集群 指定分片和副本; 注意单个节点只支持存储单个分片单个副本,否则报错 DB::Exception: There are two exactly the same ClickHouse instance
(但可以一个节点 运行多个不同端口实例)

spark-clickhouse-connector的分区数取决于shards数量

internal_replication参数,
为true代表了只写入shard内的一台,与ZooKeeper配合进行复制;
为false代表了写入shard内所有的replica,与分布式表配合进行复制。

使用写分布式表的缺点:
①使用写分布式表进行复制,则可能出现写一边成功一边失败的情况,数据的一致性不可控,
②在一台服务器宕机一阵子以后,再恢复过来则这个时间段里面的数据不能自动恢复从另外的replica恢复过来。

https://blog.csdn.net/weixin_43786255/article/details/106292490

CREATE DATABASE IF NOT EXISTS bmp ON CLUSTER analysis;
drop table bmp.t_bms_service_sub_local ON CLUSTER analysis;
CREATE TABLE IF NOT EXISTS bmp.t_bms_service_sub_local ON CLUSTER analysis …;
CREATE TABLE bmp.t_bms_service_sub ON CLUSTER analysis() ENGINE = Distributed(‘analysis’, ‘bmp’, ‘t_bms_service_sub_local’, hiveHash(seqid));

kafka接入ck

在集群创建表 -接收kafka数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other  (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

在集群创建表 -接收mv数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE IF NOT EXISTS wx_log_kafka.ods_drs_log_local  ON CLUSTER cluster_other
(
id Int64,
database String,
schema String,
table String,
opType String,
es String,
data Array(String),
old Array(String),
kafka_offset Int64,
datetime DateTime,
daytime Date
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/wx_log_kafka/ods_drs_log_local', '{replica}')
ORDER BY (daytime,id)
PARTITION BY daytime
TTL datetime + toIntervalDay(20)
SETTINGS index_granularity=8192

创建集群表 - 供外部访问结果

1
2
3
4
5
6
7
CREATE TABLE wx_log_kafka.ods_drs_log ON CLUSTER cluster_other AS wx_log_kafka.ods_drs_log_local
ENGINE = Distributed(cluster_other, wx_log_kafka, ods_drs_log_local ,rand())
SETTINGS
fsync_after_insert=1,
fsync_directories=1
;

创建mv表 - 写数据到目标表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE MATERIALIZED VIEW wx_log_kafka.kafka_consumer_drs_log  ON CLUSTER cluster_other TO wx_log_kafka.ods_drs_log
AS
SELECT
id ,
database ,
schema ,
table ,
opType ,
es ,
data ,
old ,
_offset ,
toDateTime(_timestamp) AS datetime,
toDateTime(_timestamp) AS daytime
FROM wx_log_kafka.kafka_queue_drs_log

更新kafka配置

DETACH/ATTACH [视图表名]

DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
ATTACH TABLE|DICTIONARY [IF NOT EXISTS] [db.]name [ON CLUSTER cluster] …

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
DETACH VIEW wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

--必须删除原底表
drop table wx_log_kafka.kafka_queue_drs_log ON CLUSTER cluster_other;

CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1,t2',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

ATTACH table wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

spark连接clickhouse

housepower

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    <dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-spark-runtime-3.2_2.12</artifactId>
<version>0.2.0</version>
</dependency>

val ck = SparkSession.builder
.config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "north-191")
.config("spark.sql.catalog.clickhouse.grpc_port", "9100") // 9109
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "xxx")
.config("spark.sql.catalog.clickhouse.database", "bmp")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()

ck.sessionState.catalog.listTables("default")

ck.sql("use clickhouse").show()
ck.sql("show tables").show()
ck.sql("select * from t_bms_service_sub").show()

jdbc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    <dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch9</version>
<classifier>http</classifier>
</dependency>

val sub = spark.read.format("jdbc")
.option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
.option("url", "jdbc:clickhouse://ip:8123/default")
.option("dbtable", "t ")
.option("user", "default")
.option("password", "")
.load()
.show()
0%