Contents
  1. 1. 启动
    1. 1.1. docker
  2. 2. 概念
    1. 2.1. 数据类型
    2. 2.2. 数据库Engine
    3. 2.3. 表Engine
    4. 2.4. 函数
    5. 2.5. Projection加速查询 (类似MV)
    6. 2.6. 分布式表
    7. 2.7. 数据格式操作
    8. 2.8. 主键
  3. 3. 原理
  4. 4. 数据分布
    1. 4.1. 加载过程
    2. 4.2. 压缩块
    3. 4.3. 分布式 副本
  5. 5. kafka接入ck
    1. 5.1. 更新kafka配置
  6. 6. spark连接clickhouse
    1. 6.1. housepower
    2. 6.2. jdbc

启动

clickhouse start

clickhouse-client

docker

1
2
docker pull clickhouse/clickhouse-server:22.8.14.53
docker run -di --name=clickhouse-server -p 8123:8123 clickhouse/clickhouse-server:22.8.14.53

概念

数据类型

枚举值
低基数string

数据库Engine

  • Atomic(默认)
    • 数据存储在目录:/clickhouse_path/store/xxx/UUID/
    • 特别命令:exchange tables t1 and t2
  • Replicated(整库复制)
    • 基于Atomic,将DDL日志写入ZooKeeper并在给定数据库的所有副本上执行的元数据复制。
  • Lazy
    • 只在内存中保存上次访问后expiration_time_in_seconds秒,常用于Log
  • SQLite(映射访问)
  • MySQL(映射访问)
  • MaterializeMySQL(副本)
    • ClickHouse服务器作为MySQL副本工作,创建ClickHouse数据库,包含MySQL中所有的表,以及这些表中的所有数据。它读取binlog并执行DDL和DML查询。
  • PostgreSQL(映射访问)
  • MaterializePostgreSQL(副本)
    • ClickHouse服务器作为PostgreSQL副本工作。

表Engine

  • MergeTree(使用replicated引擎)
    • MergeTree
      • 快速插入,后台合并
      • 主键 或 order by排序
      • 支持分区/副本/采样sample by/TTL
    • ReplacingMergeTree
      • 删除排序键值相同的重复项
    • SummingMergeTree
      • ClickHouse可能不会完整的汇总所有行,会按片段把所有具有相同主键的行合并为一行,以至于不同的数据片段中会包含具有相同主键的行,即单个汇总片段将会是不完整的。
    • AggregatingMergeTree
      • 插入数据,需使用带有 -State- 聚合函数
    • CollapsingMergeTree
    • VersionedCollapsingMergeTree
    • GraphiteMergeTree
      • 时序数据
  • 日志(适合小表≤100w)
    • TinyLog
    • StripeLog
    • Log
  • 集成
    • Kafka
    • MySQL
    • ODBC
    • JDBC
    • HDFS
  • 其他
    • Distributed
    • MaterializedView
    • Dictionary
    • Merge
    • File
    • Null
    • Set
    • Join
    • URL
    • View
    • Memory
    • Buffer

函数

bar(price, 0, 1000000, 80 )

Projection加速查询 (类似MV)

允许我们通过存储任意格式的预先聚合的数据来提高查询速度。在执行时,如果 ClickHouse 认为 Projection 可以提高查询的性能,它将使用 Projection(何时使用由 ClickHouse 决定)。

分布式表

实际上是一种view(像HDFS联邦),映射到ClickHouse集群的本地表。 从分布式表中执行SELECT查询会使用集群所有分片的资源。 您可以为多个集群指定configs,并创建多个分布式表,为不同的集群提供视图。

ClickHouse负责所有副本的数据一致性,并在失败后自动运行恢复过程。建议将ZooKeeper集群部署在单独的服务器上(其中没有其他进程,包括运行的ClickHouse)

数据格式操作

1
2
3
4
5
6
7
8
9
10
# CSV
$ clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMAT CSV" < data.csv

# Parquet
$ cat {filename} | clickhouse-client --query="INSERT INTO {some_table} FORMAT Parquet"
$ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Parquet" > {some_file.pq}

# ORC
$ cat filename.orc | clickhouse-client --query="INSERT INTO some_table FORMAT ORC"
$ clickhouse-client --query="SELECT * FROM {some_table} FORMAT ORC" > {filename.orc}

主键

primary keys: not unique, determines how the data is sorted => 理解为carbondata的sort columns

Every 8,192 rows or 10MB of data (referred to as the index granularity) creates an entry in the primary key index file. This granularity concept creates a sparse index that can easily fit in memory, and the granules represent a stripe of the smallest amount of column data that gets processed during SELECT queries.

The primary key can be defined using the PRIMARY KEY parameter.
If you define a table without a PRIMARY KEY specified, then the key becomes the tuple specified in the ORDER BY clause.
If you specify both a PRIMARY KEY and an ORDER BY, the primary key must be a subset of the sort order.

原理

OLAP毫秒级

prototype: to do just a single task well: to filter and aggregate data as fast as possible
列存
索引:内存只存储必要列的信息
压缩
向量查询 / 列式处理
可扩展性
关注底层细节: 根据每个查询的特点从多种变式中选择最合适的数据结构实现

Share Nothing 架构

shard + replica; shard间隔离

Table -> Part(类似segment 目录隔离) -> 颗粒级别的列 (单独的bin数据文件和mrk元信息文件)

数据分布

1
2
3
4
5
6
7
8
9
10
11
12
id_test 表名
├── all_1_1_0 分区
│ ├── ID.bin 默认 lz4 压缩
│ ├── ID.mrk2 记录数据块在 bin 文件中的偏移量
│ ├── StringID.bin
│ ├── StringID.mrk2
│ ├── checksums.txt 校验文件
│ ├── columns.txt 列名以及数据类型
│ ├── count.txt 数据的总行数
│ └── primary.idx 主键索引文件
├── detached
└── format_version.txt

