Scaling S3 Event Notifications for Delta Lake

December 30, 2023

by R. Tyler Croy

aws

lambda

Delta Lake on S3-compatible storage systems has concurrency limitations which require coordination of writes for any given table which presents scaling challenges when building general purpose Lambdas to work with Delta tables. AWS Lambdas can have massively concurrent execution which in theory is extremely beneficial for performing parallel Delta operations, but in practice the requirement of table-level locking can result in tons of failed Lambda executions and other problems. The underlying issue stems from AWS S3 missing "put if absent" semantic, which forces a coordination requirement when modifying a Delta table in S3, as was outlined in the previous blog post. For most AWS accounts the default concurrency limitation of Lambda execution is 1000 concurrent executions, because of the table-level lock, if 1 Lambda has the lock then 999 Lambda executions would be all busily waiting to acquire the lock. The options available to prevent this scenario are: limit the concurrent executions to 1, or trigger Lambda only once per-table. This article will outline a pattern for doing the former with AWS S3 Event Notifications and AWS SQS FIFO queues.

Using S3 Event Notifications to trigger Lambda functions is a really compelling way to take externally generated .parquet files and incorporate them into a Delta table's transaction log. This is exactly what the oxbow utility does. Oxbow is currently deployed in a number of production environments, happily ingesting many terabytes per day of externally produced Apache Parquet data into Delta tables, all built around S3 Event Notifications.

The "simple" Oxbow architecture is as follows:

+-----------+
| S3 Bucket | 
+-----------+
      |
      |                          +-----+
       `-/Event Notification/--> | SQS |
                                 +-----+
                                    |
              +--------------+      |
              | Oxbow Lambda | <----'
              +--------------+       

Unfortunately S3 Event Notifications can only be directed into AWS SQS, an unordered queue. S3 Event Notifications exhibit a few behaviors in the wild which influence the system's design here:

  • While SQS messages can have multiple records within them, S3 Event Notifications appear to only trigger a 1:1 ratio of S3 Event Notification to SQS message. Therefore the queue This means that there is no batching or batch control that can be configured for notifications.
  • SQS is not strictly ordered but messages are still visible in a roughly chronological order.
  • AWS Aurora exports of Apache Parquet tends to export multiple .parquet files per table at approximately the same time. If there are 26 tables, most/all the files for the table A will be dumped before table B and so on.

For situations with high numbers of event notifications on specific table prefixes, the oxbow lambda could be triggered in such a way that led to substantial lock contention, and resulted in large numbers of event notifications ending up in the dead-letter queue.

After discussing the problem in detail, a potential solution was offered by Chris Harrington on Mastodon.

SQS FIFO queues have a novel feature which allows for grouping of messages by a user-provided identifier. When messages have a group ID associated, only one consumer can consume a message for the entire group, since to do otherwise would upset the FIFO nature of the queue.

By pre-processing the S3 Event Notifications it is possible to determine the Delta table prefix that a new .parquet resides in, and augment the SQS message with a group ID, and forward that along to an SQS FIFO queue, e.g.:

+-----------+
| S3 Bucket | 
+-----------+
      |
      |                          +-----+
       `-/Event Notification/--> | SQS |
                                 +-----+
                                    |
       +---------------------+      |
       | group-events Lambda | <----'
       +---------------------+       
                 |
    /Add table prefix as group ID/
                 |
                 v
           +-- -------+
           | SQS FIFO |
           +----------+
                 |
                 v
          +--------------+
          | oxbow Lambda |
          +--------------+

With the above implementation, which is open source on GitHub, the oxbow Lambda remains unmodified from the "simple" deployment, and can be configured with unlimited concurrency. The AWS SQS FIFO machinery will ensure that no concurrent executions of the oxbow Lambda will attempt to modify the same Delta table.

Whether using the "simple" or "advanced" deployment pattern with oxbow, it has proven to be an incredibly useful tool for creating append-only Delta Lake tables from data generated by external sources such as AWS Aurora or Kinesis Data Firehose. Incorporating SQS FIFO queues into the design has allowed it to effortlessly scale to meet the needs of incredibly high throughput systems, while costing pennies per day!

To learn more about Oxbow, please check out the GitHub repository, and thanks again to Chris for the great idea!


Improving performance of Rust and Python data processing infrastructure is something I love to do! If you're thinking about reducing costs by moving write workloads out of Spark or Databricks, drop me an email and we'll chat!!