Concurrency limitations for Delta Lake on AWS

November 27, 2023

by R. Tyler Croy

rust

deltalake

Delta Lake relies heavily on the underlying object storage provider to ensure atomic operations on its transaction log. Unfortunately, AWS S3 is not fully strongly consistent, which dictates application-level coordination for safe concurrent Delta operations. The Delta Lake protocol expressly relies on "atomic primitives of the underlying filesystem to ensure concurrent writers do not overwrite each others entries." In an ideal world the underlying filesystem is strongly consistent and Delta writers gain immense concurrency "for free". The reality of network-based file and object storage systems, however, brings many limitations to Delta Lake's potential concurrency.

In the case of AWS S3's consistency model many operations are strongly consistent, but concurrent operations on the same key are not. AWS encourages application-level object locking, which delta-rs implements using AWS DynamoDB. While we have improved the performance of locking in recent 0.16.x releases of the deltalake Rust crate, there are some real limitations to the performance of concurrent writers under the DynamoDB locking model used by Python and Rust-based Delta Lake applications.

Before diving into the limitations, it is first important to understand some fundamentals of Delta Lake's design and how the DynamoDB-based locks operate.

Skip ahead to concurrency limitations

The primitive Delta table

For purposes of discussion, imagine you are working with a Delta table named logs.nginx in a Databricks notebook. Whether using AWS Glue Data Catalog or Unity Catalog, the underlying Apache Spark library for Delta Lake will map queries to logs.nginx to a location in an S3 bucket, such as buoyant-dev/databases/logs/nginx. The structure of this prefix in object storage is significant! A _delta_log/ prefix exists containing the Delta Lake table metadata, and other prefixes may contain Hive-style partitions with Apache Parquet formatted data files, such as:

└── databases
    └── logs
        └── nginx
            ├── _delta_log
            │   └── 00000000000000000000.json
            └── ds=2023-11-27
                ├── part-00000-2bcc9ff6-0551-4401-bd22-d361a60627e3.c000.snappy.parquet
                └── part-00001-ca647ee7-f1ad-4d70-bf02-5d1872324d6f.c000.snappy.parquet

Writers are expected to create .parquet data files named with a UUID to avoid conflicts, and in practice writers may upload as many of those as they want, with as much concurrency as desired. Newly created .parquet files are not visible to readers or other writers of the table until the transaction log, located in _delta_log/, is updated.

With the example table above, the next transaction would create the 00000000000000000001.json, representing "version 1" of the table. Imagine two concurrent writers, Alpha and Bravo, which each believe the next version of the logs.nginx table should be 2. If they both issue a PutObject request for their version of 00000000000000000002.json then only the last writer will win. This will result in an incorrect table state since what Alpha wrote into S3 was overwritten by Bravo and any add, update, delete actions contained within have been lost!

Writers Alpha and Bravo must coordinate their writes to avoid this scenario.
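The lost update described above can be reproduced with a toy in-memory stand-in for a bucket. This is only a sketch: FakeObjectStore, put_object, and put_if_absent are hypothetical names invented for illustration, not an S3 client, and the log entries are heavily simplified.

```python
# Toy in-memory stand-in for an S3 bucket (hypothetical, not a real S3 client).
class FakeObjectStore:
    def __init__(self):
        self.objects = {}

    def put_object(self, key, body):
        """Unconditional put: silently replaces any existing object."""
        self.objects[key] = body

    def put_if_absent(self, key, body):
        """Hypothetical conditional put: refuses to overwrite an existing key."""
        if key in self.objects:
            return False
        self.objects[key] = body
        return True


key = "_delta_log/00000000000000000002.json"

# Both writers believe version 2 is next and write unconditionally.
store = FakeObjectStore()
store.put_object(key, '{"add": {"path": "alpha.parquet"}}')
store.put_object(key, '{"add": {"path": "bravo.parquet"}}')
lost_update = "alpha" not in store.objects[key]  # Alpha's commit is gone

# With a conditional put, the second writer learns it lost the race instead.
safe = FakeObjectStore()
alpha_ok = safe.put_if_absent(key, '{"add": {"path": "alpha.parquet"}}')
bravo_ok = safe.put_if_absent(key, '{"add": {"path": "bravo.parquet"}}')
```

In the first case nothing tells Alpha that its entry was replaced. In the second, Bravo gets an explicit failure and can reload the table state and retry with the next version.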

DynamoDB-based locking

The need for consistent and concurrent writes to AWS S3 has been a well understood problem in the Delta Lake community with three prevailing solutions: dynamodb-lock, S3DynamoDbLogStore, and the "S3 commit service." Customers of Databricks who rely only upon Spark-based applications for reading and writing to Delta tables benefit from the S3 commit service which manages write coordination behind the scenes. Delta Lake Spark users outside of the Databricks environment can utilize the S3DynamoDbLogStore which implements an algorithm for concurrent writes for Spark applications. This post focuses on the dynamodb-lock based approach which is the current default for the deltalake Python and Rust packages.

At its core the dynamodb-lock approach relies on a coordination table in the strongly consistent AWS DynamoDB service. While the table name can be configured with the DYNAMO_LOCK_TABLE_NAME environment variable or configuration option, it is conventionally named: delta_rs_lock_table and contains:

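+---------------------------------------+---------------+--------------+
| key                                   | leaseDuration | owner        |
+---------------------------------------+---------------+--------------+
| s3://buoyant-dev/databases/logs/nginx | 1701153396    | writer-bravo |
+---------------------------------------+---------------+--------------+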

The key is expected to be the Delta table that is being locked, though the specific format of this key may change between major versions of the deltalake library. The leaseDuration is expected to be the TTL for the table and represents the "liveness" of the acquired lock by the writer. Finally, owner is expected to be some user-provided identifier of the writer (DYNAMO_LOCK_OWNER_NAME) that allows a process to resume its lock following a crash or interruption.

If another owner has acquired the lock for the given table key, then the writer will either fail immediately or loop while it tries to acquire the lock.
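The behavior described above can be sketched with an in-memory table. This is not the dynamodb-lock implementation: ToyLockTable and acquire_with_retry are hypothetical names, the stored value is assumed to be an expiry timestamp, and a real implementation would rely on DynamoDB conditional writes rather than a Python dict.

```python
import time


class ToyLockTable:
    """In-memory stand-in for the coordination table: one row per table key."""

    def __init__(self):
        self.rows = {}  # key -> (expires_at, owner)

    def try_acquire(self, key, owner, lease_seconds, now=None):
        now = time.time() if now is None else now
        row = self.rows.get(key)
        # Take the lock if it is free, expired, or already ours
        # (for example, the same owner resuming after a crash).
        if row is None or row[0] <= now or row[1] == owner:
            self.rows[key] = (now + lease_seconds, owner)
            return True
        return False

    def release(self, key, owner):
        # Only the current owner may release the lock.
        if self.rows.get(key, (None, None))[1] == owner:
            del self.rows[key]


def acquire_with_retry(table, key, owner, lease_seconds, attempts,
                       delay=0.1, sleep=time.sleep):
    """Loop until the lock is acquired or the attempts run out."""
    for _ in range(attempts):
        if table.try_acquire(key, owner, lease_seconds):
            return True
        sleep(delay)
    return False
```

A writer that prefers to fail immediately would call try_acquire once, while a writer that prefers to wait would use the retry loop with a bounded number of attempts.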

Limitations on locking

The dynamodb-lock approach allows for some sensible cooperation between concurrent writers but the key limitation is that all concurrent operations must synchronize on the table itself. There is no smaller division of concurrency than a table operation, which presents no concerns with the contrived two-writer setup described above for logs.nginx.

There are however practical limitations to this model as the number of concurrent writers increases, which inherently will increase the opportunity for writers to be stuck waiting to acquire a shared singular lock on the table.

For example: imagine a writer has an application-level timeout of 10 seconds to commit a write. Systems like AWS SQS require a "visibility timeout" on messages where consumers have a fixed length of time to complete their work before a message is made visible and available to another consumer.

With an external limitation of 10 seconds applied, if loading the table and committing a write takes 1s, then the theoretical limit on concurrent writers is 10, since that is the maximum number of times the lock can be acquired, used, and released within a 10 second timeout.
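The arithmetic behind that bound is simple enough to write down. The function name below is made up for illustration, and the model assumes every commit holds the lock for the same fixed time with no acquisition overhead.

```python
def max_commits_within_timeout(timeout_seconds, critical_section_seconds):
    """Upper bound on serialized lock holders that fit inside one timeout window."""
    return int(timeout_seconds // critical_section_seconds)


# The example from the text: a 10 second timeout and a 1 second critical section.
limit = max_commits_within_timeout(10, 1)

# A slower critical section shrinks the bound quickly.
slower_limit = max_commits_within_timeout(10, 2.5)
```

Any time spent retrying lock acquisition or loading a larger table state comes straight out of this budget.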

In practice the maximum possible number of concurrent writers is lower unless the writers are guaranteed a fair, round-robin distribution of workload. Without that guarantee, writers with exceptionally bad luck could be perpetually starved of the lock.

The distribution of workloads can also adversely affect the number of possible concurrent writers. Imagine a workload which enqueues 100 new versions for each of three different tables: logs.nginx, logs.alb, logs.cdn.

+----+----+----+----+----+----+
| 1n | 1a | 1c | 2n | 2a | 2c |
+----+----+----+----+----+----+

A "fair" distribution of these versions from the queue can improve the maximum concurrency but only so long as there are many discrete tables being modified concurrently. In this case, having more than three concurrent writers will lead to diminishing returns and wasted runtime waiting for lock acquisition.

An "unfair" distribution of these new versions from the queue will have noticeably worse concurrency performance:

+----+----+----+----+----+----+----+----+----+----+----+
| 1n | 2n | 3n | 4n | 5n | 6n | 1a | 2a | 3a | 1c | 2c |
+----+----+----+----+----+----+----+----+----+----+----+

Concurrent writers with the above distribution, assuming a FIFO-style queue consumption model, will spend substantially more time waiting for the lock as the first 6 versions are processed, since they will all be contending for the same lock on the logs.nginx table.
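A rough way to compare the two orderings is a small simulation. The model below is an assumption, not a measurement: a fixed pool of workers consumes the queue in FIFO order, every commit holds its table's lock for one time unit, and a worker that picks up a version for a locked table simply waits. The simulate function is a hypothetical helper.

```python
import heapq


def simulate(queue, workers, commit_seconds=1):
    """Return (total time spent waiting on locks, time when the last commit ends)."""
    worker_free = [0] * workers  # when each worker can pick up its next item
    heapq.heapify(worker_free)
    table_free = {}              # when each table's lock is next available
    total_wait = 0
    makespan = 0
    for table in queue:
        picked_up = heapq.heappop(worker_free)
        # The commit starts once the worker has the item AND the table lock.
        start = max(picked_up, table_free.get(table, 0))
        end = start + commit_seconds
        total_wait += start - picked_up
        table_free[table] = end
        heapq.heappush(worker_free, end)
        makespan = max(makespan, end)
    return total_wait, makespan


# "n", "a", "c" stand for logs.nginx, logs.alb, logs.cdn as in the diagrams.
fair = ["n", "a", "c", "n", "a", "c"]
unfair = ["n"] * 6 + ["a"] * 3 + ["c"] * 2

fair_wait, fair_makespan = simulate(fair, workers=3)
unfair_wait, unfair_makespan = simulate(unfair, workers=3)
```

Under these assumptions the fair ordering never waits on a lock with three workers, while the unfair ordering leaves workers idle behind the logs.nginx lock for most of the run.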

Curse of stale table state

One of the common patterns for optimizing concurrent writers is to minimize the amount of time holding the lock. The .parquet files can be created and uploaded without ever taking the lock. Rather than acquiring the lock for "doing the data processing work", applications should only acquire the lock when they need to create and commit a transaction to the Delta log.

Version files for Delta tables use monotonically increasing version numbers. For concurrent writers this means that before a new transaction is created the existing table state must be loaded. Not only does this mean that the lock must be acquired before loading table state, it also means that there is potential for variable performance as the Delta table grows.
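The shape of that pattern can be sketched with threads in a single process. Everything here is a stand-in: table_lock plays the role of the DynamoDB-based lock, delta_log plays the role of _delta_log/, and write_batch is a hypothetical writer that only "uploads" a file name.

```python
import threading
import uuid

table_lock = threading.Lock()  # stand-in for the DynamoDB-based table lock
delta_log = []                 # stand-in for _delta_log/: list index == version


def write_batch(rows):
    # Outside the lock: build and "upload" the data file under a UUID name.
    data_file = f"part-{uuid.uuid4()}.parquet"
    # Inside the lock: load the latest version, then commit the next one.
    with table_lock:
        version = len(delta_log)
        delta_log.append({"version": version, "add": data_file, "rows": len(rows)})
    return version


threads = [threading.Thread(target=write_batch, args=([i],)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the version is read and written while the lock is held, each writer commits a distinct, consecutive version no matter how the threads interleave.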

Delta supports checkpoints which provide a key performance optimization for loading table state, but these checkpoint files are conventionally created only every 10th version. This means a writer which acquires the lock and starts to load the table state at version 6 must load:

└── _delta_log
    ├── 00000000000000000000.json
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    ├── 00000000000000000003.json
    ├── 00000000000000000004.json
    ├── 00000000000000000005.json
    └── 00000000000000000006.json

This means it must execute 7 total GetObject network calls to S3 (among a few others) before it can do any of its meaningful work writing the table.

The lucky writer which loads the table state at version 10 would load:

└── _delta_log
    ├── _last_checkpoint
    └── 00000000000000000010.checkpoint.parquet

This results in 2 effective GetObject network calls (among a few others) before it can do its meaningful work of writing the next version of the table.

The differences in loading table state for concurrent writers result in an unavoidable variability in runtime performance of the critical section holding the lock.
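The variability can be made concrete by listing which objects a writer would fetch at a given version. This sketch assumes a single-file checkpoint exists at every multiple of the checkpoint interval (and none at version 0), and it ignores the "few other" calls mentioned above. The function name objects_to_load is hypothetical.

```python
def objects_to_load(version, checkpoint_interval=10):
    """List the _delta_log objects needed to reconstruct state at `version`."""
    def commit(v):
        return f"_delta_log/{v:020d}.json"

    last_checkpoint = (version // checkpoint_interval) * checkpoint_interval
    if last_checkpoint == 0:
        # No checkpoint yet: replay every commit from version 0.
        return [commit(v) for v in range(version + 1)]
    files = [
        "_delta_log/_last_checkpoint",
        f"_delta_log/{last_checkpoint:020d}.checkpoint.parquet",
    ]
    # Replay only the commits written after the checkpoint.
    files += [commit(v) for v in range(last_checkpoint + 1, version + 1)]
    return files


calls_at_6 = len(objects_to_load(6))
calls_at_10 = len(objects_to_load(10))
calls_at_19 = len(objects_to_load(19))
```

Under these assumptions the cost of the critical section follows a sawtooth: it grows with every version after a checkpoint and drops again once the next checkpoint is written.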

At the end of the day, the fundamental challenge is that only one writer can operate at any given time on a unique Delta table. The most efficient number of concurrent writers for a given table is one.

In practice however, most data processing workloads are doing more than strictly writing to the Delta table. There may be some data computation the writer must do, or enriching of data with third party sources, all of which can likely be done concurrently outside of the dynamodb-lock critical section.

Coalesce is more

For write-heavy, typically append-only, Delta Lake workloads where there is little to no data processing happening, there are still benefits to concurrent writers especially for a writer which handles multiple tables simultaneously such as kafka-delta-ingest. Coalescing the data can help improve the efficiency and performance of writers which need to acquire the DynamoDB-based lock.

One such pattern requires coalescing by table URI into discrete queues. Rather than handling inbound writes all together, a worker can be deployed for each discrete table, and therefore maintain sole ownership of the lock. With kafka-delta-ingest a common pattern is to deploy a single writer for each topic and partition, which has two benefits:

  • Ensures all data is consumed from Kafka in a way that guarantees exactly-once semantics
  • Sets an optimal number of concurrent Delta writers for a given table to the total number of partitions for its topic, minimizing to the extent possible the amount of lock acquisition overhead.
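Coalescing by table URI amounts to a group-by step in front of the writers. The sketch below is illustrative only: coalesce_by_table and the (table_uri, payload) tuples are hypothetical, and the logs/alb path is invented to mirror the logs.nginx example.

```python
from collections import defaultdict


def coalesce_by_table(pending_writes):
    """Group (table_uri, payload) pairs into one ordered queue per table."""
    queues = defaultdict(list)
    for table_uri, payload in pending_writes:
        queues[table_uri].append(payload)
    return dict(queues)


pending = [
    ("s3://buoyant-dev/databases/logs/nginx", "batch-1"),
    ("s3://buoyant-dev/databases/logs/alb", "batch-2"),
    ("s3://buoyant-dev/databases/logs/nginx", "batch-3"),
]
queues = coalesce_by_table(pending)
```

Each resulting queue can then be handed to a single dedicated worker, which becomes the only process that ever needs the lock for that table.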

Optimizing performance by coalescing the workload can take another, more Delta-specific form. As already discussed, concurrent writers are limited because they need to acquire a lock to write a new version to a single table. Each version of a Delta table, however, can have multiple actions which modify the state of the table. In the case of an append-only writer, it would be suboptimal to have 1000 concurrent writers fighting for the lock to write 1000 versions, each with a single add action. Instead, if possible, coalesce those add actions so that 10 concurrent writers each write a new version containing 100 add actions. This approach provides two substantial benefits:

  1. Fewer concurrent writers spend less time waiting for or attempting to acquire the lock.
  2. Reduction of "version bloat", where a large number of versions are added to the table, which benefits writers attempting to load table state but also readers attempting to query the table.
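The 1000-to-10 example can be sketched as a plain batching step. The coalesce_adds helper is hypothetical and the add actions are reduced to a path only; a real commit would carry full action metadata.

```python
def coalesce_adds(add_actions, versions):
    """Split add actions into at most `versions` batches, one batch per commit."""
    if not add_actions:
        return []
    size = -(-len(add_actions) // versions)  # ceiling division
    return [add_actions[i:i + size] for i in range(0, len(add_actions), size)]


# 1000 data files, each of which would otherwise be its own version.
adds = [{"add": {"path": f"part-{i:05d}.parquet"}} for i in range(1000)]
batches = coalesce_adds(adds, versions=10)
```

Each batch becomes a single version, so the lock is taken 10 times instead of 1000 and the log grows by 10 entries instead of 1000.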

Until a strongly consistent "put if absent" style operation exists in S3, the fundamental limitation of concurrent writers will remain: writes must be serialized for any given table. There is however a lot of flexibility that can be built around this limitation to ensure highly efficient writes to Delta Lake tables in AWS S3 in both Python and Rust.


Improving performance of Rust and Python data processing infrastructure is something I love to do! If you're thinking about reducing costs by moving write workloads out of Spark or Databricks, drop me an email and we'll chat!!