Delta Lake on S3-compatible storage has a concurrency limitation: writes to any given table must be coordinated, which presents scaling challenges when building general-purpose Lambdas to work with Delta tables. AWS Lambda supports massive concurrency, which in theory is extremely beneficial for parallel Delta operations, but in practice the requirement of table-level locking can result in large numbers of failed Lambda executions. The underlying issue is that AWS S3 lacks a "put if absent" semantic, which forces writers to coordinate when modifying a Delta table in S3, as outlined in the previous blog post. Most AWS accounts have a default limit of 1,000 concurrent Lambda executions; with a table-level lock, if one Lambda holds the lock then the other 999 executions can all be busy-waiting to acquire it. The options for preventing this scenario are to limit concurrency to a single execution per table, or to trigger the Lambda only once per table. This article outlines a pattern for the former, using AWS S3 Event Notifications and AWS SQS FIFO queues.
Using S3 Event Notifications to trigger Lambda functions is a really compelling way to take externally generated .parquet files and incorporate them into a Delta table's transaction log. This is exactly what the oxbow utility does. Oxbow is currently deployed in a number of production environments, happily ingesting many terabytes per day of externally produced Apache Parquet data into Delta tables, all built around S3 Event Notifications.
The "simple" Oxbow architecture is as follows:
+-----------+
| S3 Bucket |
+-----------+
     |
     |                           +-----+
     `---/Event Notification/--> | SQS |
                                 +-----+
                                    |
+--------------+                    |
| Oxbow Lambda | <------------------'
+--------------+
Unfortunately S3 Event Notifications cannot be delivered directly into an SQS FIFO queue; only standard (unordered) SQS queues are supported as a destination. S3 Event Notifications also exhibit a few behaviors in the wild which influence the system's design here:
- While SQS messages can contain multiple records, S3 Event Notifications appear to arrive at a 1:1 ratio of event notification to SQS message. This means there is no batching or batch control that can be configured for notifications.
- SQS standard queues are not strictly ordered, but messages are still visible in roughly chronological order.
- AWS Aurora exports to Apache Parquet tend to produce multiple .parquet files per table at approximately the same time. If there are 26 tables, most or all of the files for tableA will be dumped before tableB, and so on.
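To make the notification shape concrete, here is a minimal sketch of unpacking an S3 Event Notification delivered as an SQS message body. The helper name is illustrative, and the payload is abbreviated to the fields relevant here; the single-record payload reflects the 1:1 behavior described above, though the schema's `Records` field is a list, so it is handled generically.

```python
import json

def parse_s3_event(sqs_body: str) -> list:
    """Extract (bucket, key) pairs from an S3 Event Notification body.

    In practice each SQS message appears to carry exactly one record,
    but the payload is a list, so iterate over it anyway.
    """
    event = json.loads(sqs_body)
    return [
        (rec["s3"]["bucket"]["name"], rec["s3"]["object"]["key"])
        for rec in event.get("Records", [])
    ]

# An abbreviated example payload following the S3 notification schema
body = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "databases"},
            "object": {"key": "tableA/part-00000.parquet"},
        },
    }]
})
print(parse_s3_event(body))  # [('databases', 'tableA/part-00000.parquet')]
```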
For situations with high numbers of event notifications on specific table prefixes, the oxbow Lambda could be triggered in such a way that led to substantial lock contention, resulting in large numbers of event notifications ending up in the dead-letter queue.
After the problem was discussed in detail, Chris Harrington offered a potential solution on Mastodon.
SQS FIFO queues have a novel feature which allows for grouping of messages by a user-provided identifier. When messages have a group ID associated, only one consumer can consume a message for the entire group, since to do otherwise would upset the FIFO nature of the queue.
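The group semantics can be illustrated with a tiny, hypothetical in-memory model of a FIFO queue (this is a sketch of the delivery behavior, not the SQS implementation): a group with an in-flight message is skipped by other consumers until that message is deleted, while other groups remain available, and each group's messages arrive in order.

```python
from collections import defaultdict, deque

class FifoQueue:
    """Toy model of SQS FIFO delivery: per-group ordering, and at most
    one in-flight message per group at a time."""

    def __init__(self):
        self.groups = defaultdict(deque)
        self.inflight = set()

    def send(self, group_id, body):
        self.groups[group_id].append(body)

    def receive(self):
        # Deliver the head of any group with no in-flight message.
        for gid, msgs in self.groups.items():
            if msgs and gid not in self.inflight:
                self.inflight.add(gid)
                return (gid, msgs[0])
        return None  # every non-empty group is locked to a consumer

    def delete(self, group_id):
        self.groups[group_id].popleft()
        self.inflight.discard(group_id)

q = FifoQueue()
q.send("tableA", "a1"); q.send("tableA", "a2"); q.send("tableB", "b1")

first = q.receive()   # ('tableA', 'a1')
second = q.receive()  # ('tableB', 'b1') -- tableA is in flight, so skipped
third = q.receive()   # None -- both groups have in-flight messages
q.delete("tableA")
fourth = q.receive()  # ('tableA', 'a2') -- group order is preserved
```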
By pre-processing the S3 Event Notifications it is possible to determine the Delta table prefix that a new .parquet file resides in, augment the SQS message with a group ID, and forward it along to an SQS FIFO queue, e.g.:
+-----------+
| S3 Bucket |
+-----------+
     |
     |                           +-----+
     `---/Event Notification/--> | SQS |
                                 +-----+
                                    |
+---------------------+             |
| group-events Lambda | <-----------'
+---------------------+
           |
 /Add table prefix as group ID/
           |
           v
      +----------+
      | SQS FIFO |
      +----------+
           |
           v
   +--------------+
   | oxbow Lambda |
   +--------------+
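A minimal sketch of the group-events step, assuming a layout where each Delta table lives under its own key prefix (e.g. `databases/tableA/part-0.parquet`). The helper names and the choice of a key hash as the deduplication ID are illustrative, not the exact oxbow implementation, and only the message construction is shown; the real Lambda would hand these parameters to SQS (e.g. boto3's `sqs.send_message`).

```python
import hashlib
import json

def derive_group_id(object_key: str) -> str:
    """Use the table prefix (everything before the file name) as the
    FIFO message group ID, so all writes to one table serialize."""
    prefix, _, _filename = object_key.rpartition("/")
    return prefix or "root"

def to_fifo_entry(notification_body: str) -> dict:
    """Build the parameters for forwarding one notification to the FIFO queue."""
    record = json.loads(notification_body)["Records"][0]
    key = record["s3"]["object"]["key"]
    return {
        "MessageBody": notification_body,
        "MessageGroupId": derive_group_id(key),
        # A deduplication ID keeps redelivered notifications from
        # being enqueued twice within the deduplication window.
        "MessageDeduplicationId": hashlib.sha256(key.encode()).hexdigest(),
    }

body = json.dumps(
    {"Records": [{"s3": {"object": {"key": "databases/tableA/part-0.parquet"}}}]}
)
entry = to_fifo_entry(body)
print(entry["MessageGroupId"])  # databases/tableA
```

With this in place, notifications for `databases/tableA/` and `databases/tableB/` land in different groups and can be processed in parallel, while writes within one table remain serialized.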
With the above implementation, which is open source on GitHub, the oxbow Lambda remains unmodified from the "simple" deployment and can be configured with unlimited concurrency. The AWS SQS FIFO machinery will ensure that no concurrent executions of the oxbow Lambda will attempt to modify the same Delta table.
Whether using the "simple" or "advanced" deployment pattern, oxbow has proven to be an incredibly useful tool for creating append-only Delta Lake tables from data generated by external sources such as AWS Aurora or Kinesis Data Firehose. Incorporating SQS FIFO queues into the design has allowed it to effortlessly scale to meet the needs of incredibly high-throughput systems, while costing pennies per day!
To learn more about Oxbow, please check out the GitHub repository, and thanks again to Chris for the great idea!
Improving the performance of Rust and Python data processing infrastructure is something I love to do! If you're thinking about reducing costs by moving write workloads out of Spark or Databricks, drop me an email and we'll chat!