Ingestion of data with "serverless" tools is a highly efficient way to bring
data into a data lake, with a couple of caveats. The cost profile for many
serverless offerings works best for elastic data workloads rather than
sustained high volumes of data. I created sqs-ingest for append-only data
streaming workloads where kafka-delta-ingest was simply too expensive and
high-powered. At its core, sqs-ingest can be coupled to a single AWS SQS queue
and scale up or down depending on the workload demands. As mentioned
previously, however, there are horizontal scaling limits when writing to Delta
tables. One of the strategies I shared was to maximize the batch size. In this
post we will dive into the "buffer more" functionality I have added to
sqs-ingest to really help cram as many messages as possible into each batch!
At a fundamental level, sqs-ingest is a simple SQS-triggered Lambda function.
It is therefore subject to all of the features and limitations of AWS Lambda.
For example, the SQS trigger does allow for configuring an increase in the
"batch size" and "batch window", which to recap:
- Batch size: The maximum number of records a single Lambda invocation may
  receive. Higher is typically better for throughput; tuning this to the
  largest value that fits within the function's memory and time constraints is
  usually best. For sqs-ingest, where each record represents a row appended to
  the Delta table, the default value of 10 is insufficient, and the maximum of
  10,000 is preferred.
- Batch window: The maximum amount of time to gather records before invoking
  the function. This is typically the most sensitive value to tune. A batch
  window of 10 seconds guarantees that the Lambda will be invoked every 10
  seconds and will try to create a transaction in that period, which can cause
  a concurrent transaction race. This value is best set to the maximum data
  freshness latency tolerated by the organization. For example, if data should
  be processed in under 5 minutes, then the batch window is best set to a
  value between 240s and 300s.
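Both settings live on the Lambda event source mapping. As a sketch of tuning them with boto3 (the mapping UUID is a placeholder, and this is an illustration rather than sqs-ingest's own deployment tooling):

```python
import boto3

lambda_client = boto3.client("lambda")

# Raise the batch size to the SQS maximum and allow up to five minutes
# of batching before invocation. Note: for SQS sources, a BatchSize
# above 10 requires MaximumBatchingWindowInSeconds of at least 1.
lambda_client.update_event_source_mapping(
    UUID="00000000-0000-0000-0000-000000000000",  # placeholder mapping ID
    BatchSize=10000,
    MaximumBatchingWindowInSeconds=300,
)
```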
Both of these settings can and should be used, but there is a key constraint
on Lambda function invocation: 6MB of data at a time. This 6MB limit means
that in a default sqs-ingest deployment, a data file will not be written with
more than 6MB of data at a time. For higher-throughput or larger data streams
it's possible to get into a situation where the transaction log grows faster
than the data volume!
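To make that ceiling concrete, here is a back-of-the-envelope calculation; the ~1 KB average message size is purely an assumption for illustration:

```python
# Lambda caps the synchronous invocation payload at 6 MB, so the batch
# size setting is not the only limit on how much data each file gets.
PAYLOAD_LIMIT_BYTES = 6 * 1024 * 1024

def max_records_per_invocation(avg_message_bytes: int) -> int:
    """Rough upper bound on records one invocation can receive."""
    return PAYLOAD_LIMIT_BYTES // avg_message_bytes

# With ~1 KB messages, the configured maximum of 10,000 records can
# never actually arrive in a single invocation:
print(max_records_per_invocation(1024))  # 6144
```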
Buffer more!
The solution to increasing the data file size is quite simple as it turns out:
"just get more messages!" The challenge with Lambda is that the 6MB limit is a
hard ceiling; there are no rules against the Lambda communicating directly
with SQS, however! In fact, the Lambda's execution role already has the
necessary permissions to receive messages from SQS, so sqs-ingest can now
support just getting more messages!
This functionality is not enabled by default and requires two additional
environment variables for configuration:

- BUFFER_MORE_QUEUE_URL: Set to the URL of the SQS queue that triggered the
  function, so that more messages can be buffered per invocation.
- BUFFER_MORE_MESSAGES: Set to an integer number of additional "batches" to
  consume from SQS. The maximum number of messages a single SQS API call can
  fetch is 10, so the additional messages consumed for a given invocation of
  the Lambda will be 10 * BUFFER_MORE_MESSAGES.
With "buffer more" enabled, sqs-ingest will repeatedly fetch messages from SQS
until the configured limit has been reached or half of the function's
configured timeout has elapsed. By using half the timeout as the "buffer more"
period, the function is far less likely to time out when it comes to
serializing its write to the Delta table.
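In pseudo-boto3 terms, the loop looks roughly like this; it is a sketch of the behavior described above rather than sqs-ingest's actual implementation, and the function and variable names are mine:

```python
import time

def buffer_more(sqs_client, queue_url: str, extra_batches: int,
                deadline: float) -> list:
    """Fetch up to `extra_batches` more batches of 10 messages from SQS,
    stopping early once `deadline` (half the function timeout) passes."""
    buffered = []
    for _ in range(extra_batches):
        if time.monotonic() >= deadline:
            break  # leave the remaining time for the Delta write
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # the SQS per-call maximum
        )
        messages = response.get("Messages", [])
        if not messages:
            break  # queue drained, nothing more to buffer
        buffered.extend(messages)
    return buffered
```

With BUFFER_MORE_MESSAGES=100, for example, up to 1,000 additional messages would be appended to the triggering batch.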
```mermaid
flowchart TD
    SQS[(SQS)]
    Trigger["SQS trigger"]
    Lambda["Lambda (_sqs-ingest_)"]
    SQS --> Trigger
    Table@{ shape: tri, label: "Delta Table" }
    Trigger-- trigger w/ original batch ---Lambda
    Lambda<-. buffer more .->SQS
    Lambda<-..->SQS
    Lambda<-..->SQS
    Lambda<-..->SQS
    Lambda-- write transaction ---Table
```
Since the "buffer more" feature must interact directly with SQS, it requires a
little extra time after the Delta transaction to delete the completed messages
from SQS. The trade-off for this extra runtime is that larger data files are
written to Delta, making the overall execution more efficient and the
buffering sqs-ingest more time-efficient than the default configuration.
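DeleteMessageBatch is likewise capped at 10 entries per request, so the cleanup step has to chunk the receipt handles. A minimal sketch; the helper names here are mine, not sqs-ingest's:

```python
def chunk_delete_entries(receipt_handles):
    """Build DeleteMessageBatch entry lists, at most 10 entries per
    API call (the SQS per-request maximum)."""
    entries = [
        {"Id": str(i), "ReceiptHandle": handle}
        for i, handle in enumerate(receipt_handles)
    ]
    return [entries[i:i + 10] for i in range(0, len(entries), 10)]

def delete_completed(sqs_client, queue_url, receipt_handles):
    """Delete every successfully written message after the Delta commit."""
    for chunk in chunk_delete_entries(receipt_handles):
        sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=chunk)
```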
Cost concerns
Buffering more messages per Lambda invocation generally means fewer Lambda
invocations, which can be cheaper, but frankly, when compared to running
always-on ECS/Fargate containers for kafka-delta-ingest, the Lambda cost is
tiny.
There are costs associated with SQS API calls that should be considered. Since SQS is pay-per-use, there is a throughput threshold (in messages per second) above which SQS becomes the more expensive option. Kafka brokers are always-on and generally fixed cost, so they tend to be quite expensive, but they can handle hundreds of thousands of messages per second, which I would not recommend putting through SQS and Lambda.
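For a rough sense of where that threshold sits, here is an illustrative calculation; the $0.40 per million requests figure is standard-queue SQS pricing at the time of writing and is an assumption, and real bills also depend on batching and payload size:

```python
# Monthly SQS request cost at a sustained message rate, assuming one
# request per message (SendMessageBatch can divide this by up to 10).
PRICE_PER_MILLION_REQUESTS = 0.40  # USD, illustrative pricing
SECONDS_PER_MONTH = 30 * 24 * 3600

def monthly_sqs_cost(messages_per_second: float) -> float:
    requests = messages_per_second * SECONDS_PER_MONTH
    return requests / 1_000_000 * PRICE_PER_MILLION_REQUESTS

print(round(monthly_sqs_cost(1_000), 2))    # 1036.8 -- cheap vs. a broker
print(round(monthly_sqs_cost(100_000), 2))  # 103680.0 -- broker territory
```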
When using sqs-ingest for streams with tens, hundreds, or thousands of
messages per second, the majority of the cost will not be Lambda. When sending
larger volumes of data into SQS, please be sure to use the SendMessageBatch
API if possible to reduce the number of API calls.
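A sketch of what that batching looks like on the producer side; the helper is mine, and the 10-entry cap per SendMessageBatch request is the SQS limit:

```python
def send_batched(sqs_client, queue_url: str, bodies: list) -> int:
    """Send message bodies via SendMessageBatch, 10 entries per request,
    instead of one SendMessage call per message. Returns API calls used."""
    calls = 0
    for start in range(0, len(bodies), 10):
        entries = [
            {"Id": str(i), "MessageBody": body}  # Id unique within a batch
            for i, body in enumerate(bodies[start:start + 10])
        ]
        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        calls += 1
    return calls
```

For 10,000 messages this means 1,000 API calls instead of 10,000.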
By buffering more messages, sqs-ingest can handle much larger data streams
than a normal Lambda function, and you can get significant cost savings and
performance out of sqs-ingest with all the reliability and scalability
benefits of a more serverless architecture!
If your team is interested in building out serverless data ingestion pipelines, drop me an email and we'll chat!