Almost the entirety of Buoyant Data's ingestion pipelines are written in Rust and utilize the Delta Lake Rust bindings. This allows for fast and cost-efficient data ingestion, whether running in AWS Lambda or as more complex daemons in containers deployed on AWS ECS/Fargate. The benefits are numerous, but the learning curve can be steep, both for learning Rust and for implementing Delta Lake writers in Rust.
The basic "flow" of writing data to Delta Lake in Rust is:
- Ingest data into memory, e.g. read a JSON file.
- Read the Delta table metadata for schema information.
- Create the necessary data structures and do any desired schema verification.
- Write out to a .parquet file.
- Create a transaction and write that to the Delta table's log (see the sketch after this list).
For most users in the Delta community Slack the challenge stems from having to translate what feels like row-based data into columnar data. As an example, imagine a system recording data for a series of weather stations. With Delta Lake we might conceive of that as a table with rows such as:
+-------+--------+----------+------------+----------------+--------------------------+
| ident | temp_c | humidity | wind_speed | wind_direction | ts                       |
+-------+--------+----------+------------+----------------+--------------------------+
| KSTS  | 13.3   | 54       | 5          | nnw            | 2023-02-08 12:00:35-0900 |
| KSTS  | 13.5   | 53       | 3          | nnw            | 2023-02-08 13:00:15-0900 |
| KSTS  | 13.2   | 52       | 6          | n              | 2023-02-08 14:00:47-0900 |
| KSTS  | 12.6   | 52       | 5          | nnw            | 2023-02-08 15:00:05-0900 |
+-------+--------+----------+------------+----------------+--------------------------+
Most developers are very comfortable with row-based data processing, but in delta-rs data must be written to Apache Parquet using the Apache Arrow in-memory format, which is columnar. This is done in Rust using the RecordBatch structure.
Using the example above, data must be transposed from row-based to column-based, in effect turning the table into a series of vectors for each column, such as:
let idents = vec!["KSTS", "KSTS", "KSTS", "KSTS"];
let temps = vec![13.3, 13.5, 13.2, 12.6];
// and so on..
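From there, those plain Vecs become Arrow arrays paired with a matching schema. The snippet below is a rough, self-contained illustration covering a few of the weather columns; it assumes the arrow types re-exported by the deltalake crate, and the schema here is hard-coded purely for illustration:

use std::sync::Arc;
use deltalake::arrow::array::{Array, Float64Array, Int32Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;

fn example_batch() -> RecordBatch {
    // Column vectors transposed from the "rows" above
    let idents = vec!["KSTS", "KSTS", "KSTS", "KSTS"];
    let temps = vec![13.3, 13.5, 13.2, 12.6];
    let humidity = vec![54, 53, 52, 52];

    // The schema's fields must line up, in order and type, with the arrays below
    let schema = Arc::new(Schema::new(vec![
        Field::new("ident", DataType::Utf8, false),
        Field::new("temp_c", DataType::Float64, false),
        Field::new("humidity", DataType::Int32, false),
    ]));

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(idents)),
        Arc::new(Float64Array::from(temps)),
        Arc::new(Int32Array::from(humidity)),
    ];

    RecordBatch::try_new(schema, columns).expect("Failed to create RecordBatch")
}

In a real writer you would derive the Arrow schema from the Delta table's metadata rather than hard-coding it, so the batch is verified against the table you are actually writing to.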
To help others understand this API, I have contributed the recordbatch-writer.rs example to the delta-rs repository. Below is a somewhat simplistic transpose of the row data into columns for writing with the RecordBatch:
// `ts`, `temp`, `lat`, and `long` are plain Vecs accumulated from the ingested
// records; each becomes an Arrow array, in the same order as the schema's fields.
let arrow_array: Vec<Arc<dyn Array>> = vec![
    Arc::new(TimestampMicrosecondArray::from(ts)),
    Arc::new(Int32Array::from(temp)),
    Arc::new(Float64Array::from(lat)),
    Arc::new(Float64Array::from(long)),
];

RecordBatch::try_new(arrow_schema_ref, arrow_array).expect("Failed to create RecordBatch")
What can be tricky for new users of the library is converting the "native" types, such as Vec<String> or Vec<DateTime<Utc>> (using chrono), into the appropriate arrow-rs intermediate array representations. This requires consulting the docs for the arrow::array types and can be challenging at the best of times. My recommendation to new developers is to make prolific use of Rust's built-in unit tests to get the structure of the data correct!
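For example, here is one way to turn a Vec<DateTime<Utc>> into the TimestampMicrosecondArray used above, along with a small unit test pinning down the conversion. This is a sketch: to_timestamp_array is just a hypothetical helper, and it assumes chrono's timestamp_micros() and the arrow types re-exported by deltalake.

use chrono::{DateTime, TimeZone, Utc};
use deltalake::arrow::array::{Array, TimestampMicrosecondArray};

/// Convert chrono timestamps into the Arrow array the schema expects.
fn to_timestamp_array(times: &[DateTime<Utc>]) -> TimestampMicrosecondArray {
    TimestampMicrosecondArray::from(
        times.iter().map(|t| t.timestamp_micros()).collect::<Vec<i64>>(),
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn timestamps_convert_to_microseconds() {
        let ts = Utc.with_ymd_and_hms(2023, 2, 8, 12, 0, 35).unwrap();
        let array = to_timestamp_array(&[ts]);
        assert_eq!(array.len(), 1);
        assert_eq!(array.value(0), ts.timestamp_micros());
    }
}

Running cargo test against small helpers like this catches type and unit mistakes (seconds versus microseconds, for example) long before anything gets committed to a table.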
Building Rust-based ingestion pipelines can be a little daunting, but once the various pieces come together the payoff in both correctness and efficiency is well worth it.
recordbatch-writer.rs is MIT-licensed, so please use it to start building your own ingestion applications in Rust!
Special thanks to fellow delta-rs committer Will Jones for their improvements to the example during the code review process.
There's plenty to learn and share about utilizing Rust for high-performance and efficient data ingestion! Please be sure to subscribe to our RSS feed, or if your organization needs help building and deploying Rust-based Delta Lake applications, we can help! Drop us an email!