1 Billion Rows Challenge
Writing highly optimized Rust to analyze a large quantity of data
The 1 Billion Rows Challenge is a fun optimization experiment that challenges developers to solve a problem, in whatever language they prefer, in the lowest possible execution time.
You are tasked with designing and implementing a program to analyze a large-scale dataset of temperature measurements. The dataset comprises one billion records, occupies approximately 13 GB of storage, and holds one reading per line as a station name and a temperature separated by a `;` (e.g. `Hamburg;12.0`). The program must compute the minimum, mean, and maximum temperature for each weather station.
Experimental Setup
- Linux 6.6.49 (NixOS)
- AMD 5900x (24 vCores) @ 3.70 GHz
- 32 GB RAM @ 3600 MHz (with XMP)
- NVMe SSD (2.5 GB/s raw read speed)
Time is measured through the `time` command, using the total execution time.
I’m using the current (1.80.1) stable API of Rust, allowing unsafe code where it makes sense.
Nightly-only features are not allowed, but external libraries are.
Table of results
| Title | Commit | Time (s) | Flamegraph |
|---|---|---|---|
| Naive | 261bf09 | 1:54.61 | SVG |
| From `String` to `&[u8]` | 9c53462 | 1:29.94 | SVG |
| New hash function | d6cea09 | 1:28.56 | SVG |
| Hashbrown `raw_entry_mut` | b67769e | 56.080 | SVG |
| Small optimizations | 0a9dfc7 | 41.706 | SVG |
| Sorting the solutions | 9f9a36c | 40.555 | SVG |
| Custom f32 | 7b07df7 | 36.494 | SVG |
| Simple multithreading + fix avg | 67b8638 | 4.957 | SVG |
| Some smaller optimizations | d1b2624 | 4.029 | SVG |
| Last fun test | | 3.438 | |
Implementations
Naive
A very simple implementation of the solution. We use a `HashMap` with the name of the city as a `String` key and a struct with the measurements as the value.
```rust
#[derive(Debug)]
struct Measurement {
    pub min: f32,
    pub max: f32,
    pub count: u64,
    pub sum: f32,
}
```
We iterate over the file lines using a `BufReader` with a predefined capacity of 4 KB. For each line, we split the string at the first (and only) `;` we find and try to parse the last substring as an `f32`. Then we update the values in the `HashMap`.

In the last step we print the results with a `println!` using the standard `Debug` implementation.
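Putting these steps together, a minimal sketch of the naive loop could look like the following (the `Measurement::new` and `Measurement::update` helpers are assumed from the description above; the file path matches the one used later in the post):

```rust
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};

// Sketch only: assumes Measurement::new/update exist as described.
let file = File::open("data/measurements-1000000000.txt").unwrap();
let mut reader = BufReader::with_capacity(4 * 1024, file);
let mut records: HashMap<String, Measurement> = HashMap::new();
let mut line = String::new();
while reader.read_line(&mut line).unwrap() > 0 {
    // "<city>;<value>" -> split on the single separator
    let (name, value) = line.trim_end().split_once(';').unwrap();
    let value: f32 = value.parse().unwrap();
    records
        .entry(name.to_owned())
        .and_modify(|m| m.update(value))
        .or_insert_with(|| Measurement::new(value));
    line.clear();
}
println!("{:?}", records);
```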
From `String` to `&[u8]`
Let’s analyze the application flamegraph. You can obtain one using `cargo flamegraph --release`; remember to enable debug symbols in the release profile (`debug = true` under `[profile.release]` in `Cargo.toml`).
We can spot three big areas: `BufReader::read_line`, `HashMap::entry` and `str::parse`.
Let’s try to address each of these one at a time.
The method `BufReader::read_line` reads the content of the file until the next newline, stores it into a vector and then converts it into a `String`. This implicitly includes two operations:
- allocating a new `Vec` and filling it with the `u8` bytes read from the file
- converting the `Vec<u8>` to a `String`, which includes checking that the contents are valid UTF-8
Since we don’t really need to store the whole line, only the name of the city, and we already know from the problem description that the file contains only valid UTF-8, we can work directly on the line bytes without converting them to a string (until the very end, when we store the name in the hashmap).
For ease of comprehension, we create a new struct `ParsedData` that stores the results of the line parsing.

```rust
#[derive(Debug)]
struct ParsedData<'a> {
    name: &'a [u8],
    value: f32,
}
```
The parser is adapted to work on `&[u8]`:

```rust
fn parse_bytes(data: &[u8]) -> Option<ParsedData> {
    // Find the single ';' separating the city name from the value.
    let pos_separator = data.iter().position(|b| *b == SEPARATOR)?;
    let (name, rest) = data.split_at(pos_separator);
    // Skip the separator itself and drop the trailing newline.
    let num_data = &rest[1..rest.len() - 1];
    let value = (unsafe { str::from_utf8_unchecked(num_data) })
        .parse()
        .unwrap();
    Some(ParsedData { name, value })
}
```
Notice that we use `str::from_utf8_unchecked` to convert the number bytes to a `str` and then `parse` to convert it to an `f32`. The first operation has zero cost: it uses `mem::transmute` under the hood to convert between the two types without changing the memory contents. We use the standard `f32` parser for now.
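For a concrete picture (my own illustrative call, not from the original), note that the input line still carries its trailing newline, which the `rest.len() - 1` slice strips:

```rust
// Illustrative only: the line includes the trailing newline.
let parsed = parse_bytes(b"Rome;12.3\n").unwrap();
assert_eq!(parsed.name, b"Rome");
assert_eq!(parsed.value, 12.3);
```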
The read loop still reads until the next newline and then applies the `parse_bytes` function.

```rust
let mut buffer: Vec<u8> = Vec::with_capacity(30);
loop {
    let read = reader.read_until(NEW_LINE, &mut buffer).unwrap();
    if read == 0 {
        break;
    }
    match parse_bytes(&buffer[..read]) {
        Some(parsed_data) => {
            records
                .entry(parsed_data.name.to_owned())
                .and_modify(|m| m.update(parsed_data.value))
                .or_insert(Measurement::new(parsed_data.value));
        }
        None => {
            break;
        }
    }
    buffer.clear();
}
```
With this simple change we cut the execution time by about 25 seconds, roughly 21.5% of the naive execution time.
There are still some limitations in the current solution:
- the `HashMap` used is still quite slow and its `entry` method takes most of the execution time
- the parsing function could probably be sped up by skipping the conversion to `str`, although I’m quite sure it is already optimized away by the compiler
- `BufReader::read_until` takes a lot of the execution time and a lower-level buffer could be beneficial

We still haven’t touched any parallelization method.
New hash function
Rust’s default hash function is designed to be unpredictable for security reasons: if an attacker could know in advance where a certain entry would be stored, they could craft keys that collide and degrade the application’s performance (a hash-flooding attack). This property is not a requirement in this application and can be dropped for some performance benefits.
In this case we adopt the hashbrown crate’s default hasher, a faster non-cryptographic hasher.
The change is very simple: we replace the standard `HashMap` with `hashbrown::HashMap`.
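A minimal sketch of the swap (assuming hashbrown is already listed in `Cargo.toml`):

```rust
// Drop-in replacement: same API surface for our use case,
// but with a faster default hasher.
use hashbrown::HashMap;

let mut records: HashMap<Vec<u8>, Measurement> = HashMap::new();
```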
This new hasher gives us a small but free performance upgrade, decreasing the execution time by about 1 second.
Hashbrown raw_entry_mut
Now, what other part of our program can we optimize further?
We can see that in the `HashMap::entry` call we spend quite some time allocating the parsed name into a `Vec<u8>`. This happens both when the entry is new and when it is already stored in the map. Hashbrown’s `HashMap` has an advantage over the one in the standard library: `raw_entry` and its mutable version `raw_entry_mut` (the standard-library equivalents are not stable yet, but you can try them on nightly). These give us lower-level access to the workings of the `HashMap`, allowing us to look up entries with key types that do not implement the `Borrow` trait, such as `&[u8]`.
```rust
match records.raw_entry_mut().from_key(parsed_data.name) {
    hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
        entry.get_mut().update(parsed_data.value)
    }
    hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
        entry.insert(
            parsed_data.name.to_owned(),
            Measurement::new(parsed_data.value),
        );
    }
}
```
Now we allocate new memory for the `Vec<u8>` stored in the `HashMap` only when a new key is found.
This breaks the minute barrier, with a total execution time of 56.080 s.
Small optimizations
We can see how the `main` function is now composed of four major blocks. In order by size:

`hashbrown::map::HashMap::entry`, the lookup of the hashmap. We cannot further upgrade the hashmap itself, but we can improve how we store the keys: using a shorter view of the key, perhaps built from only some of its letters, we can trade more memory for a faster lookup.

Using as key a 4-byte array built from the first, third, fifth, and last characters of the city name gives a 3-second advantage:

```rust
let mut key = [0u8; 4];
key[0] = parsed_data.name[0];
if let Some(&v) = parsed_data.name.get(2) {
    key[1] = v;
}
if let Some(&v) = parsed_data.name.get(4) {
    key[2] = v;
}
if let Some(&v) = parsed_data.name.last() {
    key[3] = v;
}
match records.raw_entry_mut().from_key(&key) {
    hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
        entry.get_mut().1.update(parsed_data.value)
    }
    hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
        entry.insert(
            key,
            (
                parsed_data.name.to_owned(),
                Measurement::new(parsed_data.value),
            ),
        );
    }
}
```
Unfortunately, we don’t have control over the city names; while we could analyze the list of generated names and derive a smarter hashing scheme, I’ll stop at the previous one to maintain generality.

P.S. Apparently the hashbrown `HashMap` already implements something similar internally, so doing this would probably only increase the allocation overhead.
`std::io::BufRead::read_until`: the part that reads the file into a buffer. This is dominated by `extend_from_slice` (filling the buffer with data) and `memchr`, which is used to find the newline character. This is most likely already optimal in its operation, but we could play with the buffer size to further optimize depending on the data.

I tried a 16 KB buffer and the new execution time is 47.499 s. A 32 KB buffer further reduced the execution time to 43.960 s. We can see how this helps, but as it is highly dependent on the host machine, we can keep it as a hyperparameter for a fine-tuning pass on the final solution.
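Since the buffer size is just the capacity handed to the reader, exposing it as a tunable constant is trivial (a sketch; `READ_BUF_CAPACITY` is my own name for it):

```rust
use std::fs::File;
use std::io::BufReader;

// Hypothetical tunable: 32 KiB was the sweet spot on my machine.
const READ_BUF_CAPACITY: usize = 32 * 1024;

let file = File::open("data/measurements-1000000000.txt").unwrap();
let mut reader = BufReader::with_capacity(READ_BUF_CAPACITY, file);
```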
`parse_bytes`: the number-parsing part. Since we already know that `position` has returned a correct value, we can use `split_at_unchecked` instead of `split_at`, shaving another 2 seconds from the total and bringing us to 41.706 s. We don’t need all the resolution of `f32` for this program, but `f16` is still nightly-only, so we cannot use it yet.
P.S. Somebody implemented `f16` in a stable library (in two variants with more or less precision). Unfortunately, neither variant has enough precision for the `avg` calculation at the end, resulting in `NaN` or `0.0`. The performance is also worse than using `f32`, with a 2-to-3-second increase.
Sorting the solutions
Before printing the final results we need to sort them in alphabetical order. This cannot be done directly in the `HashMap`, as it does not support sorting.
There are many ways to sort the solutions for the final output:
- sorting the keys and retrieving the values from the `HashMap` when needed
- building a new vector with the whole content of the `HashMap` and then sorting it by key
- should we sort using a pointer to the data?
- …
I benchmarked several solutions, including parallel sorting through the Rayon crate, and the fastest is the following:
```rust
let records: HashMap<Vec<u8>, Measurement> = read_data();
let mut records: Vec<(&Vec<u8>, &Measurement)> = records.iter().collect();
records.sort_unstable_by_key(|(k, _v)| *k);
for (k, v) in records.into_iter() {
    println!(
        "{}: {:.1}/{:.1}/{:.1}",
        unsafe { str::from_utf8_unchecked(k) },
        v.min,
        v.avg(),
        v.max
    );
}
```
We store a vector of tuples with references to the keys and values inside the `HashMap`, which lets us sort the keys with just one additional indirection. This reduced the total execution time by another second, reaching 40.555 s.
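For completeness, the Rayon variant I benchmarked against would look roughly like this (a sketch; `par_sort_unstable_by_key` is Rayon’s parallel counterpart of the call above):

```rust
use rayon::prelude::*;

// Same collect-then-sort shape, but sorting across all cores.
let mut records: Vec<(&Vec<u8>, &Measurement)> = records.iter().collect();
records.par_sort_unstable_by_key(|(k, _v)| *k);
```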
Custom f32
Floating-point operations are still comparatively slow on modern CPUs. In this problem we don’t really need the whole `f32` representation space: we only need one decimal digit. What if we use integers to represent our values and just divide by 10 when we need to print them at the very end? An `i32` is enough to hold the sum of the values and uses the same space as an `f32`, but integer sums are substantially faster to calculate than floating-point ones.
```rust
use std::fmt::Display;
use std::ops::AddAssign;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CustomF32(pub i32);

impl CustomF32 {
    #[inline]
    fn parse_bytes(mut data: &[u8]) -> Self {
        // Strip an optional leading minus sign.
        let neg = match data[0] {
            b'-' => {
                data = &data[1..];
                true
            }
            _ => false,
        };
        // Every value has exactly one digit after the point,
        // so only three byte patterns are possible.
        let (a, b, c, d): (u8, u8, u8, u8) = match data {
            [c, b'.', d] => (0, 0, c - b'0', d - b'0'),
            [b, c, b'.', d] => (0, b - b'0', c - b'0', d - b'0'),
            [a, b, c, b'.', d] => (a - b'0', b - b'0', c - b'0', d - b'0'),
            _ => panic!("Unknown pattern {:?}", std::str::from_utf8(data).unwrap()),
        };
        // Store the value in tenths of a degree.
        let v = a as i32 * 1000 + b as i32 * 100 + c as i32 * 10 + d as i32;
        if neg {
            CustomF32(-v)
        } else {
            CustomF32(v)
        }
    }
}

impl AddAssign for CustomF32 {
    #[inline]
    fn add_assign(&mut self, rhs: Self) {
        self.0 += rhs.0
    }
}

impl Display for CustomF32 {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let v: f64 = self.0.into();
        write!(f, "{:.1}", v / 10.0)
    }
}
```
The parsing is the most interesting part: since we know the values are meteorological temperatures, we can expect them to never exceed 100.0 or go below -100.0. Moreover, the challenge explicitly states that every value has exactly one digit after the point. We can therefore pattern-match on the byte slice and rebuild the original number as a sum of its digits multiplied by the appropriate powers of 10.
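A few illustrative assertions (my own, not from the original) show the tenths-of-a-degree encoding in action:

```rust
// Illustrative sanity checks for the fixed-point encoding.
assert_eq!(CustomF32::parse_bytes(b"5.3").0, 53);
assert_eq!(CustomF32::parse_bytes(b"-12.7").0, -127);
assert_eq!(format!("{}", CustomF32(-127)), "-12.7");
```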
This change improves our execution time by around 4 seconds, reaching 36.494 s.
Simple multithreading + fix avg
To execute the calculations in parallel efficiently, we split the file into chunks and assign each one to a separate thread. Finally, we merge the partial results, sort them, and print them.
To implement the chunking, we check the size of the file and calculate the chunk size.
```rust
use std::os::unix::fs::MetadataExt;

let file = File::open("data/measurements-1000000000.txt").expect("could not open file");
let file_size = file
    .metadata()
    .expect("could not retrieve file size")
    .size();
let threads = std::thread::available_parallelism().unwrap().get() as u64;
let chunk_size = file_size / threads;
```
Then we spawn a new thread for each chunk, running an updated version of the `read_data` function.
```rust
let mut handles = Vec::new();
for i in 0..threads {
    let handle = thread::spawn(move || {
        let file =
            File::open("data/measurements-1000000000.txt").expect("could not open file");
        let mut reader = BufReader::with_capacity((chunk_size / 4) as usize, file);
        if i > 0 {
            // Jump to the start of this chunk and discard the partial line
            // left over from the previous chunk.
            reader.seek_relative((chunk_size * i) as i64).unwrap();
            let mut drop_buffer: Vec<u8> = Vec::with_capacity(30);
            let read = reader.read_until(NEW_LINE, &mut drop_buffer).unwrap();
            assert_ne!(read, 0);
        }
        let stop_at = chunk_size * (i + 1);
        read_data(reader, stop_at)
    });
    handles.push(handle);
}
```
Each thread needs its own `BufReader`. We seek to the initial position of the chunk and skip any data that is not a complete line.
The updated `read_data` function takes an additional `stop_at` argument that lets us stop before EOF is reached. We keep a counter of the bytes read, as that is faster than querying the file descriptor position.
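The merge step mentioned at the start of this section isn’t shown in the snippets above; a minimal sketch could look like the following (the `merge` helper on `Measurement`, combining min/max/sum/count, is my assumption, and for clarity this ignores the 4-byte key trick and uses plain `Vec<u8>` keys):

```rust
// Hypothetical merge: join every worker and fold its partial map
// into a single result map.
let mut records: HashMap<Vec<u8>, Measurement> = HashMap::new();
for handle in handles {
    let partial = handle.join().expect("worker thread panicked");
    for (name, m) in partial {
        records
            .entry(name)
            .and_modify(|acc| acc.merge(&m))
            .or_insert(m);
    }
}
```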
The `avg` function had an error in the order of operations, resulting in wrong rounding. The correct version is as follows:

```rust
#[inline]
pub fn avg(&self) -> f32 {
    self.sum.0 as f32 / 10.0 / self.count as f32
}
```
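To see why the order matters, here is an illustration with made-up numbers (the original faulty version isn’t shown; dividing in integer space first is my assumption of the classic way to get this wrong):

```rust
// sum = 55 tenths over count = 2 readings → true average is 2.75 °C.
let (sum, count) = (55i32, 2u64);
let wrong = (sum / count as i32) as f32 / 10.0; // integer division truncates → 2.7
let right = sum as f32 / 10.0 / count as f32;   // keeps the fraction → 2.75
assert_ne!(wrong, right);
```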
This reduced the execution time to 5.784 seconds.
Some smaller optimizations
`parse_bytes` is updated to always return a value; this saves time by leaving out the checks behind `unwrap`.
```rust
pub fn parse_bytes(data: &[u8]) -> ParsedData {
    let pos_separator =
        unsafe { data.iter().position(|b| *b == SEPARATOR).unwrap_unchecked() };
    let (name, rest) = unsafe { data.split_at_unchecked(pos_separator) };
    let num_data = &rest[1..rest.len() - 1];
    let value = CustomF32::parse_bytes(num_data);
    ParsedData { name, value }
}
```
The buffer capacity has been reduced to 26 bytes, as that is the length of the longest line in the file.

```rust
let mut buffer: Vec<u8> = Vec::with_capacity(26);
```
We also move from `iter` to `into_iter` in the records printing, saving one dereference.
These final changes brought the execution time down to 4.029 seconds.
Last fun test
RAM will always be faster than an SSD. I tried moving the dataset to `tmpfs`, removing the SSD read-speed bottleneck and further lowering the execution time to 3.438 seconds.
Roundup
This was a pretty interesting challenge and required me to think very critically about my code. Tools like perf and cargo-flamegraph are a must for identifying bottlenecks and helped a lot in the initial optimization stages. I’m very happy with the results, going from almost two minutes down to about 4 seconds of execution.
I also tried channel-based solutions with standard threads and async runtimes (Tokio), decoupling file reads from parsing and handling, but the overhead of allocating memory for each bytes message hurt performance. In Tokio I tried a simple actor-based approach, but the polling resulted in much longer execution times. Perhaps performance would improve with streams.
I hope this writeup was interesting, and I’m open to suggestions on how to further improve times on this challenge. You can find the updated version of this project on my GitLab repository.