import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .enableHiveSupport()
  .getOrCreate()
// ==read write==
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
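To replace the contents at the same path, the write can be repeated in overwrite mode; a minimal sketch reusing the path above:

// Overwrite the existing Delta table with a new range of values
val moreData = spark.range(5, 10)
moreData.write.format("delta").mode("overwrite").save("/tmp/delta-table")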
import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath("/tmp/delta-table")
deltaTable.toDF.show()
import org.apache.spark.sql.functions.{col, expr}

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
spark.sql("DELETE FROM people10m WHERE birthDate < '1955-01-01'").show(false)
// Upsert (merge) new data
val newData = spark.range(0, 7).toDF
deltaTable.as("oldData")
  .merge(newData.as("newData"), "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id"))) // .updateAll()
  .whenNotMatched
  .insert(Map("id" -> col("newData.id"))) // .insertAll()
  .execute()

-- SQL equivalent
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET id = people10mupdates.id, firstName = people10mupdates.firstName
WHEN NOT MATCHED THEN
  INSERT (id, firstName)
  VALUES (people10mupdates.id, people10mupdates.firstName)

-- Merge from a CTE, updating/inserting all columns
WITH changes AS (
  SELECT seqid, userid, serviceid, startdate, enddate, updatetime
  FROM label.bmp_mq
  WHERE year = '2022' AND month = '05' AND day = '14'
)
MERGE INTO label.bmp_service_sub
USING changes
ON label.bmp_service_sub.seqid = changes.seqid
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
Creating tables
CREATE TABLE test.people10m (id INT, firstName STRING) USING DELTA
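Delta also supports partitioned and path-based (external) tables. A sketch, where the table name and location are hypothetical and birthDate is borrowed from the delete example above:

// Partitioned, external Delta table (hypothetical name and path)
spark.sql("""
  CREATE TABLE test.people10m_ext (id INT, firstName STRING, birthDate DATE)
  USING DELTA
  PARTITIONED BY (birthDate)
  LOCATION '/tmp/delta/people10m'
""")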
An existing Hive Parquet table can be converted to a Delta table:
spark.sql("CONVERT TO DELTA <TBL>")
Version history

List the version history:
deltaTable.history().show(5, false)

// SQL equivalent: DESCRIBE HISTORY <TBL>
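Any version listed there can be read back via time travel; a minimal sketch using the quickstart path from above:

// Read the table as of version 0; option("timestampAsOf", ...) works the same way
val oldDf = spark.read.format("delta")
  .option("versionAsOf", 0)
  .load("/tmp/delta-table")
oldDf.show()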
Delta Lake uses optimistic concurrency control. When multiple users write at the same time (reads always see the latest committed snapshot and are unaffected), each writer produces new data files (with non-conflicting file names) and, at commit time, writes the log file for the next version. Because log version numbers increase consecutively, finding that a log file of that name already exists means another user has committed a new version first. Conflict detection then runs: if it passes, the writer updates its snapshot to the new version and retries the commit; if it fails, the commit is aborted with an error.
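That protocol can be sketched as a put-if-absent loop on the next log file. Everything below (Snapshot, detectConflicts, the string actions) is hypothetical pseudocode illustrating the flow described above, not the actual Delta Lake internals:

import java.nio.file.{Files, Paths, StandardOpenOption, FileAlreadyExistsException}

// Illustrative stand-in for a table snapshot pinned at a log version
case class Snapshot(version: Long)

// Stand-in: real conflict detection compares the files and metadata this
// commit read/wrote against what the competing commits changed
def detectConflicts(actions: Seq[String], latest: Snapshot): Boolean = true

def commit(logDir: String, actions: Seq[String], start: Snapshot): Snapshot = {
  var current = start
  while (true) {
    val next = current.version + 1
    // Log versions are consecutive and zero-padded: 00000000000000000001.json, ...
    val logFile = Paths.get(logDir, f"$next%020d.json")
    try {
      // Put-if-absent: succeeds only if nobody else has committed version `next`
      Files.write(logFile, actions.mkString("\n").getBytes("UTF-8"),
        StandardOpenOption.CREATE_NEW)
      return Snapshot(next)
    } catch {
      case _: FileAlreadyExistsException =>
        // Another writer committed first: reload the snapshot at `next` and
        // run conflict detection before retrying on top of it
        val latest = Snapshot(next)
        if (!detectConflicts(actions, latest))
          throw new RuntimeException("conflicting concurrent commit, aborting")
        current = latest
    }
  }
  current
}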