Delta Lake relies heavily on the underlying object storage provider to ensure atomic operations on its transaction log. Unfortunately, AWS S3 does not provide the atomicity guarantees Delta needs for concurrent writes, which dictates application-level coordination for safe concurrent Delta operations. The Delta Lake protocol expressly relies on "atomic primitives of the underlying filesystem to ensure concurrent writers do not overwrite each others entries." In an ideal world the underlying filesystem provides those guarantees and Delta writers gain immense concurrency "for free"; in reality, network-based file and object storage systems bring many limitations to Delta Lake's potential concurrency.
In the case of AWS S3's consistency model, many operations are strongly consistent, but concurrent writes to the same key are not coordinated. AWS encourages application-level object locking, which delta-rs implements using AWS DynamoDB.
While we have improved the performance of locking in recent 0.16.x releases of the `deltalake` Rust crate, there are some real limitations to the performance of concurrent writers imposed by the DynamoDB locking model used by Python and Rust-based Delta Lake applications.
Before diving into the limitations, it is first important to understand some fundamentals of Delta Lake's design and how the DynamoDB-based locks operate.
The primitive Delta table
For purposes of discussion, imagine you are working with a Delta table named `logs.nginx` in a Databricks notebook. Whether using AWS Glue Data Catalog or Unity Catalog, the underlying Apache Spark library for Delta Lake will map queries against `logs.nginx` to a location in an S3 bucket, such as `buoyant-dev/databases/logs/nginx`. The structure of this prefix in object storage is significant! A `_delta_log/` prefix contains the Delta Lake table metadata, and other prefixes may contain Hive-style partitions with Apache Parquet formatted data files, such as:
└── databases
└── logs
└── nginx
├── _delta_log
│ └── 00000000000000000000.json
└── ds=2023-11-27
├── part-00000-2bcc9ff6-0551-4401-bd22-d361a60627e3.c000.snappy.parquet
└── part-00001-ca647ee7-f1ad-4d70-bf02-5d1872324d6f.c000.snappy.parquet
Writers are expected to create `.parquet` data files named with a UUID to avoid conflicts, and in practice writers may upload as many of those as they want, with as much concurrency as desired. Newly created `.parquet` files are not visible to readers or other writers of the table until the transaction log, located in `_delta_log/`, is updated.
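For instance, staging a new data file is an ordinary upload under a UUID-derived name, and no coordination is required at this stage. A minimal boto3 sketch (the local Parquet file name here is purely hypothetical):

```python
import uuid
import boto3

s3 = boto3.client("s3")

# Name the data file with a UUID so concurrent writers can never collide on the key.
part_name = f"part-00000-{uuid.uuid4()}.c000.snappy.parquet"

# The upload is invisible to the table until a _delta_log entry references it.
s3.upload_file(
    "local-batch.parquet",  # hypothetical locally written Parquet file
    "buoyant-dev",
    f"databases/logs/nginx/ds=2023-11-27/{part_name}",
)
```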
With the example table above, the next transaction would create `00000000000000000001.json`, representing "version 1" of the table. Imagine two concurrent writers, Alpha and Bravo, which each believe the next version of the `logs.nginx` table should be 2. If they both issue a `PutObject` request for their version of `00000000000000000002.json`, the last writer wins. This results in an incorrect table state: what Alpha wrote into S3 has been overwritten by Bravo, and any add, update, or delete actions contained within have been lost!
Writers Alpha and Bravo must coordinate their writes to avoid this scenario.
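To make the failure mode concrete, here is a contrived boto3 sketch of the race; the commit bodies are stand-ins rather than valid Delta actions:

```python
import boto3

s3 = boto3.client("s3")
key = "databases/logs/nginx/_delta_log/00000000000000000002.json"

# Stand-in commit bodies; real entries contain JSON-encoded Delta actions.
alpha_commit = b'{"add": {"path": "ds=2023-11-27/part-00000-alpha.c000.snappy.parquet"}}'
bravo_commit = b'{"add": {"path": "ds=2023-11-27/part-00000-bravo.c000.snappy.parquet"}}'

# Writer Alpha commits its "version 2"...
s3.put_object(Bucket="buoyant-dev", Key=key, Body=alpha_commit)

# ...and writer Bravo, unaware of Alpha, unconditionally overwrites the same key.
# S3 accepts both PUTs, and Alpha's actions are silently lost.
s3.put_object(Bucket="buoyant-dev", Key=key, Body=bravo_commit)
```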
DynamoDB-based locking
The need for consistent and concurrent writes to AWS S3 has been a well understood problem in the Delta Lake community, with three prevailing solutions: `dynamodb-lock`, `S3DynamoDbLogStore`, and the "S3 commit service." Customers of Databricks who rely only upon Spark-based applications for reading and writing to Delta tables benefit from the S3 commit service, which manages write coordination behind the scenes. Delta Lake Spark users outside of the Databricks environment can utilize the `S3DynamoDbLogStore`, which implements an algorithm for concurrent writes for Spark applications. This post focuses on the `dynamodb-lock` based approach, which is the current default for the `deltalake` Python and Rust packages.
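To give a sense of how this looks in practice, here is a minimal sketch using the Python `deltalake` package and the example table from above. The lock-related option names are described in the next section and may vary between releases:

```python
import pyarrow as pa
from deltalake import write_deltalake

# Lock configuration passed as storage options; the same values can also be
# supplied as environment variables.
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DYNAMO_LOCK_TABLE_NAME": "delta_rs_lock_table",
    "DYNAMO_LOCK_OWNER_NAME": "writer-bravo",
}

batch = pa.table({"status": [200, 404], "ds": ["2023-11-27", "2023-11-27"]})

# The DynamoDB lock is acquired and released inside this call when committing
# the new version to _delta_log.
write_deltalake(
    "s3://buoyant-dev/databases/logs/nginx",
    batch,
    mode="append",
    partition_by=["ds"],
    storage_options=storage_options,
)
```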
At its core the `dynamodb-lock` approach relies on a coordination table in the strongly consistent AWS DynamoDB service. While the table name can be configured with the `DYNAMO_LOCK_TABLE_NAME` environment variable or configuration option, it is conventionally named `delta_rs_lock_table` and contains:

| key | leaseDuration | owner |
| --- | --- | --- |
| s3://buoyant-dev/databases/logs/nginx | 1701153396 | writer-bravo |
The `key` is expected to be the Delta table that is being locked, though the specific format of this key may change between major versions of the `deltalake` library. The `leaseDuration` is expected to be the TTL for the lock entry and represents the "liveness" of the acquired lock by the writer. Finally, `owner` is expected to be some user-provided identifier of the writer (`DYNAMO_LOCK_OWNER_NAME`) that allows a process to resume its lock following a crash or interruption.
If another `owner` has acquired the lock for the given table key, then the writer will either fail immediately or loop while it tries to acquire the lock.
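The core idea can be sketched with a conditional write against DynamoDB. This is a simplification; the real delta-rs implementation additionally handles lease expiry, renewal, and lock data beyond what is shown here:

```python
import time
import boto3

dynamodb = boto3.client("dynamodb")

def try_acquire_lock(table_key: str, owner: str, lease_seconds: int = 20) -> bool:
    """Attempt to take the lock; return False if another owner already holds it."""
    try:
        dynamodb.put_item(
            TableName="delta_rs_lock_table",
            Item={
                "key": {"S": table_key},
                "leaseDuration": {"N": str(int(time.time()) + lease_seconds)},
                "owner": {"S": owner},
            },
            # Only succeed if no lock entry exists for this table key.
            ConditionExpression="attribute_not_exists(#k)",
            ExpressionAttributeNames={"#k": "key"},
        )
        return True
    except dynamodb.exceptions.ConditionalCheckFailedException:
        return False

# Loop (or fail fast) until the lock is ours, then commit the next table version.
while not try_acquire_lock("s3://buoyant-dev/databases/logs/nginx", "writer-bravo"):
    time.sleep(0.5)
```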
Limitations on locking
The `dynamodb-lock` approach allows for some sensible cooperation between concurrent writers, but the key limitation is that all concurrent operations must synchronize on the table itself. There is no smaller division of concurrency than a table operation, which presents no concerns in the contrived two-writer setup on `logs.nginx` described above.
There are, however, practical limitations to this model as the number of concurrent writers increases, since more writers inherently means more opportunity for a writer to be stuck waiting to acquire the single shared lock on the table.
For example: imagine a writer has an application-level timeout of 10 seconds to commit a write. Systems like AWS SQS require a "visibility timeout" on messages where consumers have a fixed length of time to complete their work before a message is made visible and available to another consumer.
With an external limitation of 10 seconds applied, if loading the table and committing a write takes 1s, then the theoretical limit on concurrent writers is 10, since that is the maximum number of times the lock can be acquired, used, and released within the 10 second timeout.
In practice the maximum number of concurrent writers is lower unless the writers are guaranteed a fair, round-robin distribution of workload; otherwise writers with exceptionally bad luck could be perpetually starved of the lock.
The distribution of workloads can also adversely affect the number of possible concurrent writers. Imagine a workload which enqueues 100 new versions for each of three different tables: `logs.nginx`, `logs.alb`, and `logs.cdn`.
+----+----+----+----+----+----+
| 1n | 1a | 1c | 2n | 2a | 2c |
+----+----+----+----+----+----+
A "fair" distribution of these versions from the queue can improve the maximum concurrency but only so long as there are many discrete tables being modified concurrently. In this case, having more than three concurrent writers will lead to diminishing returns and wasted runtime waiting for lock acquisition.
An "unfair" distribution of these new versions from the queue will have noticeably worse concurrency performance:
+----+----+----+----+----+----+----+----+----+----+----+
| 1n | 2n | 3n | 4n | 5n | 6n | 1a | 2a | 3a | 1c | 2c |
+----+----+----+----+----+----+----+----+----+----+----+
Concurrent writers with the above distribution, assuming a FIFO-style queue consumption model, will spend substantially more time waiting for the lock as the first six versions are processed, since they will all be contending for the same lock on the `logs.nginx` table.
Curse of stale table state
One of the common patterns for optimizing concurrent writers is to minimize the amount of time spent holding the lock. The `.parquet` files can be created and uploaded without ever taking the lock. Rather than acquiring the lock for "doing the data processing work", applications should only acquire the lock when they need to create and commit a transaction to the Delta log.
Version files for Delta tables use monotonically increasing version numbers. For concurrent writers this means that before a new transaction is created, the existing table state must be loaded. Not only does this mean that the lock must be acquired before loading table state, it also means that there is potential for variable performance as the Delta table grows.
Delta supports checkpoints, which provide a key performance optimization for loading table state, but these checkpoint files are conventionally created only every 10th version. This means a writer which acquires the lock and starts to load the table state at version 6 must load:
└── _delta_log
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    ├── 00000000000000000003.json
    ├── 00000000000000000004.json
    ├── 00000000000000000005.json
    └── 00000000000000000006.json
This means it must execute 7 total `GetObject` network calls to S3 (among a few others) before it can do any of its meaningful work of writing to the table.
The lucky writer which loads the table state at version 10 would load:
└── _delta_log
    ├── _last_checkpoint
    └── 00000000000000000010.checkpoint.parquet
Resulting in 2 effective `GetObject` network calls (among a few others) before it can do its meaningful work of writing the next version of the table.
The differences in loading table state for concurrent writers result in an unavoidable variability in the runtime performance of the critical section holding the lock.
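One mitigation is for a writer to cut checkpoints itself rather than relying solely on the every-10th-version convention, so that the next writer to load state replays fewer JSON files. A sketch with the Python `deltalake` package, assuming a release that exposes `create_checkpoint()`:

```python
from deltalake import DeltaTable

# Loading table state is the step that replays _delta_log JSON entries and,
# when one exists, the most recent checkpoint file.
dt = DeltaTable("s3://buoyant-dev/databases/logs/nginx")
print(f"loaded table state at version {dt.version()}")

# Writing a checkpoint collapses that replay for whichever writer loads state next.
dt.create_checkpoint()
```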
At the end of the day, the fundamental challenge is that only one writer can operate at any given time on a unique Delta table. The most efficient number of concurrent writers for a given table is one.
In practice, however, most data processing workloads do more than strictly write to the Delta table. There may be some computation the writer must do, or enrichment of data with third-party sources, all of which can likely be done concurrently outside of the `dynamodb-lock` critical section.
Coalesce is more
For write-heavy, typically append-only, Delta Lake workloads where there is little to no data processing happening, there are still benefits to concurrent writers, especially for a writer which handles multiple tables simultaneously, such as kafka-delta-ingest. Coalescing the data can help improve the efficiency and performance of writers which need to acquire the DynamoDB-based lock.
One such pattern requires coalescing by table URI into discrete queues. Rather than handling inbound writes all together, a worker can be deployed for each discrete table, and therefore maintain sole ownership of the lock. With kafka-delta-ingest a common pattern is to deploy a single writer for each topic and partition, which has two benefits:
- Ensures all data is consumed from Kafka in a way that guarantees exactly-once semantics
- Sets an optimal number of concurrent Delta writers for a given table to the total number of partitions for its topic, minimizing to the extent possible the amount of lock acquisition overhead.
Optimizing performance by coalescing workload can take another, more Delta-specific form. As already discussed, concurrent writers are limited because they need to acquire a lock to write a new version to a single table. Each version of a Delta table, however, can contain multiple actions which modify the state of the table. In the case of an append-only writer, it would be suboptimal to have 1000 concurrent writers fighting for the lock to write 1000 versions, each with a single add action. Instead, if possible, coalesce those add actions so that 10 concurrent writers each write a new version containing 100 add actions, as sketched below the list. This approach provides two substantial benefits:
- Fewer concurrent writers spend less time waiting for or attempting to acquire the lock.
- Reduction of "version bloat", where a large number of versions are added to the table; fewer versions benefit not only writers attempting to load table state but also readers attempting to query the table.
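A rough sketch of that coalescing pattern with the Python `deltalake` package; the batch source here is a stand-in for whatever queue or stream feeds the writer:

```python
import pyarrow as pa
from deltalake import write_deltalake

def incoming_batches():
    """Stand-in for a stream of small record batches, e.g. messages consumed from a queue."""
    for _ in range(10):
        yield pa.RecordBatch.from_pydict(
            {"status": [200] * 1_000, "ds": ["2023-11-27"] * 1_000}
        )

def flush(batches: list) -> None:
    """Commit one new Delta version containing many add actions."""
    if not batches:
        return
    write_deltalake(
        "s3://buoyant-dev/databases/logs/nginx",
        pa.Table.from_batches(batches),
        mode="append",
        storage_options={"AWS_S3_LOCKING_PROVIDER": "dynamodb"},
    )

buffer = []
for batch in incoming_batches():
    buffer.append(batch)
    # Take the lock (inside write_deltalake) roughly once per 5,000 rows in this
    # toy example, rather than once per incoming batch.
    if sum(b.num_rows for b in buffer) >= 5_000:
        flush(buffer)
        buffer = []
flush(buffer)
```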
Until a strongly consistent "put if absent" style operation exists in S3, the fundamental limitation of concurrent writers will remain: writes must be serialized for any given table. There is however a lot of flexibility that can be built around this limitation to ensure highly efficient writes to Delta Lake tables in AWS S3 in both Python and Rust.
Improving performance of Rust and Python data processing infrastructure is something I love to do! If you're thinking about reducing costs by moving write workloads out of Spark or Databricks, drop me an email and we'll chat!!