Data Pipeline Design Patterns - #2. Coding patterns in Python
- Introduction
- Sample project
- Code design patterns
- Python helpers
- Misc
- Conclusion
- Further reading
- References
Introduction
Using the appropriate code design pattern can make your code easier to read, extend, modify, and debug, and can help new developers onboard more quickly. If you have wondered
What design patterns do people follow when writing code for a typical data pipeline?
What do people mean by abstract and concrete implementations?
Why do data engineers prefer writing functional code?
What do the terms Factory, Strategy, Singleton, and Object pool mean, and when should you use them?
Then this post is for you. This post will cover the typical code design patterns for building data pipelines. We will learn about the pros and cons of each design pattern, when to use them, and, more importantly, when not to use them.
By the end of this post, you will have an overview of the typical code design patterns used for building data pipelines.
Note that this is part 2 of the data pipeline design pattern series. The first article is Data Pipeline Design Patterns - #1. Data flow patterns.
Sample project
To demonstrate code design patterns, we will build a simple ETL project that lets us pull data from Reddit, transform it and store it in a sqlite3 database.
The completed project is available at github.com/josephmachado/socialetl, along with setup instructions.
We will start with a simple ETL script and refactor it based on new requirements, which will be introduced in each section. By the end of the post, you should be able to identify the common data pipeline design patterns and know when to use them and when not to.
Code design patterns
Let’s start with a simple Reddit ETL script, which we will refactor throughout this post.
# social_etl.py
# for complete code check out https://github.com/josephmachado/socialetl
import praw
import os
import sqlite3
REDDIT_CLIENT_ID='replace-with-your-reddit-client-id'
REDDIT_CLIENT_SECRET='replace-with-your-reddit-client-secret'
REDDIT_USER_AGENT='replace-with-your-Reddit-user-agent'
def extract():
    client = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    subreddit = client.subreddit('dataengineering')
    top_subreddit = subreddit.hot(limit=100)
    data = []
    for submission in top_subreddit:
        data.append(
            {
                'title': submission.title,
                'score': submission.score,
                'id': submission.id,
                'url': submission.url,
                'comments': submission.num_comments,
                'created': submission.created,
                'text': submission.selftext,
            }
        )
    return data
def transform(data):
    """
    Function to only keep outliers.
    Outliers are based on num of comments > 2 standard deviations from mean
    """
    num_comments = [post.get('comments') for post in data]
    mean_num_comments = sum(num_comments) / len(num_comments)
    std_num_comments = (
        sum([(x - mean_num_comments) ** 2 for x in num_comments])
        / len(num_comments)
    ) ** 0.5
    return [
        post
        for post in data
        if post.get('comments') > mean_num_comments + 2 * std_num_comments
    ]
def load(data):
    # create a db connection
    conn = sqlite3.connect('./data/socialetl.db')
    cur = conn.cursor()
    try:
        # insert data into DB
        for post in data:
            cur.execute(
                """
                INSERT INTO social_posts (
                    id, source, social_data
                ) VALUES (
                    :id, :source, :social_data
                )
                """,
                {
                    'id': post.get('id'),
                    # the :source placeholder needs a matching key
                    'source': 'reddit',
                    'social_data': str(
                        {
                            'title': post.get('title'),
                            'url': post.get('url'),
                            # keys match the dicts built in extract()
                            'comments': post.get('comments'),
                            'created': post.get('created'),
                            'text': post.get('text'),
                        }
                    ),
                },
            )
    finally:
        conn.commit()
        conn.close()
def main():
    # pull data from Reddit
    data = extract()
    # transform reddit data
    transformed_data = transform(data)
    # load data into database
    load(transformed_data)


if __name__ == '__main__':
    main()
1. Functional design
As a data engineer (DE), you might have heard people say, “write functional code.” Let’s break down what that means.
- Atomicity: A function should only do one task.
- Idempotency: If you run the code multiple times with the same input, the output should be the same. In the case of storing the output in an external data store, the output should not be duplicated.
- No side effects: A function should not affect any external data (variable or other) besides its output.
Note: Additional functional programming (FP) concepts include higher-order functions, function composition, and referential transparency.
Let’s examine the load function from the code above.
- Atomicity: No, because it does two things: it manages a database connection and loads data into a DB. We can use dependency injection to accept the DB connection as an input to the load function.
- Idempotency: No, because it inserts all the data into the social_posts table. If the input to the load function has duplicates or the load function is accidentally run twice, duplicates will be inserted into the social_posts table. We can prevent this using an UPSERT, which updates a record if it is already present (identified by a key) and inserts it otherwise.
- No side effects: Strictly speaking, writing to the database is I/O and therefore a side effect, but it is the intended purpose of the load function; beyond that write, load does not change any external variables. However, note that when the load function starts accepting a DB connection as an input parameter (dependency injection), we should not close it within the load function, since that will affect the state of a variable external to the load function.
Let’s look at what a functional load function will look like:
import logging
from dataclasses import asdict


def load(social_data, db_conn) -> None:
    logging.info('Loading social data.')
    if db_conn is None:
        raise ValueError(
            'db_conn is None. Please pass a valid DatabaseConnection'
            ' object.'
        )
    cur = db_conn.cursor()
    try:
        for post in social_data:
            # INSERT OR REPLACE acts as an upsert keyed on id
            cur.execute(
                """
                INSERT OR REPLACE INTO social_posts (
                    id, source, social_data
                ) VALUES (
                    :id, :source, :social_data
                )
                """,
                {
                    'id': post.id,
                    'source': post.source,
                    'social_data': str(asdict(post.social_data)),
                },
            )
    finally:
        # close only the cursor; the injected connection stays open
        cur.close()
        db_conn.commit()
Note: If you are wondering whether the cursor should be created outside the load function, you are on the right path. We will see how to do this effectively in the Context Managers section.
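The idempotency and dependency injection points above can be checked with a small, self-contained sketch. It assumes an in-memory sqlite3 database, a social_posts table whose id column is the primary key, and plain dicts in place of the project's data objects; load_posts is a hypothetical name used only for this illustration.

```python
import sqlite3


def load_posts(posts, db_conn):
    """Upsert posts using a connection that is owned by the caller."""
    cur = db_conn.cursor()
    try:
        for post in posts:
            # INSERT OR REPLACE overwrites the row with the same primary key
            cur.execute(
                """
                INSERT OR REPLACE INTO social_posts (id, source, social_data)
                VALUES (:id, :source, :social_data)
                """,
                post,
            )
    finally:
        cur.close()
    db_conn.commit()


# assumed schema: id is the key used to detect rows that already exist
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE social_posts '
    '(id TEXT PRIMARY KEY, source TEXT, social_data TEXT)'
)
posts = [
    {'id': 'a1', 'source': 'reddit', 'social_data': 'first post'},
    {'id': 'b2', 'source': 'reddit', 'social_data': 'second post'},
]

load_posts(posts, conn)
load_posts(posts, conn)  # simulate an accidental re-run
row_count = conn.execute('SELECT COUNT(*) FROM social_posts').fetchone()[0]
```

Because the connection is passed in, a test can hand the function an in-memory database, and the re-run leaves the row count unchanged.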
2. Factory pattern
Let’s assume we have to design data pipelines to pull data from Twitter, Mastodon, LinkedIn, etc. All of these social data pipelines follow a similar pattern. We can use the Factory pattern to create a uniform interface for pulling social data. Let’s see how the factory pattern works:
- Use a factory pattern when multiple pipelines follow a similar pattern. E.g., if you want to add ETL for Twitter, Mastodon, LinkedIn, etc.
- A factory will be responsible for creating the appropriate (Reddit, Twitter, or Mastodon) ETL object. The code that calls the factory will use that ETL object, unaware of its internal implementation.
- Prevents complex if..else statements that are hard to manage and provides a standard interface to interact with multiple similar pipelines.
- You can define a standard set of methods that all the ETL classes must implement. The common methods’ names and signatures (inputs & outputs) are called the abstract interface, since they define how we interact with any ETL implementing the standard (e.g., see the SocialETL class). The actual implementation is called the concrete implementation (e.g., RedditETL).
Let’s see how we can add an ETL factory to create Twitter and Reddit pipelines. The complete code is in the project repository.
import os
from abc import ABC, abstractmethod  # python module to define abstract interfaces

import praw
import tweepy


# Abstract class with abstract methods
class SocialETL(ABC):
    @abstractmethod
    def extract(self, id, num_records, client):
        pass

    @abstractmethod
    def transform(self, social_data):
        pass

    @abstractmethod
    def load(self, social_data, db_conn):
        pass

    @abstractmethod
    def run(self, db_conn, client, id, num_records):
        pass


# Concrete implementation of the abstract Class
class RedditETL(SocialETL):
    def extract(self, id, num_records, client):
        ...  # code to extract reddit data

    def transform(self, social_data):
        ...  # code to transform reddit data

    def load(self, social_data, db_conn):
        ...  # code to load reddit data into the final table

    def run(self, db_conn, client, id, num_records):
        ...  # code to run extract, transform and load


# Concrete implementation of the abstract Class
class TwitterETL(SocialETL):
    def extract(self, id, num_records, client):
        ...  # code to extract twitter data

    def transform(self, social_data):
        ...  # code to transform twitter data

    def load(self, social_data, db_conn):
        ...  # code to load twitter data into the final table

    def run(self, db_conn, client, id, num_records):
        ...  # code to run extract, transform and load


# This "factory" will accept an input and give you the appropriate object
# that you can use to perform ETL
def etl_factory(source):
    factory = {
        'Reddit': (
            praw.Reddit(
                client_id=os.environ['REDDIT_CLIENT_ID'],
                client_secret=os.environ['REDDIT_CLIENT_SECRET'],
                user_agent=os.environ['REDDIT_USER_AGENT'],
            ),
            RedditETL(),
        ),
        'Twitter': (
            tweepy.Client(bearer_token=os.environ['BEARER_TOKEN']),
            TwitterETL(),
        ),
    }
    if source in factory:
        return factory[source]
    else:
        raise ValueError(
            f"source {source} is not supported. Please pass a valid source."
        )


# calling code
client, social_etl = etl_factory(source)
social_etl.run(db_conn, client, ...)
Separation of responsibility (definition, creation, & use) is crucial for code maintenance and testability and prevents modifying code in multiple places in case of a new feature. Note that even though we use classes, we still follow functional principles at a function level.
Pros:
- If you have multiple similar ETLs, a factory can significantly improve code consistency and make it much easier to evolve your pipelines.
- Using factory patterns to establish connections to external systems will enable easier testing. E.g., a DB factory allows you to easily use sqlite3 for dev testing and Postgres for prod. You can also put your Spark session behind a factory to use fewer executors in dev and higher memory settings in prod, etc.
Cons:
- If you use the factory method to define data pipelines that are inherently different (e.g., ETL vs. ELT, or API data pulls vs. S3 -> S3 data transfer, etc.), it will make the code highly complex and brittle. Only use a factory if the underlying data pipelines have a similar structure.
- Using the factory method when there are only one or two data pipelines (without any signs of more data pipelines) is premature optimization and can slow down development, since the abstract interface constrains how each pipeline can be written.
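One variation worth considering is a factory that maps source names to classes and instantiates only the one requested, so credentials for unused sources never need to be present. The sketch below is a simplified illustration, not the project's implementation: the single-method SocialETL interface, ETL_REGISTRY, and lazy_etl_factory are hypothetical names, and API clients are left out to keep it self-contained.

```python
from abc import ABC, abstractmethod


class SocialETL(ABC):
    """Simplified abstract interface with a single method."""

    @abstractmethod
    def run(self) -> str:
        ...


class RedditETL(SocialETL):
    def run(self) -> str:
        return 'ran reddit etl'


class TwitterETL(SocialETL):
    def run(self) -> str:
        return 'ran twitter etl'


# map each source name to a class (not an instance)
ETL_REGISTRY = {'Reddit': RedditETL, 'Twitter': TwitterETL}


def lazy_etl_factory(source: str) -> SocialETL:
    """Create only the ETL object that was asked for."""
    try:
        etl_class = ETL_REGISTRY[source]
    except KeyError:
        raise ValueError(f'source {source} is not supported.') from None
    return etl_class()


# calling code only depends on the abstract interface
message = lazy_etl_factory('Reddit').run()
```

Adding a new source then means writing one class and adding one registry entry; the calling code does not change.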
3. Strategy pattern
Our initial transformation function used standard deviation to identify outliers (see the transform function). Let’s assume we want the option to choose from multiple transformation functions, such as randomly selecting a few posts or not applying any transformation, based on inputs to the pipeline.
We can use a Strategy pattern, which allows our code to choose one way of transforming the data among multiple methods (aka choose one strategy among various strategies). Seen below are some strategies for transforming the data pulled from either Reddit or Twitter.
import random
import logging


def no_transformation(social_data):
    logging.info('No transformation applied.')
    return social_data


def random_choice_filter(social_data):
    logging.info('Randomly choosing 2 social media data points.')
    # note: random.choices samples with replacement
    return random.choices(social_data, k=2)


def standard_deviation_outlier_filter(social_data):
    # code to only keep standard deviation based outliers (elided);
    # it builds filtered_data
    return filtered_data


# a factory to return the appropriate transformation function
def transformation_factory(value):
    factory = {
        'sd': standard_deviation_outlier_filter,
        'no_tx': no_transformation,
        'rand': random_choice_filter,
    }
    return factory[value]


class RedditETL(SocialETL):
    def transform(self, social_data, transform_function):
        """Function to transform reddit data by applying the
        transformation function (strategy) that is passed in.

        Args:
            social_data (List[RedditPostData]): List of reddit post data.
            transform_function (Callable): Transformation to apply.

        Returns:
            List[RedditPostData]: Transformed list of reddit post data.
        """
        logging.info('Transforming reddit data.')
        return transform_function(social_data)

    def run(
        self,
        db_cursor_context,
        client,
        transform_function,
        id='dataengineering',
        num_records=100,
    ):
        """Function to run the ETL pipeline.

        Args:
            db_cursor_context (DatabaseConnection): Database connection.
            client (praw.Reddit): Reddit client.
            transform_function (Callable): Transformation to apply.
            id (str): Subreddit to get data from.
            num_records (int): Number of records to get.
        """
        logging.info('Running reddit ETL.')
        self.load(
            social_data=self.transform(
                social_data=self.extract(
                    id=id, num_records=num_records, client=client
                ),
                transform_function=transform_function,
            ),
            db_cursor_context=db_cursor_context,
        )

    # other methods


# Calling code
transformation = 'sd'  # 'no_tx' or 'rand'
client, social_etl = etl_factory(source)
db = db_factory()
social_etl.run(
    db_cursor_context=db.managed_cursor(),
    client=client,
    transform_function=transformation_factory(transformation),
)
Note how the calling code injects a transformation function based on the transformation variable. The key idea is to have transformation functions with the same input and output parameters, which makes them switchable. It can be hard to understand which function was executed without proper logging.
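To make the switchable-strategy idea concrete, here is a minimal runnable sketch. It assumes posts are plain dicts with a comments key (rather than the project's data objects) and uses the population standard deviation, matching the original transform function; STRATEGIES and the standalone transform helper are illustrative names.

```python
import statistics


def no_transformation(posts):
    return posts


def standard_deviation_outlier_filter(posts):
    """Keep posts whose comment count exceeds mean + 2 population std devs."""
    if not posts:
        return []
    counts = [post['comments'] for post in posts]
    threshold = statistics.mean(counts) + 2 * statistics.pstdev(counts)
    return [post for post in posts if post['comments'] > threshold]


# every strategy takes a list of posts and returns a list of posts
STRATEGIES = {
    'sd': standard_deviation_outlier_filter,
    'no_tx': no_transformation,
}


def transform(posts, strategy_name):
    """Look up the strategy by name and apply it."""
    return STRATEGIES[strategy_name](posts)


# nine quiet posts and one with far more comments
posts = [{'id': str(i), 'comments': 1} for i in range(9)]
posts.append({'id': 'viral', 'comments': 100})

outliers = transform(posts, 'sd')
unchanged = transform(posts, 'no_tx')
```

Because both functions share one signature, the caller switches behavior by changing a string, and nothing else in the pipeline needs to know which strategy ran.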
4. Singleton & Object pool patterns
Use a singleton pattern when your program should only have one object of a class for the entirety of its run. The singleton pattern is commonly used for database connections, loggers, etc. However, it can make testing incredibly difficult, since all your tests can only use one object. If a singleton class is not designed with sufficient guardrails, it is possible to create multiple instances of it in Python. In general, Singleton is considered an anti-pattern.
A pattern that builds on Singleton is called an Object pool pattern. In this pattern, instead of being able to use only one single object, you can use an object from a pool of objects. The pool size is set depending on the use case. The object pool pattern is commonly seen in applications that have multiple incoming requests and need to communicate with the database quickly (e.g., backend apps, stream processing). Having a pool of DB connections allows incoming requests to communicate with the DB without having to create a new connection (which takes longer) or wait for a singleton object to finish serving other requests. E.g., the Psycopg2 connection pool. However, note that connections must be reset to their initial state after use, before they are returned to the pool.
More often than not, in batch data pipeline applications, it is better to have a factory method to create a DB connection since this will give you the flexibility to set your config depending on the environment, and you will not have to deal with cleaning up connections to be returned to a pool, etc.
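For readers who have not seen an object pool, the following is a rough, single-threaded sketch of the idea using sqlite3 and a queue. ConnectionPool and its methods are hypothetical names, and a production pool would need more care (for example, sqlite3 connections are tied to the creating thread unless check_same_thread=False is passed).

```python
import queue
import sqlite3
from contextlib import contextmanager


class ConnectionPool:
    """Fixed-size pool of sqlite3 connections (illustrative only)."""

    def __init__(self, db_file: str, size: int = 2) -> None:
        self._pool: queue.Queue = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(sqlite3.connect(db_file))

    @contextmanager
    def connection(self):
        conn = self._pool.get()  # blocks until a connection is free
        try:
            yield conn
        finally:
            conn.rollback()  # discard uncommitted work before reuse
            self._pool.put(conn)

    def available(self) -> int:
        return self._pool.qsize()


pool = ConnectionPool(':memory:', size=2)
with pool.connection() as conn:
    in_use = pool.available()  # one connection is checked out here
    value = conn.execute('SELECT 1').fetchone()[0]
after_release = pool.available()
```

The rollback in the finally block is the "return to initial state" step mentioned above; it is exactly the kind of housekeeping a simple factory-created connection avoids in batch pipelines.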
Check out our DB connection factory in the project repository.
Reader exercise: Create an abstract interface for DB connection and create a DB class for Postgres. Please feel free to put up a PR in this repo .
Python helpers
Python provides tools for working with data, checking types, and encapsulating common functionality. In this section, we will go over a few important ones which can significantly improve your code.
1. Typing
Although Python is a dynamic language, having type hints + a type checker (e.g., mypy) can save you from multiple type incompatibility issues at runtime.
E.g., here is the extract method from the RedditETL class that shows the expected input & output types.
from typing import List


class RedditETL(SocialETL):
    def extract(
        self,
        id: str,
        num_records: int,
        client: praw.Reddit,
    ) -> List[SocialMediaData]:
        ...  # Code
We can see that the extract method expects three inputs, id (type string), num_records (type integer), and client (type praw.Reddit), and returns a list of SocialMediaData (a class we will see in the Dataclass section).
Python functions can accept inputs and produce outputs which are functions themselves (aka higher order functions). We can also define function types as shown below.
from typing import Callable, List


# code
def transformation_factory(
    value: str,
) -> Callable[[List[SocialMediaData]], List[SocialMediaData]]:
    factory = {
        'sd': standard_deviation_outlier_filter,
        'no_tx': no_transformation,
        'rand': random_choice_filter,
    }
    return factory[value]
Callable[[List[SocialMediaData]], List[SocialMediaData]] represents a function.
- First parameter: Defines the types of the inputs to the function ([List[SocialMediaData]]). The first part of Callable is a list to account for multiple possible inputs.
- Second parameter: Defines the output type of the function (List[SocialMediaData]).
Mypy is a static type checker for Python. We can run mypy (the project’s Makefile shows how) to verify that our code respects the types we have defined. E.g., if we call a function with a different type than what it expects, mypy will report an error.
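As a small illustration of function types, the sketch below defines a type alias for a transformation and passes a function that matches it. The names Transform, keep_even, and apply_transform are made up for this example; the commented-out call shows the kind of mismatch a type checker such as mypy should flag.

```python
from typing import Callable, List

# a type alias for any function that maps a list of ints to a list of ints
Transform = Callable[[List[int]], List[int]]


def keep_even(values: List[int]) -> List[int]:
    return [v for v in values if v % 2 == 0]


def apply_transform(values: List[int], transform: Transform) -> List[int]:
    return transform(values)


result = apply_transform([1, 2, 3, 4], keep_even)

# A type checker should reject the call below, because len returns an int
# rather than a List[int]; it is left commented out on purpose.
# apply_transform([1, 2, 3, 4], len)
```

Python itself does not enforce these hints at runtime, which is why the type checker needs to be part of the development workflow.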
2. Dataclass
Dataclasses are classes designed primarily to store data: you declare typed fields, and Python generates methods such as __init__ and __repr__ for you.
Pros:
- Designed to represent data in Python.
- Has type hints and enables code completion with IDEs.
- Enables setting default values.
- Ability to add custom processing of data on object creation. See post-init.
- Ability to emulate immutability with frozen dataclasses.
- Ability to inherit from other data classes.
Cons:
- Dataclasses are regular classes and include a slight overhead during creation.
- It is overkill for simple dictionary-based data. E.g. [{‘name’: ‘Tanya’}, {‘name’: ‘Sophia’}]
In our code, we use dataclasses to represent the data we get from Reddit and Twitter.
from dataclasses import asdict, dataclass


@dataclass
class RedditPostData:
    """Dataclass to hold reddit post data.

    Args:
        title (str): Title of the reddit post.
        score (int): Score of the reddit post.
        url (str): URL of the reddit post.
        comms_num (int): Number of comments on the reddit post.
        created (str): Datetime (string repr) of when the reddit
            post was created.
        text (str): Text of the reddit post.
    """

    title: str
    score: int
    url: str
    comms_num: int
    created: str
    text: str


@dataclass
class TwitterTweetData:
    """Dataclass to hold twitter post data.

    Args:
        text (str): Text of the twitter post.
    """

    text: str


@dataclass
class SocialMediaData:
    """Dataclass to hold social media data.

    Args:
        id (str): ID of the social media post.
        source (str): Source of the social media post.
        social_data (RedditPostData | TwitterTweetData): Post data.
    """

    id: str
    source: str
    # social_data can be one of the Reddit or Twitter data types
    social_data: RedditPostData | TwitterTweetData
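The default-value, post-init, and frozen features listed in the pros above can be sketched as follows. Post and its title_length field are hypothetical and exist only to show the mechanics.

```python
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class Post:
    title: str
    score: int = 0  # default value
    title_length: int = field(init=False)  # derived, not passed by callers

    def __post_init__(self) -> None:
        # frozen dataclasses block normal assignment, so derived fields
        # are set through object.__setattr__
        object.__setattr__(self, 'title_length', len(self.title))


post = Post(title='Hello DE')

try:
    post.score = 10  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing the class means a transformation cannot silently modify a record it was handed, which fits the functional style used elsewhere in the pipeline.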
3. Context Managers
When establishing a connection to an external system, such as a database, file system, etc., we should clean up the connection when done (or on an error) to prevent memory leaks and to free up system resources.
In our example, we need to handle closing our database connection when the load is done (or in case it errors out). Typically this is done with a try...except...finally block, but this creates duplicate code wherever we connect to our database.
Instead, we can create context managers, which are called using with blocks. We will create a context manager for our DB connections that automatically closes them on success or failure. We can define a context manager using the contextmanager decorator, as shown below.
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class DatabaseConnection:
    def __init__(
        self, db_type: str = 'sqlite3', db_file: str = 'data/socialetl.db'
    ) -> None:
        """Class to connect to a database.

        Args:
            db_type (str, optional): Database type.
                Defaults to 'sqlite3'.
            db_file (str, optional): Database file.
                Defaults to 'data/socialetl.db'.
        """
        self._db_type = db_type
        self._db_file = db_file

    @contextmanager
    def managed_cursor(self) -> Iterator[sqlite3.Cursor]:
        """Function to create a managed database cursor.

        Yields:
            sqlite3.Cursor: A sqlite3 cursor.
        """
        if self._db_type != 'sqlite3':
            # only sqlite3 is handled in this example
            raise ValueError(f'Unsupported db_type: {self._db_type}')
        _conn = sqlite3.connect(self._db_file)
        cur = _conn.cursor()
        try:
            yield cur
        finally:
            _conn.commit()
            cur.close()
            _conn.close()

    def __str__(self) -> str:
        return f'{self._db_type}://{self._db_file}'


db = DatabaseConnection()
with db.managed_cursor() as cur:  # cursor and connection are open
    cur.execute("YOUR SQL QUERY")
# cursor and connection are closed
When we use the with db.managed_cursor() as cur clause, the code
- Runs the code in the managed_cursor method up to the yield cur line.
- Makes the cursor available as cur inside the with block.
- After the with block, control returns to the finally part of the managed_cursor method, which commits the changes and closes the connection.
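Note that the managed_cursor method commits in its finally block, so it also commits when the with block raises. If you would rather discard partial writes on failure, one possible variant is sketched below; transactional_cursor is a hypothetical standalone function, and the temporary database file is only there to make the example self-contained.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transactional_cursor(db_file: str) -> Iterator[sqlite3.Cursor]:
    conn = sqlite3.connect(db_file)
    cur = conn.cursor()
    try:
        yield cur
        conn.commit()  # reached only if the with block did not raise
    except Exception:
        conn.rollback()  # undo partial writes, then re-raise
        raise
    finally:
        cur.close()
        conn.close()


db_file = os.path.join(tempfile.mkdtemp(), 'demo.db')

with transactional_cursor(db_file) as cur:
    cur.execute('CREATE TABLE t (x INTEGER)')
    cur.execute('INSERT INTO t VALUES (1)')

try:
    with transactional_cursor(db_file) as cur:
        cur.execute('INSERT INTO t VALUES (2)')
        raise RuntimeError('simulated failure')
except RuntimeError:
    pass  # the failed block's insert was rolled back

with transactional_cursor(db_file) as cur:
    rows = cur.execute('SELECT x FROM t').fetchall()
```

Which behavior is right depends on the pipeline: committing partial work can be acceptable when the load is an idempotent upsert, while rollback is safer when a half-written batch would be misleading.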
4. Testing with pytest
Testing is critical to ensure that our code does what we expect it to and to prevent errors when we modify our code. Placing all our test scripts under the tests folder will help keep tests separate from pipeline code.
In our project, we will use pytest to run tests. Let’s go over some critical concepts for testing.
- Test scripts: While we can create one test script and test all our functions in there, it’s best practice to have one test file per pipeline (e.g., test_twitter.py & test_reddit.py). Pytest uses file and function names (prefixed with test_) to identify test functions (see the pytest documentation for more details).
- Schema setup: When testing locally, it’s ideal to create (set up) and drop (tear down) tables per test run. Recreating tables each time you run all the tests in your test directory will enable you to write more straightforward tests (e.g., if you are testing whether a function inserts ten rows into a table, then without a teardown of the tables the next run will show 20 rows). When setting up schema (or fake data) for testing, we can create them at different scopes. The different scopes are:
  - Session level: The setup is run once before we start testing all the test files, and the teardown is run once after all the testing is done.
  - Class level: The setup and teardown are run before and after each class (usually named class Testxyz).
  - Function level: The setup and teardown are run before and after each function (usually named test_).
- Fixtures: When testing, we don’t want to hit external APIs (Reddit/Twitter), as it may be costly and may return different data each time, making testing almost impossible. We can create static data to replicate data from external systems, called fixtures. One way to use a fixture with pytest is to create functions (mock_twitter_data & mock_reddit_data) that return the static data and add them as inputs (dependency injection) to the functions where we need to use them (e.g., test_transform).
- Mocking functions: When we run tests, we might want to override the behavior of a function. In such cases, we can use mocker to override the behavior of a function. E.g., we want to return a DB connection to our test DB wherever we call the db_factory function. We can define this using mocker. Note that we define mocks for db_factory twice. This is because when we mock a function, we need to specify the location where it is used, not where it is defined; this gives us the flexibility to mock a function based on where it is used. We use a session_mocker to specify that the mock applies to the entire session (there is a mocker for non-session-level mocking).
We can run tests as shown below.
python -m pytest --log-cli-level info -p no:warnings -v ./tests
Note that we tell pytest to print all info-level logging, ignore warnings, and look for test scripts under the tests folder.
conftest.py is used to define fixtures for an entire directory (the tests dir in our case). In our conftest.py, we define a mock_social_posts_table fixture function and set it to be run once per session. When we run pytest, the setup part (pre-yield statement) is run, then all our tests are executed, and then the teardown part (post-yield statement) is run.
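As a rough sketch of how a data fixture is injected into a test, consider the example below. It assumes pytest is installed; keep_popular, build_mock_posts, and mock_posts are hypothetical names and are not taken from the project's test suite.

```python
import pytest


def keep_popular(posts, min_comments):
    """Function under test: keep posts with at least min_comments."""
    return [post for post in posts if post['comments'] >= min_comments]


def build_mock_posts():
    # static stand-in for an API response, so tests never call the API
    return [
        {'id': 'a', 'comments': 3},
        {'id': 'b', 'comments': 50},
    ]


@pytest.fixture
def mock_posts():
    return build_mock_posts()


def test_keep_popular(mock_posts):
    # pytest injects the fixture by matching the argument name
    result = keep_popular(mock_posts, min_comments=10)
    assert [post['id'] for post in result] == ['b']
```

Saved in a file whose name starts with test_, this test is discovered and run by the pytest command shown above.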
5. Decorators
Decorators add functionality to other functions. E.g.,
def log_metadata(func):
    def log_wrapper():
        print(f'Adding functionality to {func.__name__}')
        # do some other thing
        return func()

    return log_wrapper


@log_metadata
def some_func():
    print('do some thing')
In the above example, we add functionality to the some_func function by decorating it with the log_metadata function. The log_metadata function can be used as a decorator for multiple functions.
In our project, we use log_metadata to log the function name and its input arguments, and to dump them into a log_metadata table for tracking data lineage. The inspect module helps us identify objects and their input parameters.
Storing the inputs to each function, the time it was run, and the name of the function helps us debug issues with our pipelines (aka data lineage).
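A minimal sketch of such a logging decorator is shown below. It is an assumption-laden illustration rather than the project's code: RUN_LOG is an in-memory list standing in for the log_metadata table, and the decorated extract function is a toy.

```python
import functools
import inspect
from datetime import datetime, timezone

RUN_LOG = []  # stand-in for the log_metadata table


def log_metadata(func):
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # map positional and keyword arguments to parameter names
        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()
        RUN_LOG.append(
            {
                'function': func.__name__,
                'inputs': dict(bound.arguments),
                'run_at': datetime.now(timezone.utc).isoformat(),
            }
        )
        return func(*args, **kwargs)

    return wrapper


@log_metadata
def extract(subreddit, num_records=100):
    return f'{num_records} posts from {subreddit}'


result = extract('dataengineering', num_records=5)
```

Accepting *args and **kwargs lets one decorator wrap functions with different signatures, and inspect.signature recovers the parameter names so each logged input is labeled.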
Misc
In addition to the above, here are a few more tips to help you:
- Project structure: A consistent project structure makes your imports sensible and helps you easily navigate the code base. This python guide is a great starting point.
- Naming: Follow a naming standard for your code. Google style guide is a great place to start.
- Automated formatting, lint checks, & type checks: Consistent code style helps you and your team with reading and understanding the code base and reduces nit comments on PRs. Automate them with black, isort, flake8, and mypy; see the Makefile for usage.
- Makefile: Make commands allow you to define aliases for longer commands. In our project, the Makefile helps us run tests, lint & type checks, formatting, run etls, and connect to our database. Most terminals also support tab completion for make commands defined in your Makefile.
- Githooks: While we can run the make commands before creating a PR, we can take this a step further by automating them with githooks. We can use a pre-commit git hook to run automated checks every time you create a commit. See the setup instructions to set up a pre-commit hook.
- dotenv: We use the dotenv module to read config variables from a .env file and load them into the OS environment without having to set them at the OS level. This enables us to have separate .env files when running locally vs. in production.
- Reproducible environment: We use venv to create reproducible environments. We can also use Docker to do the same, but it is optional for a simple project.
Conclusion
This article covered the commonly used design patterns and Python techniques for writing clean data pipelines. To recap, we saw how
- Functional data pipelines produce consistent outputs on re-runs and lead to easily testable code.
- Factory patterns can create standard interfaces for similar pipelines, making using/switching them simple.
- Strategy pattern allows the code to choose from the multiple data processing options at runtime.
- Singleton pattern only allows the creation of one class object. Object pools offer a pool of objects that the program can reuse.
- In batch data pipelines, factory pattern connecting to external systems allows for easy testability.
- Python functionality like type checks prevents runtime type issues, dataclasses help store and process data effectively, context managers handle closing connections and prevent memory leaks, and decorators enrich the functionality of other functions.
- Tests ensure that the code you write does what you expect it to do.
Note that the design patterns are suggestions and not absolutely required. When designing data pipelines, one must ask themselves if a design pattern can help keep their code clean, now and in the future; if the answer is a no or a maybe, it is better to implement a design pattern only when needed. E.g., if you only have two pipelines (and don’t see it increasing), you don’t need a factory pattern.
If you have any questions or comments, please leave them in the comment section below. If you have any issues with running the code, please open a GitHub issue in the project repository.
Further reading
References
Hey Joseph, thanks for this post! On the functional design section, where you examine the load method, I have to disagree that there is no side effect. There is a side effect, and the entire function is indeed side effecting because it involves I/O. From the functional programming side of things, a side effect is anything that isn’t a function’s main effect (i.e. its return value). Affecting variables outside a function’s scope is a side effect, as is I/O.
In terms of ETL, the E and L steps would necessarily involve side effects because of the above explanation. The T step may be written to use only pure functions, although it could have its own I/O if intermediate results are written to disk (which we can ignore in this discussion).
Hey, That is a good point, you are right! I will include this in the post. Thank you for letting me know!
Awesome stuff, I recently found your site and wow, what a wealth of data engineering information! Keep up the good work, Joseph! I also used similar patterns that you mentioned above as well(Factory, etc..)
Though in our case, we went more granular, we had around 50 classes(each representing a DB table) that implemented the same interface that we called "Constructor"
We abstracted the ETL portion as their own classes/interfaces. E(Redshift, sqlite, etc...); T(50 corresponding to each Builder ); L(Redshift, sqlite, etc...), which would then all be dynamically constructed together in our main function.
e.g. Orchestrator --pass Builder name, extract source, load destination--> main.py --call builder factory, extract factory, load factory--> invokes Builder(Extract, Transform, Load)
iirc, the reason we didn't abstract the Transform logic was due to some column rename logic functionality did for every table, where it is specific to each individual table.
-- comment from James Im
Very useful and well organized! Thanks for creating it.
nice stuff!
Really impressive with this one! Thank you. I appreciate it.
Great stuff, Everything in one place.
aweson content, thank you.