Article May 26, 2022

Delta Lake

Words count 11k Reading time 10 mins. Read count 0

https://docs.delta.io/1.2.1/delta-utility.html

Release

Delta Lake 2.0.x - for spark 3.2

Delta Lake 2.1/2.x - for spark 3.3

入口

sql

1
2
3
4
5
6
7
8
9
10
spark-sql \
--jars delta-core_2.12-2.0.2.jar,delta-storage-2.0.2.jar \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"


spark-sql \
--jars delta-spark_2.12-3.2.1.jar,delta-storage-3.2.1.jar \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
spark-shell \
--jars delta-core_2.12-1.2.1.jar,delta-storage-1.2.1.jar

val spark = SparkSession.builder
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.enableHiveSupport()
.getOrCreate()

// ==read write==
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()


// ==update delete==
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")
deltaTable.toDF.show()

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

spark.sql("DELETE FROM people10m WHERE birthDate < '1955-01-01'").show(false)


// Upsert (merge) new data
val newData = spark.range(0, 7).toDF
deltaTable.as("oldData")
.merge(newData.as("newData"),
"oldData.id = newData.id")
.whenMatched
.update(Map("id" -> col("newData.id")))
// .updateAll()
.whenNotMatched
.insert(Map("id" -> col("newData.id")))
// .insertAll()
.execute()

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName
WHEN NOT MATCHED
THEN INSERT (
id,
firstName
)
VALUES (
people10mupdates.id,
people10mupdates.firstName
)

with changes as (
select seqid, userid, serviceid, startdate, enddate, updatetime
from label.bmp_mq
where year='2022' and month='05' and day='14')
MERGE INTO label.bmp_service_sub
USING changes
ON label.bmp_service_sub.seqid = changes.seqid
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

建表

1.CREATE TABLE test.people10m (id INT, firstName STRING) USING DELTA

  1. 可以将现有的hive-parquet表转换为delta表
    1
    spark.sql("CONVERT TO DELTA <TBL>")

历史版本

列出历史版本

deltaTable.history().show(5, false)
DESCRIBE HISTORY

查询历史版本

当前sql方式不支持

spark.read.format(“delta”).option(“versionAsOf”,1)
.load(“/hive/warehouse/label.db/bmp_service_sub”)
.where(“seqid= ‘2733648354’ “).show()

清理历史数据

// collapse small files into larger ones
OPTIMIZE default.people10m

// Clean up snapshots
deltaTable.vacuum(24) // 小时

VACUUM dbName.tableName; // 7天之前

set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM dbName.tableName RETAIN 24 HOURS;
VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;

Rollback

需要hadoop3.0版本,否则会报Path不可序列化 commit
deltaTable.restoreToVersion(90)

数据更新

delta lake以增量写文件的方式支持数据的更新和删除。

匹配数据,定位需要删除的行和涉及的文件;
将这些文件中需要保留的数据重写到新的文件,然后给旧文件打上墓碑标记。
删除、更新、合并(merge)都是这个流程。

事务日志

事务日志是delta lake的核心,它记录了delta表相关的所有commit操作。

  • delta表是一个目录,表的根目录除了表数据外,有一个_delta_log目录,用来存放事务日志;
  • 事务日志记录了从最初的delta表开始的所有commit事件,每个commit形成一个json文件,文件名是严格递增的,文件名就是版本号。
  • (默认)每10个json合并成一个parquet格式的checkpoint文件,记录之前所有的commit。
  • 事务日志有一个最新checkpoint的文件(_delta_log/_last_checkpoint),spark读的时候会自动跳到最新的checkpoint,然后再读之后的json。
  • delta lake 使用乐观的并发控制,当多个用户同时写数据时,(读数据是读当前最新版本的快照,互不影响),都是生成一个新版本的数据文件(文件名不重复),在提交commit时生成下一个版本的日志文件,因为日志版本号是连续递增的,如果检测到了同名的文件已存在,则说明有其他用户执行了新的commit,此时进行冲突检测,如果检测通过,则更新当前的snapshot,然后继续提交commit,如果未通过冲突检测,则报错。
  • 因为事务日志的存在,可以找到历史版本的数据,这也是时间穿梭的实现原理,delta lake可以根据commit记录生成历史版本的数据。
  • 新版本的数据生成后,旧版本的数据不会立刻从磁盘删除,可以使用 VACUUM 命令来删除磁盘上的历史版本数据。
0%