Use cases:

  • Schema evolution (DDL example after this list):
    • Add, drop, update, and reorder columns; only metadata is modified, data files are left untouched
    • Internally, columns are tracked by unique IDs rather than by name
  • Hidden partitioning (sketch after this list):
    • Queries do not have to filter on the partition column explicitly
    • When the partition value is a transform of a source column, filters on either the source column or the transformed value can prune partitions
    • Timestamp values are mapped to partition values (day, hour, etc.) automatically
    • Partitioning is not expressed as a directory layout
  • Partition evolution: only metadata is modified; old data keeps its layout (the new partition field reads as null for it) and new data is written with the new spec (see the same sketch below)
  • Version management (examples after this list)
    • time travel
    • version rollback
    • after old snapshots are expired, data files are not necessarily deleted; a file is removed only once no remaining snapshot references it
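
A minimal DDL sketch of schema evolution against the test.ic table created further below; the category/tag column names are purely illustrative, and the ALTER COLUMN ... FIRST reordering relies on the Iceberg Spark integration configured in the launch command below:

ALTER TABLE test.ic ADD COLUMNS (category string);
ALTER TABLE test.ic RENAME COLUMN category TO tag;
ALTER TABLE test.ic ALTER COLUMN tag FIRST;
ALTER TABLE test.ic DROP COLUMN tag;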
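
A sketch of hidden partitioning and partition evolution using a hypothetical test.logs table (table, columns, and the date literal are made up for illustration; ADD/DROP PARTITION FIELD needs the Iceberg SQL extensions enabled as in the launch command below):

CREATE TABLE test.logs (id bigint, ts timestamp, msg string)
USING iceberg
PARTITIONED BY (days(ts));

-- filtering on ts is enough; Iceberg maps the predicate to the days(ts) partition values
SELECT * FROM test.logs WHERE ts >= TIMESTAMP '2024-08-01 00:00:00';

-- partition evolution: metadata-only change, existing files keep the old layout
ALTER TABLE test.logs ADD PARTITION FIELD hours(ts);
ALTER TABLE test.logs DROP PARTITION FIELD days(ts);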
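
Time travel and rollback, with placeholder snapshot IDs and timestamps that would normally be read from the table's history or snapshots metadata tables:

SELECT * FROM test.ic VERSION AS OF 1234567890123456789;
SELECT * FROM test.ic TIMESTAMP AS OF '2024-08-01 00:00:00';

-- roll back to an earlier snapshot (Iceberg stored procedure, extensions required)
CALL spark_catalog.system.rollback_to_snapshot('test.ic', 1234567890123456789);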

Two-level metadata:

  • manifest list: one per snapshot; lists the snapshot's manifest files together with the value ranges of their partition fields
  • manifest files: list the data files, the partition each file belongs to, and column-level statistics
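
Both levels can be inspected through Iceberg's metadata tables in Spark SQL; for example, for the table used further below:

select * from hudi_ods.t_asset_file_last.snapshots;
select * from hudi_ods.t_asset_file_last.manifests;
select * from hudi_ods.t_asset_file_last.files;
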
SPARK_HOME=/data/soft/spark-3.5.2-bin-hadoop3

$SPARK_HOME/bin/spark-sql --jars iceberg-spark-runtime-3.5_2.12-1.6.0.jar \
--driver-memory 2g \
--driver-cores 2 \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 2 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive


CREATE TABLE test.ic (id bigint, data string) USING iceberg;
INSERT INTO test.ic VALUES (1, 'a'), (2, 'b'), (3, 'c');
select * from test.ic;

MERGE INTO test.ic t USING (SELECT 1 as id,'g' as data) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.data = u.data
WHEN NOT MATCHED THEN INSERT *;

val tbl = spark.read.format("iceberg").load("/hive/warehouse/hudi_ods.db/t_asset_file_last_v2")
val tbl = spark.table("hudi_ods.t_asset_file_last")

select * from hudi_ods.t_asset_file_last.history;


import java.util.HashMap
import org.apache.iceberg.hive.HiveCatalog

// Hive-backed Iceberg catalog pointing at the same metastore warehouse as Spark
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration) // Optionally use Spark's Hadoop configuration
val properties = new HashMap[String, String]()
properties.put("warehouse", "/hive/warehouse")
catalog.initialize("hive", properties)

import org.apache.iceberg.Table
import org.apache.iceberg.catalog.TableIdentifier
val name = TableIdentifier.of("hudi_ods", "t_asset_file_last")
//val name = TableIdentifier.of("hudi_ods", "drs_content")
val table = catalog.loadTable(name)


val tsToExpire = System.currentTimeMillis() - (1000 * 60 * 60 * 24 * 5) // 5 days
table.expireSnapshots().expireOlderThan(tsToExpire).commit()

import org.apache.iceberg.spark.actions.SparkActions

// delete files under the table location that are no longer referenced by any snapshot
SparkActions
  .get()
  .deleteOrphanFiles(table)
  .execute()
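
The same maintenance can also be done with Iceberg's SQL stored procedures when the extensions from the launch command above are enabled (the timestamp is a placeholder):

CALL spark_catalog.system.expire_snapshots(table => 'hudi_ods.t_asset_file_last', older_than => TIMESTAMP '2024-08-01 00:00:00');
CALL spark_catalog.system.remove_orphan_files(table => 'hudi_ods.t_asset_file_last');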


$SPARK_HOME/bin/spark-shell --master local[2] --jars spark-avro_2.12-3.2.3.jar

val data = spark.read.format("avro").load("hdfs://bicluster/hive/warehouse/hudi_ods.db/t_asset_file_last_v2/metadata/snap-4703358464354234152-1-9930e33d-1861-4149-adbd-68cc8838ef11.avro")


Like Delta Lake, the table metadata is stored as JSON plus Avro files.
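
The JSON metadata files themselves can be listed from Spark SQL via the metadata_log_entries metadata table (available in recent Iceberg releases):

select * from hudi_ods.t_asset_file_last.metadata_log_entries;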