Once upon a time, there was a dataset—a dataset filled with mortality information of people who died in the US in a given year, including how they died, their lifestyle, the conditions they had, and so much more. But using this dataset was no easy task, as only the bravest among us could handle its complexity and size—okay, too much?
This introduction may have been a bit dramatic 🌝, but this dataset was truly a challenge to work with, and I wanted to share my journey of preprocessing it. “It”? Oh yeah, my bad. It’s the Mortality Multiple Cause-of-Death dataset, a comprehensive dataset provided by the National Center for Health Statistics (NCHS) at the Centers for Disease Control and Prevention (CDC) (this sentence sounds so fancy 😌🤚).
From the way the data is structured, to the different encodings, to missing values and errors in the documentation, you are in for a fun ride!
Our First Encounter with the Beast
If you have ever worked with a dataset before, you probably expect it to be structured in a certain way, from more loosely defined formats like CSV to more structured formats like JSON. However, this dataset is like no other: all rows have the same width, there are no separators, no headers, and the data is encoded with numbers and/or characters that have no clear meaning at first glance. You know what they say, a picture is worth a thousand words. Here is just one line of the dataset:
You can scroll; I can’t even display a third of it on one line.
Damn, how can we make sense of this mess? Thankfully, there is documentation. I invite you to take a look at it, so you get a better understanding of what will come next.
Glancing at the documentation, we can see that each index (or range of indexes) in a row corresponds to a specific feature, and that the encoding of the data is different for each feature. For example, the Resident Status feature is at index 20 and is encoded as a number from 1 to 4 (inclusive). You may begin to see how much work will be involved in processing this dataset. But it’s completely doable; as with any big task, let’s split it into smaller ones.
At a high level, we need to deal with the following:
Mapping the Dataset’s Complex Structure
We need to create some kind of mapping between the dataset’s structure (the indexes) and the features. What I chose to do was to create a dictionary that maps the range of indexes to the features.
Defining the Dataset’s Schema
It’s pretty much the reverse; we need to define how we want to represent the final dataset. What will be the columns? Their types? And so on. If you come from a Pandas background, you may not be familiar with this concept, but in Polars, you can define the schema of the dataset beforehand, instead of casting after the fact. I personally find this more intuitive, and it also helps in having a better view of what the end goal is.
Parsing Each Line
Now that we have the start and end goal, we need to define how to transform one into the other. That’s where the fun part is (no 🤠). Each index may be encoded differently and sometimes requires way more than just a simple one-to-one mapping. On top of that, the dataset is really big. I’ll go more in-depth on this later, but TLDR, we will deal with parallelism (how exciting!).
Without further ado, let’s get to it!
Creating the Mapping Dictionary
The plan is to create a dictionary where the keys are ranges of indexes, and the values are the feature names. This step is not hard but is a bit tedious, and you really have to pay attention to the documentation, as there is no space between different features. One index away from the correct one will be a different feature. Here is a small snippet of the dictionary I created for the dataset.
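Something along these lines (a heavily trimmed sketch; apart from the resident status position mentioned above, treat every range as illustrative and double-check it against the documentation):

```python
# Inclusive (start, end) positions -> feature names.
# Positions follow the documentation's 1-based "tape locations"; only a
# couple of entries are shown here.
PARSING_MAP: dict[tuple[int, int], str] = {
    (1, 19): "reserved_positions",  # Reserved positions
    (20, 20): "resident_status",    # encoded 1-4 (see the documentation for the labels)
    # ... one (start, end) -> feature entry per field in the documentation,
    # e.g. injury_at_work, manner_of_death, the entity axis conditions, etc.
}
```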
I made the choice to use an inclusive range for the indexes, as I find it more intuitive (especially when a feature is only one character). I added comments that are a direct copy and paste from the documentation, so that you can understand what each feature is and how it is encoded, all in the same place. Technically, you don’t have to take into account `reserved_positions`, but I wanted to be as explicit as possible.
Defining the Dataset’s Schema
After defining the mapping dictionary, we need to define the data type for each feature. In Polars, a schema is a dictionary that maps each feature to its type. The list of all types supported by Polars can be found here. There is actually another step we need to do, which is to define how the possible raw values of each feature map to values of that type. Let’s see a couple of examples.
The `injury_at_work` feature is encoded as a single character that can be either `Y`, `N`, or `U`. The type we should use here is pretty straightforward: a nullable boolean.
For all examples, assume that pl is the import alias for the Polars library.
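A minimal sketch of the type and the value mapping (the constant names are arbitrary):

```python
import polars as pl

# Nullable boolean: Y -> True, N -> False, U (unknown) -> null
INJURY_AT_WORK_TYPE = pl.Boolean()
INJURY_AT_WORK_MAPPING: dict[str, bool | None] = {
    "Y": True,
    "N": False,
    "U": None,
}
```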
Pretty straightforward, right? When parsing this feature, we will just have to use the raw value as the key in the mapping dictionary to get the corresponding boolean.
Let’s move on to another example, the `manner_of_death` feature. It’s not a string, as there is only a fixed set of possible values. Does that ring a bell? Yes! An enum. And it’s encoded as a number from 1 to 7 (inclusive).
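A sketch of the mapping and the enum built from it (the labels are paraphrased from the documentation, so double-check them against your copy):

```python
import polars as pl

# Single source of truth: this mapping drives both the parsing and the Enum type.
MANNER_OF_DEATH_MAPPING: dict[int, str] = {
    1: "Accident",
    2: "Suicide",
    3: "Homicide",
    4: "Pending investigation",
    5: "Could not determine",
    6: "Self-inflicted",
    7: "Natural",
}
MANNER_OF_DEATH_TYPE = pl.Enum(list(MANNER_OF_DEATH_MAPPING.values()))
```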
Notice how I don’t repeat myself? I use the mapping to both, well, map, but also create the enum. In programming in general, you should try as much as possible to have a single source of truth.
I’ll end these examples with `entity_axis_conditions`, as it’s a bit more tricky. First, there can be 0 to 20 different conditions, and each condition is made up of multiple attributes. I chose to represent this as a list of structs.
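A sketch of the type (the struct field names here are illustrative, not the documentation’s exact wording):

```python
import polars as pl

# Up to 20 conditions per record, each with a few attributes, hence a List of
# Structs. pl.Categorical is used for the condition code instead of an Enum
# (see below).
ENTITY_AXIS_CONDITIONS_TYPE = pl.List(
    pl.Struct(
        {
            "line": pl.UInt8(),
            "position": pl.UInt8(),
            "condition_code": pl.Categorical(),
        }
    )
)
```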
You may be wondering why I did not use the enum I created. It seems that, as of this writing, Polars struggles with enums inside a struct inside a list. I don’t remember exactly what the error was, but I did not find a way to make it work. Instead, I’m simply using the `pl.Categorical` type, which doesn’t define the possible values ahead of time but still optimizes memory usage and performance for columns with a limited set of unique values, making it more efficient than a plain string here.
Once all types and mappings have been defined, we can combine everything into the final schema dictionary.
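A trimmed sketch, reusing the type constants from the snippets above:

```python
import polars as pl

# The final schema: feature name -> Polars data type. The real schema has one
# entry per (non-reserved) feature in PARSING_MAP.
SCHEMA: dict[str, pl.DataType] = {
    "injury_at_work": INJURY_AT_WORK_TYPE,
    "manner_of_death": MANNER_OF_DEATH_TYPE,
    "entity_axis_conditions": ENTITY_AXIS_CONDITIONS_TYPE,
    # ...
}
```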
Parsing Each Feature
Now that we have the start and end goal, we have to transform one into the other. For some features, it will be quite easy as we can simply use the different mappings we created. For others, it will require a more comprehensive approach. For each feature, I chose to create a function that takes the raw data and returns the transformed data.
As before, I’ll start gradually. Let’s go over `resident_status`. It’s a simple enum, so we can just use the mapping we created.
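Here is a simplified sketch of what that can look like (the Resulty import path and exact signatures are approximations, and the labels come from my reading of the documentation):

```python
from __future__ import annotations

from resulty import Err, Ok, Result, propagate_result  # import path approximate

# Labels paraphrased from the documentation; verify against your copy.
RESIDENT_STATUS_MAPPING: dict[int, str] = {
    1: "RESIDENTS",
    2: "INTRASTATE NONRESIDENTS",
    3: "INTERSTATE NONRESIDENTS",
    4: "FOREIGN RESIDENTS",
}


def try_parse_int(raw: str) -> Result[int, str]:
    """Thin wrapper around int() that returns a Result instead of raising."""
    try:
        return Ok(int(raw))
    except ValueError:
        return Err(f"could not parse {raw!r} as an integer")


@propagate_result
def process_resident_status(raw: str) -> Result[tuple[str, str], str]:
    # .up() unwraps the Ok value, or makes this function return the Err early.
    code = try_parse_int(raw.strip()).up()
    if code not in RESIDENT_STATUS_MAPPING:
        return Err(f"unknown resident_status code: {code}")
    return Ok(("resident_status", RESIDENT_STATUS_MAPPING[code]))
```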
There are a couple of things I should mention here: `Result`, `Ok`, `Err`, `@propagate_result`, and `try_parse_int`. If you have ever worked with Rust before, these will be familiar to you. `Result` is a type that represents the result of an operation that can either be a success or a failure; it’s an alternative to throwing exceptions. It lets you know when an operation might fail, and it forces you to handle the result of the operation, as you cannot access the inner value without handling both scenarios. `Ok` and `Err` are the two possible outcomes of the operation.

`@propagate_result` is a decorator that allows us to propagate the result of a function to the caller; it’s the equivalent of `?` in Rust. Let me explain: if the function returns an `Ok`, the inner value is returned; otherwise, the function that called `.up()` returns early with the error (as a `Result`). If you want to know more about it, you can check out my library Resulty. Of course, you don’t have to use it, but after working so much with Rust, I really missed this way of handling errors.
`try_parse_int` is a thin wrapper around the `int()` function. It tries to parse an integer from a string. If it succeeds, it returns the parsed integer; otherwise, it returns an error.
Okay, let’s go back to the `process_resident_status` function. I think it’s pretty self-explanatory: we simply use the value parsed into an int to get the corresponding enum value from the mapping dictionary. We return a tuple with the name of the column and the enum value.
Let’s move on to something a bit more interesting: `process_age_recode_52`. It represents the age of the individual, recoded into different categories. I’m not a fan of having the dataset determine the different groupings of the data (there are multiple different recodes of age too, and they may not all be set). I made the choice to transform all the age-related fields into a common format: a range of `Duration` values (time spans). We don’t know the exact age of the individual, which is why we use a range. Okay, let’s take a look at the function.
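Here is a simplified sketch that only shows the years branch; the column name, bounds, and unit conversions are taken from my reading of the documentation and are worth double-checking, and the `Result`-based error handling is omitted for brevity:

```python
from datetime import timedelta

MS_PER_YEAR = 365 * 24 * 60 * 60 * 1000  # rough conversion used for the sketch


def process_age_recode_52(raw: str) -> tuple[str, dict[str, timedelta] | None]:
    """Only the 23-26 (years) branch is shown; the other branches (hours,
    days, weeks, months, ...) follow the same pattern, with bounds and units
    taken from the documentation."""
    value = int(raw)
    if 23 <= value <= 26:
        years = value - 23  # difference from the lower bound = number of years
        start = timedelta(milliseconds=years * MS_PER_YEAR)
        end = timedelta(milliseconds=(years + 1) * MS_PER_YEAR)
        return ("age", {"start": start, "end": end})
    # ... remaining branches elided ...
    return ("age", None)  # e.g. age not stated
```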
Okay, what’s going on here 😧? Each “category” is represented as a number from `1` to `52`, and each number may represent a different unit of time. So we first need to check which range the value falls into, and then convert it to a range of durations based on what we know about that particular unit (from the documentation). For example, if the value is between `23` and `26`, the difference between the value and the lower bound is the number of years. Yeah… it’s fun, isn’t it 🤧? Then you “simply” multiply that by the unit’s equivalent in milliseconds.
Processing an Entire Line
After defining all the individual functions to parse a particular feature, we simply have to call them for each feature in a given line. Thanks to the initial mapping we did, it’s actually quite intuitive.
The function takes an unprocessed line and returns a dictionary with the processed data. We just have to loop over the mapping dictionary, take each range and column name, and then slice the line based on the range and call the corresponding function to process the value.
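A sketch of that function, reusing `PARSING_MAP` and the Resulty helpers from earlier (`PROCESSORS` is a hypothetical lookup from feature name to processing function):

```python
from __future__ import annotations

from resulty import Ok, Result, propagate_result  # import path approximate

# PROCESSORS: hypothetical feature name -> processing function lookup,
# e.g. {"resident_status": process_resident_status, ...}.


@propagate_result
def process_line(line: str) -> Result[dict[str, object], str]:
    row: dict[str, object] = {}
    for (start, end), column in PARSING_MAP.items():
        if column == "reserved_positions":
            continue
        raw = line[start - 1 : end]  # documentation positions are 1-based and inclusive
        name, value = PROCESSORS[column](raw).up()  # unwrap, or return the error early
        row[name] = value
    return Ok(row)
```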
So, let’s do that for every line and call it a day, right? Of course not 😫, or I wouldn’t have asked. The dataset is so long that this would take hours to process. Instead, we are going to be smarter about it and use the full power of our computer!
Parallelizing the Processing
What makes parallelism hard is when you have to share data between threads, or when no two threads can do a specific thing at the same time (there are more challenges, but that’s the main part). Here, each line is independent of the others, so we don’t need to synchronize anything or worry about race conditions.
The goal is simple: use all the CPU cores available to process the dataset, as right now, only a single one is used. It’s pretty straightforward; we just need to split the dataset by the number of workers we want to use and then create a process for each of them. Each process will do its thing, and we just have to wait for all of them to finish and concatenate the results.
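A sketch of the batching step, assuming `lines` holds every raw line read from the file:

```python
import math
import os

NUM_WORKERS = os.cpu_count() or 4  # one batch per available core
batch_size = max(1, math.ceil(len(lines) / NUM_WORKERS))
batches = [lines[i : i + batch_size] for i in range(0, len(lines), batch_size)]
```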
Here, we simply split the dataset into what are called “batches”. Each batch is a list of lines that we will process in parallel.
Then we create a list that will hold the future results, as well as a process pool executor with the maximum number of workers we want to use. We then loop over the batches and submit each one to the pool. Each batch will be processed in parallel, and the results will be stored in the futures list.
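A sketch of the submission step (`process_batch` is a hypothetical helper that runs `process_line` over its batch and reports progress as it goes):

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

# Shared state: each worker reports how many lines it has processed so far.
manager = Manager()
_progress = manager.dict()

futures = []
pool = ProcessPoolExecutor(max_workers=NUM_WORKERS)
for worker_id, batch in enumerate(batches):
    # process_batch(batch, worker_id, _progress) processes every line in the
    # batch and periodically updates _progress[worker_id].
    futures.append(pool.submit(process_batch, batch, worker_id, _progress))
```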
You may be wondering what the `manager` and `_progress` variables are for. This is just so you can track the progress of each process. The `manager` is a multiprocessing manager that allows you to share data between processes, and the `_progress` variable is a dictionary that will be used to track the progress of each process. Simply put, each process will “report” its progress by updating the dictionary with the current progress, which we can then use in the main process to display the progress.
To wait and, at the same time, monitor the progress, we create a `while` loop that will continue until all the futures are done. We then update the progress bar for each task, and finally, we concatenate the results and cast them to the correct schema. For everything related to logging, I’m using the rich library.
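A sketch of that loop and of the final concatenation (the real code drives a rich progress bar; a plain print stands in for it here):

```python
import time

import polars as pl

# Poll until every future is done, refreshing the overall progress.
while not all(future.done() for future in futures):
    processed = sum(_progress.values())
    print(f"\rProcessed {processed} lines", end="")
    time.sleep(0.5)

# Each future returns the processed rows of its batch: build one DataFrame per
# batch, concatenate them, and cast the result to the schema defined earlier.
frames = [pl.DataFrame(future.result()) for future in futures]
df = pl.concat(frames).cast(SCHEMA)
pool.shutdown()
```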
You probably noticed that I’m using `cast` at the end, even though I said that the whole point of the schema is to avoid having to do that. Well, for some reason, using the `schema=...` argument when creating each individual DataFrame resulted in issues when concatenating them. I may have done something wrong; if you know why, you can open a PR on the repo, and I’ll be happy to review it. That being said, we did not define the schema for nothing, because we use it for the casting anyway.
Conclusion
I hope this post was useful to you. This was a long post, and I’ve cut a lot of corners. The entire code and the dataset are available on GitHub. If you have any questions or suggestions, feel free to reach out to me! And thank you for reading!