How to make data pipelines idempotent
- What is an idempotent function
- Pre-requisites
- Why idempotency matters
- Making your data pipeline idempotent
- Conclusion
What is an idempotent function
“Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application” - Wikipedia
Defined as
f(f(x)) = f(x)
In a data engineering context, this means that running a data pipeline multiple times with the same input will always produce the same output.
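For example, taking the absolute value of a number is idempotent: applying it twice gives the same result as applying it once. A minimal illustration in Python:

def f(x: int) -> int:
    return abs(x)

assert f(f(-5)) == f(-5)  # abs(abs(-5)) == abs(-5) == 5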
Pre-requisites
If you’d like to code along, please install git and Docker.
Follow the steps below to clone the GitHub repo, sh into the container, and install pyarrow and fastparquet.
git clone https://github.com/josephmachado/idempotent-data-pipeline.git
cd idempotent-data-pipeline
docker pull amancevice/pandas
docker run -it -v $(pwd):/var/lib/pandas amancevice/pandas sh
pip install pyarrow # required for pandas to write to parquet
pip install fastparquet # required for pandas to write to parquet
# all the following commands are to be executed within this docker container
Why idempotency matters
Rerunning a data pipeline can create duplicate data or fail to remove stale data. Making the pipeline idempotent prevents both of these issues.
Let’s use an example to illustrate this. Say we have a data pipeline that
- Pulls parking violation data from a file (or S3, table, etc.).
- Performs some transformations on it.
- Writes the data to a directory (or S3, table, etc.) named after the run date, partitioned by the vehicle’s Registration State.
#!/usr/bin/env python3
import argparse
import os

import pandas as pd


def run_parking_violations_data_pipeline(
    input_file: str, output_loc: str, run_id: str
) -> None:
    df = pd.read_csv(input_file)
    # your transformations
    df.to_parquet(
        os.path.join(output_loc, run_id), partition_cols=["Registration State"]
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input-file",
        type=str,
        help="The input file",
    )
    parser.add_argument(
        "--output-loc",
        type=str,
        help="The output folder",
    )
    parser.add_argument(
        "--run-id",
        type=str,
        help="The day of run, in yyyymmdd format",
    )
    opts = parser.parse_args()
    run_parking_violations_data_pipeline(
        input_file=opts.input_file, output_loc=opts.output_loc, run_id=opts.run_id
    )
The above script is available as parking_violation_data_pipeline.py in the cloned GitHub repo. We can run it inside the Docker container, as shown below.
python parking_violation_data_pipeline.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls data/out/20210519/ # view contents of the output folder
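Since the output is partitioned, the output folder contains one subdirectory per Registration State value, along the lines of the illustrative listing below (the exact state values depend on the input data).

data/out/20210519/
├── Registration State=99/
├── Registration State=NJ/
├── Registration State=NY/
└── ...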
Let’s assume that the next day, you realize that there are records with Registration State = 99. You decide to filter them out and reprocess the previous day’s data with the new logic (aka backfill). Make the following changes to filter out rows with Registration State = 99.
 df = pd.read_csv(input_file)
 # your transformations
+states_to_remove = ["99"]
+df_fin = df[~df["Registration State"].isin(states_to_remove)]
-df.to_parquet(
+df_fin.to_parquet(
     os.path.join(output_loc, run_id), partition_cols=["Registration State"]
 )
The script with this filter is available as parking_violation_data_pipeline_w_filter.py in the cloned GitHub repo. Let’s rerun this data pipeline for 20210519.
python parking_violation_data_pipeline_w_filter.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ # data with registration state 99 still present; this is stale data from the previous run.
This is incorrect: the stale data generated by the previous run is still present, because to_parquet writes new files into the output folder’s partitions without deleting what earlier runs left behind.
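To see why, here is a minimal, self-contained sketch using a toy DataFrame and a hypothetical toy_out/20210519 path (assuming the pyarrow engine): the second partitioned write adds new files but leaves the old partition directory in place.

import os

import pandas as pd

df1 = pd.DataFrame({"Registration State": ["NY", "99"], "fine_amount": [100, 50]})
df1.to_parquet("toy_out/20210519", partition_cols=["Registration State"])

# "backfill": rerun with state 99 filtered out
df2 = df1[df1["Registration State"] != "99"]
df2.to_parquet("toy_out/20210519", partition_cols=["Registration State"])

# expect ['Registration State=99', 'Registration State=NY'];
# the stale 'Registration State=99' partition survives the rerun
print(sorted(os.listdir("toy_out/20210519")))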
Making your data pipeline idempotent
A common way to make your data pipeline idempotent is to use the delete-write pattern. As the name implies, the pipeline will first delete the existing data before writing new data. Be very careful to only delete data that the data pipeline will re-create. This can be done as shown below.
 import os
+import shutil

 def run_parking_violations_data_pipeline(
     input_file: str, output_loc: str, run_id: str
 ) -> None:
+    output_path = os.path.join(output_loc, run_id)
+    if os.path.exists(output_path):
+        shutil.rmtree(output_path)  # removes entire folder
     df = pd.read_csv(input_file)
This idempotent script is available as parking_violation_data_pipeline_idempotent.py in the cloned GitHub repo. Let’s rerun this data pipeline for 20210519.
python parking_violation_data_pipeline_idempotent.py --input-file ./data/pv_2018_sample.csv --output-loc ./data/out --run-id 20210519
ls -ltha data/out/20210519/ | grep 99 # data with registration state 99 not present anymore; this is correct
exit # exit the docker container
The stale data has been removed and the data is now correct. For SQL-based transformations, you can follow a similar pattern, as shown below.
CREATE TEMP TABLE TEMP_YYYY_MM_DD
AS
SELECT c1,
c2,
SOME_TRANSFORMATION_FUNCTION(c3) as c3
FROM stage_table
WHERE day = 'yyyy-mm-dd';
-- note the delete-write pattern
DELETE FROM final_table
WHERE day = 'yyyy-mm-dd';
INSERT INTO final_table(c1, c2, c3)
SELECT c1,
c2,
c3
FROM TEMP_YYYY_MM_DD;
DROP TABLE TEMP_YYYY_MM_DD;
Note the run-specific temporary table name TEMP_YYYY_MM_DD. This prevents table name collisions when multiple runs of the job (for different dates) execute simultaneously.
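For instance, you could template the temporary table name from the run date. A small sketch (the temp_parking_ prefix and yyyy_mm_dd format are assumptions for illustration):

from datetime import date

def temp_table_name(run_day: date) -> str:
    # one temporary table per run date, e.g. temp_parking_2021_05_19
    return f"temp_parking_{run_day:%Y_%m_%d}"

print(temp_table_name(date(2021, 5, 19)))  # temp_parking_2021_05_19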
Most libraries and frameworks offer an overwrite option (e.g. Spark’s overwrite save mode, Snowflake’s INSERT OVERWRITE), which is safer than deleting and writing by hand.
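For example, in Spark you can let the framework handle the delete step. A minimal PySpark sketch (the input and output paths mirror the hypothetical ones used above; with partitionOverwriteMode set to dynamic, only the partitions present in the incoming data are replaced):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # replace only the partitions present in the incoming data
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .getOrCreate()
)

df = spark.read.csv("./data/pv_2018_sample.csv", header=True)
# your transformations
(
    df.write.mode("overwrite")
    .partitionBy("Registration State")
    .parquet("./data/out/20210519")
)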
Conclusion
Hope this article gives you a good understanding of why idempotence is crucial and how to make your data pipelines idempotent. To recap, idempotency
- Prevents duplicates
- Removes stale data
- Saves on data storage costs
When you build your next data pipeline, make sure it is idempotent. This will save you a lot of trouble when you have to rerun it due to backfilling, failed runs, or other errors.