Delta Lake 全面解析

Delta Lake 在 Spark + AI Summit 2019 宣布开源,引起了不小的震动,这到底是何方神圣?本文将从什么是 Delta Lake、它有那些特点、它是如何实现的,以及它的出现对未来大数据领域和大数据从业者可能有什么影响这些角度,全面解析这一新一代的文件存储层。

Delta Lake

Delta Lake 是 Databricks (俗称"砖厂") 开源的一个文件存储层,目前运行在 Spark 上。它主要提供了以下功能(摘自官网):

  1. ACID 事务(ACID transactions)
  2. Schema 相关特性:Schema 本地存储、支持 Schema 约束、Schema 演变 (Scalable Metadata Handling、Schema Enforcement、Schema Evolution)
  3. 数据版本控制 (Time Travel (data versioning))
  4. 支持数据更新删除 (Updates and Deletes)
  5. 统一了流数据和批处理数据落地 (Unified batch and streaming sink)

特性详细介绍

下面将会对Delta Lake 的特性详细介绍。

ACID 事务

这里的 ACID 就是指原子性、一致性、隔离性和持久性。一致性、持久性之前就已经实现,这里主要是解释一下原子性和隔离性。
原子性: 一个事务要么成功要么失败,不存在中间状态。这一个特性对于数据的准确性,特别是出现失败时候仍然保持准确性至关重要。当 job 失败时已经写入的数据会自动回滚到未写入的状态,不需要手工处理。
隔离性: 基于乐观的并发控制实现可序列花的隔离级别。乐观的并发控制在竞争不很激烈的情况下,会提高性能。可序列花的隔离级别保证了即使并发执行读或写操作,仍然保证像在串行读写一样。
具体的,Delta Lake 支持并发读取、并发追加(追加的内容不依赖于任何的读取已经存在的数据),但不支持并发修改,出现并发修改会抛出 concurrent modification exception。
隔离性保证无论是并发批处理操作、流操作或者是批流并发操作,数据都是准确的

Schema 相关特性

Hive Metastore 一般用中心化的存储(如 MySQL) 对 Schema 进行管理。在 Schema 数量特别巨大时(比如分区数特别多),由于中心化存储伸缩性是非线性的,容易形成瓶颈。
Delta Lake 将元数据同样视为数据,保存在文件中,使得对大数据的处理能力可以运用在处理元数据上。
由于元数据由文件进行管理,所以有了更大的灵活性和可能性,Schema Enforcement (强制检验数据的 Schema,不通过则拒绝),Schema Evolution (根据数据自动更改 Schema,无需手动指定)

数据版本控制

事务都有了,实现版本控制也是顺带的事。Delta Lake 可以读取某个版本的数据或者恢复数据到某一个历史版本。

支持数据更新删除

支持包括单条数据、批量数据的更新和删除

统一了流数据和批处理数据落地

Delta Lake 还统一了流数据和批处理数据落地,而不需要最开始提到的 Lambda 架构。极大的简化了系统的复杂性

Delta Lake 快速体验

使用 Delta Lake 需要 Spark 2.4.2 及以上版本,如果仅仅想体验一下,可以使用 Docker 版 Spark

  • 安装 Spark
1
2
3
4
5
6
7
8
9
10
11
12
13
docker run -d --name spark-master -h spark-master \
-p 8080:8080 \
-p 7077:7077 \
-p 4040:4040 \
-e ENABLE_INIT_DAEMON=false \
bde2020/spark-master:2.4.3-hadoop2.7


docker run -d --name spark-worker-1 \
-p 8081:8081 \
--link spark-master:spark-master \
-e ENABLE_INIT_DAEMON=false \
bde2020/spark-worker:2.4.3-hadoop2.7
  • 进入 Spark Shell
1
docker exec -it spark-master  /spark/bin/spark-shell --packages io.delta:delta-core_2.11:0.3.0
  • 创建 Delta Table
1
2
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

实现原理

Delta Lake 的文件结构

Delta Lake 在原有的 Parquet 文件的基础上,增加了 _delta_log 文件夹。_delta_log 文件夹内包含 json 文件和 checkpoint.parquet 文件。
_delta_log 结构

  • 这些 json 文件称为 transaction log。其文件名是递增的,数字代表版本。每一个文件代表一个事务,存储了 Schema 信息,对文件的操作等。
transaction log 内容
1
2
3
4
5
{"commitInfo":{"timestamp":1563414817197,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"72e1fda6-6860-477a-94bf-924c5935818b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,
\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1563414816435}}
{"add":{"path":"part-00000-90a5fe90-b039-4cce-92be-ff70abc6aeac-c000.snappy.parquet","partitionValues":{},"size":263,"modificationTime":1563414816000,"dataChange":true}}
  • checkpoint.parquet 是检查点,可以加速数据的读取

没有检查点的情况下 ,需要从头开始读取 transaction log,重放每一个 transaction log 的文件操作,才会得到所需要的结果,有了检查点,获取数据的某一个版本时,只需要从距离版本最近的检查点,重放版本和检查点的 transaction log 即可得到指定版本的数据

实现原子操作

写入失败时: 事务不提交,不形成 transaction log 文件,本次事务写入的文件就不会纳入到当前表中。
写入成功时: 事务提交,transaction log 原子的生成,于是数据变持久性存在于当前 delta table 中

实现可序列化隔离级别

可序列化隔离的实现基于乐观并发控制
并发修改的情况: 事务开始是读取最新版本的数据,输出数据产生一个新的版本,在事务提交之前,检查是否还有其他提交的 transaction log 与本次提交有冲突,如果有冲突,抛出 ConcurrentModificationException。写入的数据不会提交,因此不会生效
并发追加的情况: 与之类似,不同点在于发现冲突时会检查 Schema 是否变化,如果没有变化,会自动重试,不会抛出异常

关于 Delta Lake 的思考

像 Delta Lake、 Netflix 的 iceberg 这种新一代的文件格式的出现,解决了大数据发展中批流存储不统一、不支持事务等等痛点。

Spark 诞生之初,就在计算模型上实现了批处理和流处理的统一,现在 Delta Lake 的出现,在存储层也将实现统一。
批流处理的大一统后不但意味着可以消除像 Lambda 架构这种变通的解决方案,而且目前常用的基于批处理的数据加工方式也可也会被流式的数据处理方式所取代。再加上可以支持事务。

届时,开发者可以把更多精力放在"如何从数据中提取有用的信息"这样一件事情上,更多的关注数据流应该如何变化,而不是关注任务说明时候执行,任务失败了怎么重试等等问题。

所以像 Lambda 架构、各种数据准确性检验任务、不同系统的数据导入工具、调度执行批处理任务等等,这些在大数据领域习以为常的解决方案会成为历史。

一方面这当然是好事,开发者可以各司其职,大数据系统也会更加统一
但另一方面,技术的升级往往会替代一部分人,而且会让我们之前的经验一文不值。所以需要我们更需要思考如何提升自己,让自己驾驭技术,而不是让技术取代自己,就像很多职业都需要自思考如何被人工智能取代一样。