Mortality Multiple Cause-of-Death Dataset: A Journey of Data Preprocessing

A comprehensive approach to data preprocessing, from mapping dictionaries to custom solutions

Once upon a time, there was a dataset—a dataset filled with mortality information of people who died in the US in a given year, including how they died, their lifestyle, the conditions they had, and so much more. But using this dataset was no easy task, as only the bravest among us could handle its complexity and size—okay, too much?

This introduction may have been a bit dramatic 🌝, but this dataset was truly a challenge to work with, and I wanted to share my journey of preprocessing it. “It”? Oh yeah, my bad. It’s the Mortality Multiple Cause-of-Death dataset, a comprehensive dataset provided by the National Center for Health Statistics (NCHS) at the Centers for Disease Control and Prevention (CDC) (this sentence sounds so fancy 😌🤚).

From the way the data is structured, to the different encodings, to missing values and errors in the documentation, you are in for a fun ride!

Our First Encounter with the Beast

If you have ever worked with a dataset before, you probably expect it to be structured in a certain way, from more loosely defined formats like CSV to more structured formats like JSON. However, this dataset is like no other: it comes as a fixed-width text file. All rows have the same width, there are no separators, no headers, and the data is encoded with numbers and/or characters that have no clear meaning at first glance. You know what they say, a picture is worth a thousand words. Here is just one line of the dataset:

                  11                                          3101  M1085 432311  4M7                2022U7CN                                    C349093 027   08 0211C349 61F179                                                                                                                                                                   02 C349 F179                                                                                                 2                                 1000902                                                                                                                                                                                                                                                                                                                           730020068003

You can scroll, I can't even display a third of it on one line.

Damn, how can we make sense of this mess? Thankfully, the NCHS provides documentation. I invite you to take a look at it, so you get a better understanding of what will come next.

Just by glancing at the documentation, we can see that each index (or range of indexes) in a row corresponds to a specific feature, and that the encoding of the data is different for each feature. For example, the Resident Status feature is at index 20 and is encoded as a number from 1 to 4 (inclusive). You may begin to see how much work will be involved in processing this dataset. But it’s completely doable; as with any big task, let’s split it into smaller ones.
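
To make that concrete, here is a tiny, hypothetical illustration of pulling that one feature out of a raw line in Python (assuming line holds one row as a string); the documentation uses 1-based, inclusive positions, while Python slices are 0-based and end-exclusive:

# Resident Status sits at position 20 (1-based), i.e. index 19 in Python
resident_status_raw = line[19:20]
# On the sample line above, this should be the second character of the
# leading "11", i.e. "1".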

At a high level, we need to deal with the following:

Mapping the Dataset’s Complex Structure

We need to create some kind of mapping between the dataset’s structure (the indexes) and the features. What I chose to do was create a dictionary that maps ranges of indexes to feature names.

Defining the Dataset’s Schema

It’s pretty much the reverse; we need to define how we want to represent the final dataset. What will be the columns? Their types? And so on. If you come from a Pandas background, you may not be familiar with this concept, but in Polars, you can define the schema of the dataset beforehand, instead of casting after the fact. I personally find this more intuitive, and it also helps in having a better view of what the end goal is.
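
For illustration, here is a tiny, made-up example of declaring a schema up front (the column names and values are hypothetical, not taken from the real dataset):

import polars as pl

schema = {"resident_status": pl.Categorical(), "age_years": pl.Int16()}
df = pl.DataFrame(
    {"resident_status": ["RESIDENT", "FOREIGN_RESIDENT"], "age_years": [67, 45]},
    schema=schema,
)
# Without a schema, you would build the DataFrame first and cast each
# column afterwards, pandas-style.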

Parsing Each Line

Now that we have the start and end goal, we need to define how to transform one into the other. That’s where the fun part is (no 🤠). Each index may be encoded differently and sometimes requires way more than just a simple one-to-one mapping. On top of that, the dataset is really big. I’ll go more in-depth on this later, but TLDR, we will deal with parallelism (how exciting!).

Without further ado, let’s get to it!

Creating the Mapping Dictionary

The plan is to create a dictionary where the keys are ranges of indexes, and the values are the feature names. This step is not hard, but it is a bit tedious, and you really have to pay attention to the documentation, as there is no space between different features: being off by a single index lands you in a different feature. Here is a small snippet of the dictionary I created for the dataset.

mapping = {
    #
    (1, 18): "reserved_positions",
    #
    # 19. Record Type:
    #       - 1 = RESIDENTS (State and County of Occurrence and Residence are the same)
    #       - 2 = NONRESIDENTS (State and/or County of Occurrence and Residence are different)
    (19, 19): "record_type",
    #
    # 20. Resident Status:
    #       - 1 = RESIDENT (State and County of Occurrence and Residence are the same)
    #       - 2 = INTRASTATE NONRESIDENT (State of Occurrence and Residence are the same, but County is different)
    #       - 3 = INTERSTATE NONRESIDENT (State of Occurrence and Residence are different, but both are in the U.S)
    #       - 4 = FOREIGN RESIDENT (State of Occurrence is one of the 50 States or the District of Columbia, but Place of Residence is outside of the U.S)
    (20, 20): "resident_status",
    #
    (21, 62): "reserved_positions",
    #
    # ...
}

I made the choice to use an inclusive range for the indexes, as I find it more intuitive (especially when a feature is only one character). I added comments that are a direct copy and paste from the documentation, so that you can understand what each feature is and how it is encoded, all in the same place. Technically, you don’t have to take into account reserved_positions, but I wanted to be as explicit as possible.
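
As a small aside, a helper along these lines (a sketch, not part of the mapping itself) is enough to turn one of these inclusive ranges into a Python slice:

def extract_field(line: str, span: tuple[int, int]) -> str:
    """Slice a raw line using the 1-based, inclusive (start, end) convention above."""
    start, end = span
    return line[start - 1 : end]

# extract_field(line, (20, 20)) would return the raw resident status character.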

Defining the Dataset’s Schema

After defining the mapping dictionary, we need to define the data type for each feature. In Polars, a schema is a dictionary that maps each feature to its type. The list of all types supported by Polars can be found here. There is actually one more step: defining how the possible raw values of each feature map to values of that type. Let’s see a couple of examples.

The injury_at_work feature is encoded as a single character that can be either Y, N, or U. The type we should use here is pretty straightforward: a nullable boolean.

injury_at_work = pl.Boolean()
injury_at_work_mapping = {"Y": True, "N": False, "U": None}

For all examples, assume that pl is the import alias for the Polars library.

Pretty straightforward, right? When parsing this feature, we will just use the raw value as the key in the mapping dictionary to get the corresponding boolean (or None).
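
In other words, parsing it boils down to a dictionary lookup; a quick hypothetical example:

raw = "Y"                                  # a raw character sliced from a line
parsed = injury_at_work_mapping.get(raw)   # True ("N" -> False, "U" -> None)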

Let’s move on to another example, the manner_of_death feature. It’s not a string, as there is only a fixed set of possible values. Does that ring a bell? Yes! An enum. And it’s encoded as a number from 1 to 7 (inclusive).

manner_of_death_mapping = {
    1: "ACCIDENT",
    2: "SUICIDE",
    3: "HOMICIDE",
    4: "PENDING_INVESTIGATION",
    5: "COULD_NOT_DETERMINE",
    6: "SELF_INFLICTED",
    7: "NATURAL",
}
manner_of_death = pl.Enum(list(manner_of_death_mapping.values()))

Notice how I don’t repeat myself? I use the mapping to both, well, map, but also create the enum. In programming in general, you should try as much as possible to have a single source of truth.

I’ll end these examples with entity_axis_conditions, as it’s a bit trickier. First, there can be 0 to 20 different conditions, and each condition is made up of multiple attributes. I chose to represent this as a list of structs.

certificate_part = pl.Enum(["PART_I", "PART_II"])
entity_axis_condition = pl.Struct(
    {
        "certificate_part": pl.Categorical(),
        "certificate_line": pl.Int8(),
        "condition": pl.Categorical(),
    }
)
entity_axis_conditions = pl.List(inner=entity_axis_condition)

You may be wondering why I did not use the enum I created. It looks like, as of this writing, Polars struggles with enums inside a struct inside a list. I don’t remember exactly what the error was, but I know I did not find a way to make it work. Instead, I’m simply using pl.Categorical(), a Polars data type that doesn’t define the possible values ahead of time but still optimizes memory usage and performance for columns with a limited set of unique values, which makes it a much better fit than a plain string here.
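
To make the nesting concrete, here is roughly what one parsed value of this column could look like at the Python level (the codes and line numbers are illustrative, not taken from a real record):

# One row's entity_axis_conditions: a list of condition structs
example_conditions = [
    {"certificate_part": "PART_I", "certificate_line": 1, "condition": "C349"},
    {"certificate_part": "PART_II", "certificate_line": 6, "condition": "F179"},
]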

Once all types and mappings have been defined, we can combine everything into the final schema dictionary.

dataset_schema: Mapping[str, pl.PolarsDataType] = {
    "record_type": record_type,
    "resident_status": resident_status,
    "education": education,
    "month_of_death": month_of_death,
    "sex": sex,
    # ...
}

Parsing Each Feature

Now that we have the start and end goal, we have to transform one into the other. For some features, it will be quite easy as we can simply use the different mappings we created. For others, it will require a more comprehensive approach. For each feature, I chose to create a function that takes the raw data and returns the transformed data.

As before, I’ll start gradually. Let’s go over resident_status: it’s a simple enum, so we can just use the mapping we created.

@propagate_result
def process_resident_status(
    value: str,
) -> Result[tuple[Literal["resident_status"], Optional[str]], str]:
    if value == "":
        return Ok(("resident_status", None))

    v = try_parse_int(value).up()
    mapped = resident_status_mapping.get(v)
    if mapped:
        return Ok(("resident_status", mapped))
    else:
        return Err(f"Invalid value for resident_status: {v}")

There are a couple of things I should mention here: Result, Ok, Err, @propagate_result, and try_parse_int. If you have ever worked with Rust before, this would be familiar to you. Result is a type that represents the result of an operation that can either be a success or a failure; it’s an alternative to throwing exceptions. It allows you to know when an operation might fail, and it forces you to handle the result of the operation, as you cannot access the inner value without handling both scenarios.

Ok and Err are the two possible outcomes of the operation. @propagate_result is a decorator that allows us to propagate the result of a function to the caller; it’s the equivalent of ? in Rust. Let me explain: if the Result is an Ok, .up() returns the inner value; otherwise, the function in which .up() was called returns early with the error (as a Result). If you want to know more about it, you can check out my library Resulty. Of course, you don’t have to use it, but after working so much with Rust, I really missed this way of handling errors.

try_parse_int is a thin wrapper around the int() function. It tries to parse an integer from a string. If it succeeds, it returns the parsed integer; otherwise, it returns an error.

def try_parse_int(s: str) -> Result[int, str]:
    try:
        i = int(s)
        return Ok(i)
    except ValueError:
        return Err(f"Could not parse '{s}' as an integer.")

Okay, let’s go back to the process_resident_status function. I think it’s pretty self-explanatory: we simply use the value parsed into an int to get the corresponding enum value from the mapping dictionary. We return a tuple with the name of the column and the enum value.

Let’s move on to something a bit more interesting: process_age_recode_52. It represents the age of the individual, recoded into different categories. I’m not a fan of having the dataset determine the different groupings of the data (there are multiple different recodes of age too, and they may not all be set). So I made the choice to transform all the age-related fields into a common format: a range of durations, i.e. a lower and an upper bound in milliseconds. We don’t know the exact age of the individual, which is why we use a range. Okay, let’s take a look at the function.

@propagate_result
def process_age_recode_52(
    value: str,
) -> Result[tuple[Literal["age_recode_52"], Optional[tuple[int, int]]], str]:
    if value == "":
        return Ok(("age_recode_52", None))

    v = try_parse_int(value).up()
    if 1 <= v <= 51:
        if v == 1:
            # under 1 hour
            low = 0
            upper = HOURS_AS_MS
        elif v == 2:
            # 1 to 23 hours
            low = HOURS_AS_MS
            upper = 23 * HOURS_AS_MS
        elif 3 <= v <= 9:
            # an exact number of days (1 to 7), so the range collapses to a point
            i = v - 2
            low = i * DAYS_AS_MS
            upper = i * DAYS_AS_MS
        elif 10 <= v <= 11:
            # one-week buckets: 14-20 days and 21-27 days
            i = v - 10
            low = (14 + 7 * i) * DAYS_AS_MS
            upper = (13 + 7 * (i + 1)) * DAYS_AS_MS
        elif 12 <= v <= 22:
            # an exact number of months (1 to 11)
            i = v - 11
            low = i * MONTHS_AS_MS
            upper = i * MONTHS_AS_MS
        elif 23 <= v <= 26:
            # an exact number of years (1 to 4)
            i = v - 22
            low = i * YEARS_AS_MS
            upper = i * YEARS_AS_MS
        elif 27 <= v <= 50:
            # five-year buckets: 5-9, 10-14, ..., 120-124 years
            i = v - 27
            low = (5 + 5 * i) * YEARS_AS_MS
            upper = (4 + 5 * (i + 1)) * YEARS_AS_MS
        elif v == 51:
            # 125 years and over (upper bound capped at 999 years)
            low = 125 * YEARS_AS_MS
            upper = 999 * YEARS_AS_MS
        else:
            return Err(f"Invalid value for age_recode_52: {v}")
        return Ok(("age_recode_52", (low, upper)))
    elif v == 52:
        # age not stated
        return Ok(("age_recode_52", None))
    else:
        return Err(f"Invalid value for age_recode_52: {v}")

Okay, what’s going on here 😧? Each “category” is represented as a number from 1 to 52, and each number may represent a different unit of time. So we first need to check which range the value falls into, and then convert it to a range of durations based on what we know about that particular unit (from the documentation). For example, if the value is between 23 and 26, subtracting 22 from it gives the age in years. Yeah… it’s fun, isn’t it 🤧? Then you “simply” multiply by the number of milliseconds in that unit.
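
As a quick sanity check on the 27-50 branch (the *_AS_MS constants are not shown here; I assume they are plain unit-to-millisecond conversions): category 30 gives i = 30 - 27 = 3, so low = (5 + 5 * 3) = 20 years and upper = (4 + 5 * 4) = 24 years, meaning the individual was between 20 and 24 years old, expressed in milliseconds.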

Processing an Entire Line

After defining all the individual functions to parse a particular feature, we simply have to call them for each feature in a given line. Thanks to the initial mapping we did, it’s actually quite intuitive.

@propagate_result
def process_line(line: str) -> Result[OrderedDict[str, Any], str]:
    d = OrderedDict()
    for (start, end), field in mapping.items():
        value = line[start - 1 : end].strip()
        if field == "reserved_positions":
            continue
        elif field == "record_type":
            k, v = process_record_type(value).up()
            d[k] = v
        elif field == "resident_status":
            k, v = process_resident_status(value).up()
            d[k] = v
        elif field == "education":
            k, v = process_education(value).up()
            d[k] = v
        # ...

    return Ok(d)

The function takes an unprocessed line and returns a dictionary with the processed data. We just have to loop over the mapping dictionary, take each range and column name, and then slice the line based on the range and call the corresponding function to process the value.

So, let’s do that for every line and call it a day, right? Of course not 😫, or I wouldn’t have asked. The dataset is so long that this would take hours to process. Instead, we are going to be smarter about it and use the full power of our computer!

Parallelizing the Processing

What makes parallelism hard is having to share data between workers, or having operations that no two workers may perform at the same time (there are more challenges, but those are the main ones). Here, each line is independent of the others, so we don’t need to synchronize anything or worry about race conditions.

The goal is simple: use all the CPU cores available to process the dataset, as right now only a single one is used. It’s pretty straightforward; we just need to split the dataset into as many chunks as the number of worker processes we want, then spawn a process for each of them. Each process does its thing, and we just have to wait for all of them to finish and concatenate the results.

@propagate_result
def parallel_process_lines(
    path: Path,
) -> Result[pl.DataFrame, str]:
    lines = load_file(path)

    # Evenly distribute the lines to each process
    lines_per_process = len(lines) // MAX_PARALLEL_PROCESSING
    lines_per_process = max(1, lines_per_process)

    # Split the lines into batches
    batches = []
    for i in range(0, len(lines), lines_per_process):
        batch = lines[i : i + lines_per_process]
        batches.append(batch)

    print(f"Processing {len(lines)} lines in {len(batches)} batches")
    print(f"{MAX_PARALLEL_PROCESSING} parallel processes will be used")

    # ...

Here, we simply split the dataset into what are called “batches”. Each batch is a list of lines that a single worker process will handle; the batches themselves are processed in parallel.

    futures: list[
        tuple[Future[Result[pl.DataFrame, str]], TaskID]
    ] = []  # keep track of the jobs

    with multiprocessing.Manager() as manager:
        _progress = manager.dict()
        overall_progress_task = progress.add_task(
            "> Processing lines", total=len(lines)
        )

        with concurrent.futures.ProcessPoolExecutor(
            max_workers=MAX_PARALLEL_PROCESSING
        ) as pool:
            for i, batch in enumerate(batches):
                batch_task = progress.add_task(
                    f"> > Processing batch {i + 1}", total=len(batch)
                )
                future = pool.submit(process_lines_batch, batch, _progress, batch_task)
                futures.append((future, batch_task))
    # ...

First, we create a list that will hold the futures (together with their progress-bar task IDs). Then we create a process pool executor with the maximum number of workers we want to use, loop over the batches, and submit each one to the pool. The batches are processed in parallel, and each submitted job is tracked in the futures list.

You may be wondering what the manager and _progress variables are for. They are just there so you can track the progress of each process. The manager is a multiprocessing manager that allows you to share data between processes, and _progress is a dictionary used to track the progress of each process. Simply put, each process “reports” its progress by updating the dictionary, which the main process then reads to display the progress bars (see the sketch of a worker function after the monitoring loop below).

            # monitor the progress:
            while (n_finished := sum([future.done() for future, _ in futures])) < len(
                futures
            ):
                progress.update(
                    overall_progress_task, completed=n_finished, total=len(futures)
                )
                for task_id, update_data in _progress.items():
                    latest = update_data["progress"]
                    total = update_data["total"]
                    # update the progress bar for this task:
                    progress.update(
                        task_id,
                        completed=latest,
                        total=total,
                        visible=latest < total,
                    )

            results = [future.result() for future, _ in futures]

    results = [r.unwrap_or_bubble() for r in results]
    df = pl.concat(results).cast(dataset_schema)  # type: ignore

    return Ok(df)

To wait and at the same time monitor the progress, we create a while loop that will continue until all the futures are done. We then update the progress bar for each task, and finally, we concatenate the results and cast them to the correct schema. For everything related to logging, I’m using the rich library.
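
The worker function itself isn’t shown above, so here is a minimal sketch of what process_lines_batch could look like, reconstructed from how it is called and how the monitoring loop reads the shared _progress dictionary (not the exact code from the repo):

@propagate_result
def process_lines_batch(
    lines: list[str], _progress: dict, task_id: TaskID
) -> Result[pl.DataFrame, str]:
    rows = []
    total = len(lines)
    for i, line in enumerate(lines):
        rows.append(process_line(line).up())  # bubble up any parsing error
        # report progress so the main process can update this batch's bar
        _progress[task_id] = {"progress": i + 1, "total": total}
    return Ok(pl.DataFrame(rows))

In practice, you would probably update _progress only every few thousand lines to limit the overhead of going through the manager proxy.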

You probably noticed that I’m using cast at the end, even though I said that the whole point of the schema is to avoid having to do that. Well, for some reason, using the schema=... argument when creating each individual DataFrame resulted in issues when concatenating them. I may have done something wrong; if you know why, you can open a PR on the repo, and I’ll be happy to review it. That being said, we did not define the schema for nothing, because we use it anyway for the casting.

Conclusion

I hope this post was useful to you. This was a long post, and I’ve cut a lot of corners. The entire code and the dataset are available on GitHub. If you have any questions or suggestions, feel free to reach out to me! And thank you for reading!