Writing RecordBatches to Delta in Rust

February 09, 2023

by R. Tyler Croy

deltalake

rust

developer

Almost the entirety of Buoyant Data's ingestion pipelines are written in Rust and utilize the Delta Lake Rust bindings. This allows for fast and cost efficient data ingestion whether in AWS Lambda or running as more complex daemons in containers deployed in AWS ECS/Fargate. The benefits are numerous but the learning curve can be steep both for learning Rust and implementing Delta Lake writers in Rust.

The basic "flow" of writing data to Delta Lake in Rust is:

  • Ingest data into memory, i.e. read an JSON file.
  • Read the Delta table metadata for schema information.
  • Create the necessary data structures and do any desired schema verification.
  • Write out to a .parquet file.
  • Create a transaction and write that to the Delta table's log.

For most users in the Delta community Slack the challenge stems from having to translate what feels like row-based data into columnar data. As an example, imagine a system recording data for a series of weather stations. With Delta Lake we might conceive of that as a table with rows such as:

+------------------------------------------------------------------------------------+
| ident | temp_c | humidity | wind_speed | wind_direction | ts                       |
+------------------------------------------------------------------------------------+
| KSTS  |   13.3 |    54    |     5      |      nnw       | 2023-02-08 12:00:35-0900 |
| KSTS  |   13.5 |    53    |     3      |      nnw       | 2023-02-08 13:00:15-0900 |
| KSTS  |   13.2 |    52    |     6      |      n         | 2023-02-08 14:00:47-0900 |
| KSTS  |   12.6 |    52    |     5      |      nnw       | 2023-02-08 15:00:05-0900 |
+------------------------------------------------------------------------------------+

Most developers are very comfortable with row-based data processing, but in delta-rs data must be written to Apache Parquet using the Apache Arrow in-memory format, which is columnar. This is done in Rust using the RecordBatch structure.

Using the example above, data must be transposed from row-based to column-based, in effect turning the table into a series of vectors for each column, such as:

let idents = vec!["KSTS", "KSTS", "KSTS", "KSTS"];
let temps = vec![13.3, 13.5, 13.2, 12.6];
// and so on..

To help others understand this API, I have contributed the recordbatch-writer.rs example to the delta-rs repository. Below is a somewhat simplistic transpose of the row data into columns for writing with the RecordBatch:

let arrow_array: Vec<Arc<dyn Array>> = vec![
    Arc::new(TimestampMicrosecondArray::from(ts)),
    Arc::new(Int32Array::from(temp)),
    Arc::new(Float64Array::from(lat)),
    Arc::new(Float64Array::from(long)),
];

RecordBatch::try_new(arrow_schema_ref, arrow_array).expect("Failed to create RecordBatch")

What can be tricky for new users of the library is converting the "native" types, such as Vec<String> or Vec<DateTime<Utc>> (using chrono) to the arrow-rs appropriate intermediate array representations. This requires consulting the docs for arrow::array types and can be challenging at the best of times. My recommendation to new developers is to make prolific use Rust's built-in unit tests to get the structure of the data correct!

Building Rust-based ingestion pipelines can be a little daunting, but the payoff is very worth it in both correctness and efficiency once the various pieces get put together. recordbatch-writer.rs is MIT-licensed, so please use it to start building your own ingestion applications in Rust!


Special thanks to fellow delta-rs committer Will Jones for their improvements to the example during the code review process.


There's plenty to learn and share about utilizing Rust for high performance and efficient data ingestion! Please be sure to subscribe to our RSS feed, or if your organization needs help building and deploying Rust-based Delta Lake applications, we can help! Drop us an email!