Contents
  1. Hello World
  2. Concepts
    2.1. Primary Key
  3. Principles
  4. Data Layout
    4.1. Loading Process
    4.2. Compressed Blocks
    4.3. Distribution and Replicas
  5. Ingesting Kafka into ClickHouse
    5.1. Updating the Kafka Configuration
  6. Connecting Spark to ClickHouse
    6.1. housepower
    6.2. jdbc

Hello World

clickhouse start

clickhouse-client

Concepts

Primary Key

Primary keys: not unique; they determine how the data is sorted (comparable to CarbonData's sort columns).

Every 8,192 rows or 10MB of data (referred to as the index granularity) creates an entry in the primary key index file. This granularity concept creates a sparse index that can easily fit in memory, and the granules represent a stripe of the smallest amount of column data that gets processed during SELECT queries.

The primary key can be defined using the PRIMARY KEY parameter.
If you define a table without a PRIMARY KEY specified, then the key becomes the tuple specified in the ORDER BY clause.
If you specify both a PRIMARY KEY and an ORDER BY, the primary key must be a subset of the sort order.
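The three rules above can be sketched in DDL (table and column names are illustrative):

```sql
-- ORDER BY only: the primary key becomes (user_id, event_time)
CREATE TABLE events_a (user_id UInt64, event_time DateTime, payload String)
ENGINE = MergeTree
ORDER BY (user_id, event_time);

-- Explicit PRIMARY KEY: a prefix of the sort order, so the sparse index stays smaller
CREATE TABLE events_b (user_id UInt64, event_time DateTime, payload String)
ENGINE = MergeTree
PRIMARY KEY user_id
ORDER BY (user_id, event_time);
```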

Principles

Millisecond-level OLAP

Prototype goal: do just a single task well: filter and aggregate data as fast as possible
Column-oriented storage
Indexing: only the metadata for the needed columns is kept in memory
Compression
Vectorized execution / columnar processing
Scalability
Attention to low-level detail: for each query, the most suitable data-structure implementation is chosen from several variants

Shared-nothing architecture

shard + replica; shards are isolated from one another

Table -> part (similar to a segment; isolated by directory) -> per-granule columns (each column has its own .bin data file and .mrk marks file)

Data Layout

id_test                  -- table name
├── all_1_1_0            -- part directory
│   ├── ID.bin           -- column data, LZ4-compressed by default
│   ├── ID.mrk2          -- marks: offsets of data blocks within the .bin file
│   ├── StringID.bin
│   ├── StringID.mrk2
│   ├── checksums.txt    -- checksum file
│   ├── columns.txt      -- column names and data types
│   ├── count.txt        -- total row count
│   └── primary.idx      -- primary key index file
├── detached
└── format_version.txt

Part names follow PartitionID_MinBlockNum_MaxBlockNum_Level, e.g. 202103_1_1_0; the block-number range does not imply contiguous blocks.

PartitionID: the value of the partition key
MinBlockNum: smallest block number in the part
MaxBlockNum: largest block number in the part
Level: the number of merges the part has gone through

BlockNum is a table-wide counter: it increments by 1 each time a new part directory is created.
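The part-name fields described above can be read back from system.parts; a sketch assuming the id_test table from the listing:

```sql
SELECT name, partition_id, min_block_number, max_block_number, level, rows
FROM system.parts
WHERE table = 'id_test' AND active;
```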

Loading Process

Input -> granule (index_granularity = 8192 rows; similar to a CarbonData blocklet) -> block size (similar to a block) -> compress -> block written to disk

Index types:

  • Primary key: one entry per index_granularity rows
  • MinMax: per index_granularity
  • Set (distinct values): per index_granularity
  • Bloom filter
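The skip-index types listed above can all be declared on a MergeTree table; a minimal sketch with illustrative names:

```sql
CREATE TABLE skip_index_demo
(
    id   UInt64,
    code String,
    url  String,
    INDEX idx_minmax id   TYPE minmax             GRANULARITY 4,  -- min/max per 4 granules
    INDEX idx_set    code TYPE set(100)           GRANULARITY 4,  -- distinct values, capped at 100
    INDEX idx_bloom  url  TYPE bloom_filter(0.01) GRANULARITY 4   -- 1% false-positive rate
)
ENGINE = MergeTree
ORDER BY id;
```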

Compressed Blocks

The data file column.bin stores the data of a single column.
A compressed block consists of a header plus the compressed data. The header is a fixed 9 bytes: one UInt8 (1 byte) and two UInt32 (4 bytes each),
encoding the compression codec, the compressed data size, and the uncompressed data size respectively.
Each compressed block is kept strictly between 64 KB and 1 MB, measured by its uncompressed size;
the lower and upper bounds are set by min_compress_block_size (default 65536 = 64 KB) and max_compress_block_size (default 1 MB).
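The effect of compression can be observed per column in system.columns; a sketch assuming the id_test table:

```sql
SELECT name, compression_codec, data_compressed_bytes, data_uncompressed_bytes
FROM system.columns
WHERE table = 'id_test';
```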

Every 8,192 rows yields one primary-index entry, and each index interval is compressed into one data block.
Newer ClickHouse versions enable adaptive granularity by default: the index_granularity_bytes setting makes a granule's size depend on data volume as well as row count.
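A sketch making both granularity settings explicit (the values shown are the documented defaults; the table name is illustrative):

```sql
CREATE TABLE granularity_demo
(
    id      UInt64,
    payload String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192,            -- rows per granule
         index_granularity_bytes = 10485760;  -- 10 MB cap; 0 disables adaptive granularity
```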

The marks file column.mrk corresponds one-to-one with the data file column.bin and records the mapping between primary-index entries and data blocks.

Distribution and Replicas

Create a cluster by specifying shards and replicas. Note that a single instance can store only one replica of one shard; otherwise it fails with DB::Exception: There are two exactly the same ClickHouse instances
(one node can still run multiple instances on different ports).

The number of Spark partitions in spark-clickhouse-connector is determined by the number of shards.

The internal_replication parameter:
true: writes go to a single replica per shard, and replication is handled in cooperation with ZooKeeper;
false: the Distributed table itself writes to every replica in the shard.
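The flag lives on each shard inside the remote_servers section of the server config; a minimal sketch with illustrative host names:

```xml
<remote_servers>
    <analysis>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>node1</host><port>9000</port></replica>
            <replica><host>node2</host><port>9000</port></replica>
        </shard>
    </analysis>
</remote_servers>
```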

Drawbacks of replicating through the Distributed table:
1. A write can succeed on one replica and fail on another, so data consistency is not guaranteed;
2. If a server is down for a while, the data from that window is not automatically recovered from the other replica after it comes back.

https://blog.csdn.net/weixin_43786255/article/details/106292490

CREATE DATABASE IF NOT EXISTS bmp ON CLUSTER analysis;
DROP TABLE bmp.t_bms_service_sub_local ON CLUSTER analysis;
CREATE TABLE IF NOT EXISTS bmp.t_bms_service_sub_local ON CLUSTER analysis …;
CREATE TABLE bmp.t_bms_service_sub ON CLUSTER analysis AS bmp.t_bms_service_sub_local ENGINE = Distributed('analysis', 'bmp', 't_bms_service_sub_local', hiveHash(seqid));

Ingesting Kafka into ClickHouse

Create a table on the cluster to consume Kafka data

CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other  (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

Create a table on the cluster to receive the materialized-view output

CREATE TABLE IF NOT EXISTS wx_log_kafka.ods_drs_log_local  ON CLUSTER cluster_other
(
id Int64,
database String,
schema String,
table String,
opType String,
es String,
data Array(String),
old Array(String),
kafka_offset Int64,
datetime DateTime,
daytime Date
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/wx_log_kafka/ods_drs_log_local', '{replica}')
ORDER BY (daytime,id)
PARTITION BY daytime
TTL datetime + toIntervalDay(20)
SETTINGS index_granularity=8192

Create the Distributed table for external access

CREATE TABLE wx_log_kafka.ods_drs_log ON CLUSTER cluster_other AS wx_log_kafka.ods_drs_log_local
ENGINE = Distributed(cluster_other, wx_log_kafka, ods_drs_log_local ,rand())
SETTINGS
fsync_after_insert=1,
fsync_directories=1
;

Create the materialized view to write data into the target table

CREATE MATERIALIZED VIEW wx_log_kafka.kafka_consumer_drs_log  ON CLUSTER cluster_other TO wx_log_kafka.ods_drs_log
AS
SELECT
id ,
database ,
schema ,
table ,
opType ,
es ,
data ,
old ,
_offset AS kafka_offset,
toDateTime(_timestamp) AS datetime,
toDate(_timestamp) AS daytime
FROM wx_log_kafka.kafka_queue_drs_log

Updating the Kafka Configuration

DETACH/ATTACH [view name]

DETACH TABLE|VIEW|DICTIONARY [IF EXISTS] [db.]name [ON CLUSTER cluster] [PERMANENTLY] [SYNC]
ATTACH TABLE|DICTIONARY [IF NOT EXISTS] [db.]name [ON CLUSTER cluster] …

DETACH VIEW wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

-- the original underlying Kafka table must be dropped and recreated
DROP TABLE wx_log_kafka.kafka_queue_drs_log ON CLUSTER cluster_other;

CREATE TABLE wx_log_kafka.kafka_queue_drs_log on CLUSTER cluster_other (
id Int64,
database String,
schema String,
table String,
opType String,
es Int64,
data Array(String),
old Array(String)
) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxx:9092,xxx:9092',
kafka_topic_list = 't1,t2',
kafka_group_name = 'CMCC_BI_DBSyn',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 10,
kafka_skip_broken_messages=1000;

ATTACH TABLE wx_log_kafka.kafka_consumer_drs_log ON CLUSTER cluster_other;

Connecting Spark to ClickHouse

housepower

<dependency>
    <groupId>com.github.housepower</groupId>
    <artifactId>clickhouse-spark-runtime-3.2_2.12</artifactId>
    <version>0.2.0</version>
</dependency>

val ck = SparkSession.builder
  .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "north-191")
  .config("spark.sql.catalog.clickhouse.grpc_port", "9100") // 9109
  .config("spark.sql.catalog.clickhouse.user", "default")
  .config("spark.sql.catalog.clickhouse.password", "xxx")
  .config("spark.sql.catalog.clickhouse.database", "bmp")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

ck.sessionState.catalog.listTables("default")

ck.sql("use clickhouse").show()
ck.sql("show tables").show()
ck.sql("select * from t_bms_service_sub").show()

jdbc

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch9</version>
    <classifier>http</classifier>
</dependency>

// .show() returns Unit, so keep the DataFrame in sub and call show() separately
val sub = spark.read.format("jdbc")
  .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
  .option("url", "jdbc:clickhouse://ip:8123/default")
  .option("dbtable", "t")
  .option("user", "default")
  .option("password", "")
  .load()
sub.show()