Overall Algorithm Design

Ethan Steinberg edited this page Apr 25, 2024 · 6 revisions

Introduction

This document describes the core ETL algorithm used in MEDS-ETL’s C++ implementation.

The core problem we are trying to solve is that we have a flat version of our data that has one row per event and we need to join that data into a patient centric view that has one row per patient.

Our flat data looks like the following, with one row per event:

| patient_id | time        | code  | … |
|------------|-------------|-------|---|
| 1          | January 1st | E11.4 | … |
| 1          | October 9th | J30.4 | … |

Our desired patient nested data will have one row per patient with the following structure:

| patient_id | events                                             |
|------------|----------------------------------------------------|
| 1          | [(January 1st, E11.4, …), (October 9th, J30.4, …)] |
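To make the two layouts concrete, here is a minimal sketch of what the flat and nested records might look like as C++ types. The field and type names are illustrative assumptions, not the actual MEDS-ETL schema.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Flat layout: one row per event (hypothetical field names).
struct EventRow {
    int64_t patient_id;
    std::string time;  // e.g. a timestamp in the real data
    std::string code;  // e.g. "E11.4"
    // ... additional measurement columns
};

// Nested layout: one row per patient, holding all of that patient's events.
struct Event {
    std::string time;
    std::string code;
    // ...
};

struct PatientRow {
    int64_t patient_id;
    std::vector<Event> events;
};
```

The join described below converts a stream of `EventRow`s into one `PatientRow` per patient.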

This is a complicated task for a variety of reasons:

  • We need to process more events than can fit in memory
  • Events between multiple patients are scattered across files, in arbitrary orders
  • We want to avoid dependencies on systems that require a lot of setup like PySpark / BigQuery
  • We need to support a large number of threads, with minimal contention

Solution

Luckily, there is a very simple and standard algorithm that can handle this quite nicely: a hash/merge join.

It involves three simple stages.

Stage 1: Sharding

We need to shard our initial dataset by patient_id, such that every patient is in one and only one shard.

This is important because if we set num_shards = num_threads, we can easily obtain perfectly scalable multithreading up to any number of CPUs with no contention. Each thread handles one shard on a one-to-one basis, with no communication necessary between threads, because no patient is split across shards.

Luckily, fair and correct sharding can be achieved quite easily using hashing:

shard_index = hash(patient_id) % num_shards

In this stage we create a folder for every shard and then loop through every file, sending events to the appropriate shard according to patient_id.
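The shard assignment rule above can be sketched in a few lines. This is an illustrative sketch only: the actual hash function used by MEDS-ETL may differ, and `std::hash` is used here purely for demonstration. The key property is that the same patient_id always maps to the same shard.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Assign a patient to a shard: shard_index = hash(patient_id) % num_shards.
// std::hash stands in for whatever hash function the real implementation uses.
std::size_t shard_index(int64_t patient_id, std::size_t num_shards) {
    return std::hash<int64_t>{}(patient_id) % num_shards;
}
```

During the sharding pass, each event row is appended to the file for `shard_index(row.patient_id, num_shards)`, so all of a patient's events end up in the same shard by construction.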

Stage 2: Sorting

We now need to extract the patients from each shard. The problem is that events within each shard are not ordered, and each patient's events are in general scattered throughout the shard. We need to consolidate all of each patient's events together so that we can process each patient as a whole.

Luckily, this can be done quite simply by sorting each shard by patient_id. An external merge sort works well here, as it does not require storing the entirety of a shard in memory.

It's important to note that this does not need to be a parallel merge sort: because the data is already sharded, a simple and fast single-threaded merge sort per shard is sufficient.
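The merge phase of an external merge sort can be sketched as a k-way merge over sorted runs using a min-heap. In this simplified sketch, in-memory vectors of patient_ids stand in for the sorted run files that would live on disk; the real implementation merges full event rows from files.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>
#include <utility>
#include <vector>

// K-way merge of sorted runs. Each run is already sorted by patient_id;
// a min-heap repeatedly yields the smallest remaining head element.
std::vector<int64_t> merge_runs(const std::vector<std::vector<int64_t>>& runs) {
    // (value, (run index, offset within run))
    using Head = std::pair<int64_t, std::pair<std::size_t, std::size_t>>;
    std::priority_queue<Head, std::vector<Head>, std::greater<Head>> heap;
    for (std::size_t r = 0; r < runs.size(); ++r) {
        if (!runs[r].empty()) heap.push({runs[r][0], {r, 0}});
    }

    std::vector<int64_t> result;
    while (!heap.empty()) {
        auto [value, pos] = heap.top();
        heap.pop();
        result.push_back(value);
        auto [r, i] = pos;
        // Advance within the run the popped element came from.
        if (i + 1 < runs[r].size()) heap.push({runs[r][i + 1], {r, i + 1}});
    }
    return result;
}
```

Because each run is read sequentially front to back, only one element per run needs to be resident in memory at a time, which is what makes the sort "external".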

Stage 3: Collecting Patients

We now have a sharded dataset, where each shard is sorted by patient_id.

We can now collect the patients from each shard with a single linear pass. Because the sort groups all of each patient's events contiguously, we can simply scan through the stream and process patient by patient, greedily consuming events one by one until we reach a new patient_id, at which point we emit the completed patient.

As before, this can be single-threaded because the dataset is sharded.
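The linear pass above can be sketched as follows. For brevity, this hypothetical sketch reduces events to bare patient_ids; the real code would accumulate full event rows into each patient's nested record.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Group a patient_id-sorted stream into one group per patient.
// Since the input is sorted, each patient's events form one contiguous
// block, so a single comparison against the previous id suffices.
std::vector<std::vector<int64_t>> collect_patients(
    const std::vector<int64_t>& sorted_ids) {
    std::vector<std::vector<int64_t>> patients;
    for (int64_t id : sorted_ids) {
        // Start a new patient when the id changes (or on the first event).
        if (patients.empty() || patients.back().back() != id) {
            patients.push_back({});
        }
        patients.back().push_back(id);
    }
    return patients;
}
```

Each patient record can be written out as soon as its block ends, so this pass also never needs to hold more than one patient in memory at a time.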
