Contents
  1. 1. Release
  2. 2. 入口
    1. 2.1. sql
    2. 2.2. shell
  3. 3. 建表
  4. 4. 历史版本
    1. 4.1. 列出历史版本
    2. 4.2. 查询历史版本
    3. 4.3. 清理历史数据
    4. 4.4. Rollback
  5. 5. 数据更新
  • 事务日志
  • https://docs.delta.io/1.2.1/delta-utility.html

    Release

    Delta Lake 2.0.x - for spark 3.2

    Delta Lake 2.1/2.x - for spark 3.3

    入口

    sql

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spark-sql \
    --jars delta-core_2.12-2.0.2.jar,delta-storage-2.0.2.jar \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"


    spark-sql \
    --jars delta-spark_2.12-3.2.1.jar,delta-storage-3.2.1.jar \
    --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
    --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

    shell

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    spark-shell \
    --jars delta-core_2.12-1.2.1.jar,delta-storage-1.2.1.jar

    val spark = SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .enableHiveSupport()
    .getOrCreate()

    // ==read write==
    val data = spark.range(0, 5)
    data.write.format("delta").save("/tmp/delta-table")
    val df = spark.read.format("delta").load("/tmp/delta-table")
    df.show()


    // ==update delete==
    import io.delta.tables._
    import org.apache.spark.sql.functions._

    val deltaTable = DeltaTable.forPath("/tmp/delta-table")
    deltaTable.toDF.show()

    // Delete every even value
    deltaTable.delete(condition = expr("id % 2 == 0"))

    spark.sql("DELETE FROM people10m WHERE birthDate < '1955-01-01'").show(false)


    // Upsert (merge) new data
    val newData = spark.range(0, 7).toDF
    deltaTable.as("oldData")
    .merge(newData.as("newData"),
    "oldData.id = newData.id")
    .whenMatched
    .update(Map("id" -> col("newData.id")))
    // .updateAll()
    .whenNotMatched
    .insert(Map("id" -> col("newData.id")))
    // .insertAll()
    .execute()

    MERGE INTO people10m
    USING people10mupdates
    ON people10m.id = people10mupdates.id
    WHEN MATCHED THEN
    UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName
    WHEN NOT MATCHED
    THEN INSERT (
    id,
    firstName
    )
    VALUES (
    people10mupdates.id,
    people10mupdates.firstName
    )

    with changes as (
    select seqid, userid, serviceid, startdate, enddate, updatetime
    from label.bmp_mq
    where year='2022' and month='05' and day='14')
    MERGE INTO label.bmp_service_sub
    USING changes
    ON label.bmp_service_sub.seqid = changes.seqid
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED
    THEN INSERT *

    建表

    1.CREATE TABLE test.people10m (id INT, firstName STRING) USING DELTA

    1. 可以将现有的hive-parquet表转换为delta表
      1
      spark.sql("CONVERT TO DELTA <TBL>")

    历史版本

    列出历史版本

    deltaTable.history().show(5, false)
    DESCRIBE HISTORY

    查询历史版本

    当前sql方式不支持

    spark.read.format(“delta”).option(“versionAsOf”,1)
    .load(“/hive/warehouse/label.db/bmp_service_sub”)
    .where(“seqid= ‘2733648354’ “).show()

    清理历史数据

    // collapse small files into larger ones
    OPTIMIZE default.people10m

    // Clean up snapshots
    deltaTable.vacuum(24) // 小时

    VACUUM dbName.tableName; // 7天之前

    set spark.databricks.delta.retentionDurationCheck.enabled = false;
    VACUUM dbName.tableName RETAIN 24 HOURS;
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;

    Rollback

    需要hadoop3.0版本,否则会报Path不可序列化 commit
    deltaTable.restoreToVersion(90)

    数据更新

    delta lake以增量写文件的方式支持数据的更新和删除。

    匹配数据,定位需要删除的行和涉及的文件;
    将这些文件中需要保留的数据重写到新的文件,然后给旧文件打上墓碑标记。
    删除、更新、合并(merge)都是这个流程。

    事务日志

    事务日志是delta lake的核心,它记录了delta表相关的所有commit操作。

    • delta表是一个目录,表的根目录除了表数据外,有一个_delta_log目录,用来存放事务日志;
    • 事务日志记录了从最初的delta表开始的所有commit事件,每个commit形成一个json文件,文件名是严格递增的,文件名就是版本号。
    • (默认)每10个json合并成一个parquet格式的checkpoint文件,记录之前所有的commit。
    • 事务日志有一个最新checkpoint的文件(_delta_log/_last_checkpoint),spark读的时候会自动跳到最新的checkpoint,然后再读之后的json。
    • delta lake 使用乐观的并发控制,当多个用户同时写数据时,(读数据是读当前最新版本的快照,互不影响),都是生成一个新版本的数据文件(文件名不重复),在提交commit时生成下一个版本的日志文件,因为日志版本号是连续递增的,如果检测到了同名的文件已存在,则说明有其他用户执行了新的commit,此时进行冲突检测,如果检测通过,则更新当前的snapshot,然后继续提交commit,如果未通过冲突检测,则报错。
    • 因为事务日志的存在,可以找到历史版本的数据,这也是时间穿梭的实现原理,delta lake可以根据commit记录生成历史版本的数据。
    • 新版本的数据生成后,旧版本的数据不会立刻从磁盘删除,可以使用 VACUUM 命令来删除磁盘上的历史版本数据。