ParitionID_MinBlockNum_MaxBlockNum_level,示例202103_1_1_0,不代表连续的block

ParitionID: 分区字段的值
MinBlockNum: 最小快序号
MaxBlockNum: 最大块序号
Level: 参与合并的次数

BlockNum是表内全局累加,每次创建一个新的分区目录,就会累加1。

加载过程

Input -> granularity=8192 (类似blocklet) -> blocksize (类似block) -> compress -> block落盘

Index:

  • 主键:以index_granularity为单位
  • MinMax:以index_granularity为单位
  • Set (distinct值):以index_granularity为单位
  • BloomFilter

压缩块

数据存储文件column.bin中存储的是一列数据。
一个压缩数据块由头信息和压缩数据两部分组成,头信息固定使用9位字节表示,具体由1个UInt8(1字节)整型和2个UInt32(4字节)整型组成,
分别代表使用的压缩算法类型、压缩后的数据大小和压缩前的数据大小。
每个压缩数据块的体积,按照其压缩前的数据字节大小,都被严格控制在64KB ~ 1MB,
其上下限分别由 min_compress-block_size(默认65535=64KB)与max_compress_block_size(默认1MB)参数指定。

每8192条记录,其实就是一条一级索引,一个索引区间压缩成一个数据块.
新版本的Clickhouse中,会默认开启自适应granularity,新增配置项index_granularity_bytes来使得一个granule的数据大小不仅取决于行数,也取决于数据大小

数据标记文件数据存储文件column.mrk与数据存储文件column.bin文件一一对应,是一级索引与数据块之间关系的数据

分布式 副本

创建集群 指定分片和副本; 注意单个节点只支持存储单个分片单个副本,否则报错 DB::Exception: There are two exactly the same ClickHouse instance
(但可以一个节点 运行多个不同端口实例)

spark-clickhouse-connector的分区数取决于shards数量

internal_replication参数,
为true代表了只写入shard内的一台,与ZooKeeper配合进行复制;
为false代表了写入shard内所有的replica,与分布式表配合进行复制。

使用写分布式表的缺点:
①使用写分布式表进行复制,则可能出现写一边成功一边失败的情况,数据的一致性不可控,
②在一台服务器宕机一阵子以后,再恢复过来则这个时间段里面的数据不能自动恢复从另外的replica恢复过来。

https://blog.csdn.net/weixin_43786255/article/details/106292490

CREATE DATABASE IF NOT EXISTS bmp ON CLUSTER analysis;
drop table bmp.t_bms_service_sub_local ON CLUSTER analysis;
CREATE TABLE IF NOT EXISTS bmp.t_bms_service_sub_local ON CLUSTER analysis …;
CREATE TABLE bmp.t_bms_service_sub ON CLUSTER analysis() ENGINE = Distributed(‘analysis’, ‘bmp’, ‘t_bms_service_sub_local’, hiveHash(seqid));

kafka接入ck

在集群创建表 -接收kafka数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other  (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

在集群创建表 -接收mv数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE IF NOT EXISTS wx_log_kafka.ods_drs_log_local  ON CLUSTER cluster_other
(
id Int64,
database String,
schema String,
table String,
opType String,
es String,
data Array(String),
old Array(String),
kafka_offset Int64,
datetime DateTime,
daytime Date
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/wx_log_kafka/ods_drs_log_local', '{replica}')
ORDER BY (daytime,id)
PARTITION BY daytime
TTL datetime + toIntervalDay(20)
SETTINGS index_granularity=8192

创建集群表 - 供外部访问结果

1
2
3
4
5
6
7
CREATE TABLE wx_log_kafka.ods_drs_log ON CLUSTER cluster_other AS wx_log_kafka.ods_drs_log_local
ENGINE = Distributed(cluster_other, wx_log_kafka, ods_drs_log_local ,rand())
SETTINGS
fsync_after_insert=1,
fsync_directories=1
;

创建mv表 - 写数据到目标表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE MATERIALIZED VIEW wx_log_kafka.kafka_consumer_drs_log  ON CLUSTER cluster_other TO wx_log_kafka.ods_drs_log
AS
SELECT
id ,
database ,
schema ,
table ,
opType ,
es ,
data ,
old ,
_offset ,
toDateTime(_timestamp) AS datetime,
toDateTime(_timestamp) AS daytime
FROM wx_log_kafka.kafka_queue_drs_log

更新kafka配置

DETACH/ATTACH [视图表名]

DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
ATTACH TABLE|DICTIONARY [IF NOT EXISTS] [db.]name [ON CLUSTER cluster] …

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
DETACH VIEW wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

--必须删除原底表
drop table wx_log_kafka.kafka_queue_drs_log ON CLUSTER cluster_other;

CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1,t2',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

ATTACH table wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

spark连接clickhouse

housepower

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    <dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-spark-runtime-3.2_2.12</artifactId>
<version>0.2.0</version>
</dependency>

val ck = SparkSession.builder
.config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "north-191")
.config("spark.sql.catalog.clickhouse.grpc_port", "9100") // 9109
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "xxx")
.config("spark.sql.catalog.clickhouse.database", "bmp")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()

ck.sessionState.catalog.listTables("default")

ck.sql("use clickhouse").show()
ck.sql("show tables").show()
ck.sql("select * from t_bms_service_sub").show()

jdbc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    <dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch9</version>
<classifier>http</classifier>
</dependency>

val sub = spark.read.format("jdbc")
.option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
.option("url", "jdbc:clickhouse://ip:8123/default")
.option("dbtable", "t ")
.option("user", "default")
.option("password", "")
.load()
.show()