Streaming data systems can offer more cost-effective data processing while delivering fresher data, but they require much more monitoring discipline, especially when it comes to lag. In many streaming systems, when lag grows other dependent systems can enter degraded or failing states! In short, lag is a big deal. When customers observe lag in stream processing systems which write to Delta Lake tables, the natural first response for most system operators is to increase concurrency, or horizontally scale, which may actually slow the system down further!
This concurrent transaction race problem can manifest when using Delta Lake with AWS Lambda, kafka-delta-ingest, or any other system which allows multiple concurrent writers to a single Delta Lake table.
Concurrent Transaction Race
The race can be more readily observed in highly concurrent systems like AWS Lambda, such as with sqs-ingest, which is part of our Oxbow suite of tools. Inspired by kafka-delta-ingest, sqs-ingest is deployed as an SQS-triggered AWS Lambda and helps perform serverless Delta Lake ingestion. Users can drop data into SNS or SQS and that data will be automatically appended to the configured Delta tables. Unlike kafka-delta-ingest, which requires always-on daemons and an Apache Kafka cluster, sqs-ingest can scale to zero, which also means its cost scales to $0.
By default, Lambda is typically configured to allow 1,000 concurrent invocations. For the purposes of this example, let us consider a concurrency of 4, with writers A, B, C, and D. As each writer is invoked with a batch of messages, the Lambda will:
1. Boot the function.
2. Load the Delta table state at version N, e.g. with deltalake::open_table().
3. Write the batch data to S3.
4. Create a new Delta transaction for the added files at version N+1.
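In Rust, one such invocation looks roughly like the sketch below, built on the deltalake crate. The table URI is a placeholder, the conversion of the incoming messages into an Arrow RecordBatch is omitted, and the crate is assumed to be built with its S3 support enabled; this is not sqs-ingest's actual code, just an illustration of steps 2 through 4.

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{open_table, DeltaOps, DeltaResult};

// Rough shape of one serverless ingest invocation (not sqs-ingest's real code).
async fn handle_batch(batch: RecordBatch) -> DeltaResult<()> {
    // Step 2: load the Delta table state at the current version N.
    let table = open_table("s3://my-bucket/my-table").await?;

    // Steps 3 and 4: write the batch out as .parquet and commit version N+1.
    // The write builder appends by default.
    let table = DeltaOps(table).write(vec![batch]).await?;
    println!("committed version {:?}", table.version());
    Ok(())
}
```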
When there is a single writer this process is straightforward, but when there are multiple concurrent processes the version of the Delta table can change between steps 2 and 4, causing the concurrent transaction race.
```mermaid
flowchart LR
    A-- Committed v1 -->table[(Delta table)]
    B-. "`_trying v1_`" .->table[(Delta table)]
    C-. "`_..._`" .->table[(Delta table)]
    D-. "`_..._`" .->table[(Delta table)]
```
If the version changes after the table state is loaded, the writer should reload state in order to produce a correct monotonically increasing version number. The probability of the write succeeding without a reload is 1 / (number of writers). Therefore there is a 25% chance for any of the writers in this scenario to succeed without hitting a conflict requiring a table state reload.
If there are one thousand concurrent Lambda-based writers then they each have a 0.1% chance of successfully writing without a conflict, which as you might imagine causes a lot of thrashing as the functions all race to write the N+1 version to the Delta table.
At a high level the conflict resolution procedure is as follows:
```mermaid
sequenceDiagram
    participant B as Writer B
    participant T as Delta Table
    participant O as Object Storage
    T ->> B: load transaction log
    B ->> O: write data file
    loop try to write next version
        B ->> T: write v1
        T -->> B: v1 already exists!
        T ->> B: reload log
        note right of B: determine if commit can still be written without data conflict
        B ->> T: write v2 (latest + 1)
    end
```
The cost of a conflict between writers is never zero. As transaction log sizes grow, or as tables become less and less optimized, the overhead of the table state reload associated with the concurrent transaction race can introduce substantial lag into a streaming system.
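To make the retry cost concrete, here is a deliberately simplified, self-contained sketch of that loop. The Log type and its try_commit method are hypothetical stand-ins for the transaction log, not the deltalake crate's API; the point is simply that every lost race forces a reload before the next attempt.

```rust
/// Hypothetical stand-in for the transaction log, used only for illustration.
struct Log {
    latest: i64,
}

impl Log {
    /// A commit only succeeds if it targets exactly one past the latest version.
    fn try_commit(&mut self, version: i64) -> Result<i64, ()> {
        if version == self.latest + 1 {
            self.latest = version;
            Ok(version)
        } else {
            Err(()) // that version already exists: another writer won the race
        }
    }
}

/// Mirrors the loop in the diagram above: attempt latest + 1, and on a
/// conflict pay for a state reload (the expensive step) before retrying.
fn commit_with_retry(log: &mut Log, mut known_version: i64) -> i64 {
    loop {
        match log.try_commit(known_version + 1) {
            Ok(committed) => return committed,
            Err(()) => known_version = log.latest, // reload table state
        }
    }
}
```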
With my recently merged pull request, the deltalake Rust crate and Python package will try to determine the next commit version without loading the table's state in many scenarios.
A streaming application that performs merge or update operations must have the latest table state loaded in order for Apache DataFusion to properly plan the merge execution. An append operation, however, does not require information about what data already exists in the table; the only information it needs is the latest transaction version number.
The pull request linked above, which will be released in v0.23, automatically performs optimistic conflict resolution and reduces the chance for lag in streaming systems.
Improvements to S3DynamoDbLogStore and the conflict resolution changes coming in v0.23 help, but cannot address all the challenges associated with the concurrent transaction race.
Referring to our previous writers A, B, C, and D: in a streaming merge application they must reload table state to safely perform their writes. Not only must they reload the transaction log, but they may also need to reload data files! A substantial time penalty can be introduced when a commit conflict needs to be resolved, which could happen up to 75% of the time. Imagine if an automated system started adding new writers as lag increased!
Horizontally scaling writers is more likely to slow throughput than speed it up!
4 Strategies for increasing throughput
Fortunately there are some classic strategies which can improve the performance of a Delta streaming application. None of these strategies is particularly unique or Delta Lake specific; they are typically good considerations for any streaming system which requires serializing behaviors such as commits to the transaction log.
Maximize the batch size
Delta table commits typically include "actions" and "data", and in a streaming application which performs merges a single commit may contain a number of actions:
- remove actions, which remove any outdated .parquet data files from the transaction log.
- add actions, which add the newly created .parquet data files, representing the newly merged state of the table, to the transaction log.
If a single merge commit carries 5 seconds of overhead for each writer, then increasing the consumed batch size for the application can reduce the frequency with which concurrent processes attempt to write merge commits. Of course, this must be balanced against freshness constraints on the streaming application, as an increased batch size may result in a higher latency before data becomes available in the Delta table.
In the case of sqs-ingest, the SQS trigger for the AWS Lambda has two key settings which should be tuned:
- Batch size: The maximum number of records a Lambda invocation may receive. Higher is typically better for throughput; tuning this to the largest value that fits within the memory and time constraints of the function is usually best. For sqs-ingest, where each record represents a row appended to the Delta table, the default value of 10 is insufficient, and the maximum of 10,000 is preferred.
- Batch window: The maximum amount of time to gather records before invoking the function. This is typically the most sensitive value to tune. A batch window of 10 seconds guarantees that the Lambda will be invoked every 10 seconds and will try to create a transaction in that period, which can cause a concurrent transaction race. This value is best set to the maximum data freshness latency tolerated by the organization. For example, if data should be processed in under 5 minutes, then the batch window is best set to a value between 240s and 300s.
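Both settings live on the Lambda's event source mapping. As a sketch, assuming the AWS SDK for Rust (aws-sdk-lambda) and a placeholder mapping UUID, tuning them for a roughly 5 minute freshness budget might look like this:

```rust
use aws_config::BehaviorVersion;
use aws_sdk_lambda::Client;

#[tokio::main]
async fn main() -> Result<(), aws_sdk_lambda::Error> {
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    // The UUID identifies the SQS -> Lambda event source mapping to tune.
    client
        .update_event_source_mapping()
        .uuid("replace-with-your-mapping-uuid")
        .batch_size(10_000) // largest batch the function can handle
        .maximum_batching_window_in_seconds(300) // ~5 minute freshness budget
        .send()
        .await?;

    Ok(())
}
```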
Different streaming applications have different ways of configuring the batch size, but functionally they all do the same thing: govern how large of a working set of data the application can process at once.
Aim for the sweet spot of processing as much data as possible within the allowable latency of the streaming application.
Shard the input stream
For higher throughput streams, it can become difficult or even impossible to perform the appends/merges in a timely manner on the input stream. Imagine a stream with 10,000 records per second: the sqs-ingest settings mentioned in the previous section would guarantee that a Lambda is executing every second to handle the input data, since the 10,000 record batch fills long before the batch window elapses. Chances are the processing time of the Lambda will exceed one second, and the streaming application is prevented from ever processing data in near real-time!
Sharding is typically an application-specific concept and requires application-specific considerations. Imagine the input stream has a customer_id embedded within it, and the destination Delta table is partitioned on that same customer_id. Rather than having a singular stream processing application handle all records at 10,000 records per second, introducing a step which uses customer_id to fan out to multiple parallel processes can reduce the per-customer_id volume to something that fits within a manageable batch size and window.
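As a minimal sketch of that fan-out step (the shard count and routing details here are assumptions, not part of sqs-ingest), hash the customer_id to pick one of N shards, where each shard gets its own queue and its own writer:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a record to one of `num_shards` queues based on its customer_id,
/// so each downstream writer only ever sees its own slice of the stream
/// (and ideally only its own partitions of the Delta table).
fn shard_for_customer(customer_id: &str, num_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    customer_id.hash(&mut hasher);
    hasher.finish() % num_shards
}

fn main() {
    // Routes this record to one of ingest-shard-0 through ingest-shard-3.
    println!("ingest-shard-{}", shard_for_customer("customer-1234", 4));
}
```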
Using the partition information of the downstream Delta table for sharding the input stream has the added benefit of reducing the number of data files which may need to be rewritten in the case of a merge operation, further improving performance of the streaming application.
Scale vertically
Whether the streaming application is performing append or merge operations, the amount of memory and processor time required is going to play a role in how fast data can be processed. Observe the CPU usage of the streaming application: if it appears to be CPU-bound, then it is usually better to scale vertically rather than horizontally. Because of the concurrent transaction race, a single CPU-bound process using 4 vCPUs is going to be more efficient than four processes each using 1 vCPU, as the latter will incur coordination overhead between the writers.
AWS Lambda, ECS, and a number of other deployment environments encourage and even automate horizontally scaling processes but for Delta writers it is typically better to have fewer processes.
Segment appends and merges
The complexity of merge or update operations on high throughput streams can present unavoidable latency and throughput challenges. In some cases it is most practical to avoid trying to achieve the impossible under tight time constraints and instead rely on the medallion architecture: segment append operations into bronze tables and reserve merges for downstream silver tables.
A merge operation in a low latency streaming application may not even make sense in some situations, such as when the merge involves a data set which is not updated as frequently.
I have seen a number of customers who want data immediately in bronze but can tolerate, or even prefer, merging data into silver tables in slower 10-60 minute batches. By adopting the bronze/silver distinction, data applications which need the freshest data can get it as soon as possible while others can operate at a slower pace.
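A rough sketch of that split, again using the deltalake crate with placeholder table URIs: the hot path only ever appends to the bronze table, while a separate scheduled job performs the merge into silver.

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{open_table, DeltaOps, DeltaResult};

/// Hot path: invoked for every incoming batch; appends only, so commits stay cheap.
async fn append_to_bronze(batch: RecordBatch) -> DeltaResult<()> {
    let bronze = open_table("s3://lake/bronze/events").await?;
    DeltaOps(bronze).write(vec![batch]).await?;
    Ok(())
}

/// Slow path: run on a 10-60 minute schedule, where loading the silver table's
/// state and planning a DataFusion merge is an acceptable cost.
async fn merge_bronze_into_silver() -> DeltaResult<()> {
    // Sketch only: read the latest bronze records and upsert them into the
    // silver table with DeltaOps::merge.
    todo!()
}
```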
Scaling data applications in modern cloud environments can be done automatically and very efficiently, but developers should avoid using the same "scaling playbook" that their stateless web applications might use. Streaming workloads have different performance characteristics and constraints! More containers is not always the answer; oftentimes less is more when scaling latency-sensitive data applications.
If your team is looking to improve performance of your streaming or batch infrastructure with Delta Lake, drop me an email and we'll chat!!