Should Data Pipelines in Python be Function based or Object-Oriented (OOP)?
- 1. Introduction
- 2. Data transformations as functions lead to maintainable code
- 3. Objects help track things (aka state)
- 4. Classes let you define reusable code and pipeline patterns
- 5. Functional code uses objects via Dependency Injection
- 6. Conclusion
- 7. Recommended reading
1. Introduction
As a data engineer, you have probably spent hours trying to figure out the right place to make a change in your repository—I know I have.
You think, “Why is it so difficult to make a simple change?”
You push a simple change (with tests, by the way), and suddenly, production issues start popping up!
Dealing with on-call issues when your repository is spaghetti code with multiple layers of abstracted logic is a special hell that makes data engineers age in dog years!
Messy code leads to delayed feature delivery and slow debug cycles, which lowers work satisfaction and delays promotions!
Bad code leads to a bad life
If this resonates with you, know that you are not alone. Every day, thousands of data engineers deal with bad code and, with the best intentions, write messy code.
Most data engineers want to write good code, but the common SWE patterns don’t translate easily to data processing patterns, and there aren’t many practical examples that illustrate how to write clean data pipelines.
Imagine a code base where every engineer knows where to look when something breaks, even if they have never worked on that part of the code base before. Imagine knowing intuitively where a piece of logic would be and quickly figuring out the source of any issue.
That is what this article helps you do!
In this post, I explain how to combine functions and OOP patterns in Python to write pipelines that are easy to maintain and debug. By the end of this post, you will have a clear picture of when and how to use functions and OOP effectively to make your (and your colleagues’) lives easier.
Pre-read
This article assumes that you know the following:
Note: We follow some functional programming patterns, but pure functional programming is not entirely possible in Python. See this Reddit thread for discussions on this.
TL;DR: Data transformations as functions, connections to external systems as objects (OOP).
2. Data transformations as functions lead to maintainable code
Data transformations are a series of changes applied to input(s). Functions best capture this nature of data transformations. Each function should ideally be idempotent and atomic: it should not mutate its inputs, only apply a transformation and return the result.
Creating data transformations as functions makes the code:
- Easy to maintain and debug.
- Easy to test.
- Representative of reality: data is transformed in a series of steps, each captured as a function.
- Simpler to run in parallel with different inputs (if your transformations are independent of historical data); see the sketch after the example below.
Let’s look at an example of a pipeline that accepts some data and transforms it with functions:
import polars as pl

def get_monthly_sales(orders: pl.DataFrame) -> pl.DataFrame:
    return (
        orders
        .with_columns(pl.col("order_date").dt.truncate("1mo").alias("month"))
        .group_by(["month", "product_id", "region_id"])
        .agg([
            pl.col("sales_amount").sum().alias("total_sales"),
            pl.col("order_id").count().alias("order_count"),
        ])
    )

def get_regional_sales(monthly_sales: pl.DataFrame) -> pl.DataFrame:
    return (
        monthly_sales
        .group_by(["month", "region_id"])
        .agg([
            pl.col("total_sales").sum().alias("regional_sales"),
            pl.col("order_count").sum().alias("regional_orders"),
        ])
    )

# Data transformations are applied as a series of individual transformations
monthly_sales = get_monthly_sales(orders)
regional_sales = get_regional_sales(monthly_sales)
# And so on
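Since these functions don’t share state, running them over independent inputs in parallel is straightforward. Here is a minimal sketch, assuming we have already split the orders into independent per-region dataframes (the orders_by_region list is hypothetical):
from concurrent.futures import ThreadPoolExecutor

# Hypothetical input: orders_by_region is a list of independent
# polars DataFrames, one per region, produced upstream of this step.
with ThreadPoolExecutor() as executor:
    monthly_sales_per_region = list(
        executor.map(get_monthly_sales, orders_by_region)
    )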
It would be simple to create tests for them, as shown below:
import datetime

import polars as pl
import pytest

# get_monthly_sales is imported from your pipeline module

@pytest.fixture
def sample_orders():
    return pl.DataFrame({
        "order_id": [1, 2, 3, 4, 5],
        "order_date": pl.Series(
            [datetime.date(2024, 1, 5), datetime.date(2024, 1, 10),
             datetime.date(2024, 2, 15), datetime.date(2024, 2, 20),
             datetime.date(2024, 3, 25)], dtype=pl.Date
        ),
        "product_id": [101, 102, 101, 103, 104],
        "region_id": [1, 1, 2, 2, 3],
        "sales_amount": [500.0, 300.0, 700.0, 600.0, 400.0],
    })

def test_get_monthly_sales(sample_orders):
    result = get_monthly_sales(sample_orders)
    assert result.shape[0] > 0
    assert "month" in result.columns
    assert "total_sales" in result.columns
    assert "order_count" in result.columns
3. Objects help track things (aka state)
I recommend that you use OOP when faced with the scenarios below.
3.1. Track connections & configs when connecting to external systems
When your code needs to connect to an external system, use objects.
Let’s look at a few examples:
- DB connectors: Your code needs to track connection configs, open connections, ensure they are closed in case of exceptions, etc.
- API connectors: Your code needs to know what the URL of the API call should be, what query params are allowed, how to paginate through API calls, etc.
Let’s look at a simple example of managing a DB connection:
from contextlib import contextmanager
from dataclasses import dataclass

import psycopg2

@dataclass
class DBConnection:
    db: str
    user: str
    password: str
    host: str
    port: int = 5432

class WarehouseConnection:
    def __init__(self, db_conn: DBConnection):
        self.conn_url = (
            f'postgresql://{db_conn.user}:{db_conn.password}@'
            f'{db_conn.host}:{db_conn.port}/{db_conn.db}'
        )
        self.conn = None

    @contextmanager
    def managed_cursor(self, cursor_factory=None):
        self.conn = psycopg2.connect(self.conn_url) if self.conn is None else self.conn
        self.conn.autocommit = True
        self.curr = self.conn.cursor(cursor_factory=cursor_factory)
        try:
            yield self.curr
        finally:
            self.curr.close()
            self.conn.close()
            self.conn = None  # reset so the next call opens a fresh connection

    # Additional methods to handle connection
In the above code, the WarehouseConnection class allows the user to open and close connections as needed. If we had defined this as a function, the caller would have to create a new connection for each interaction with the warehouse.
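A usage sketch (the credentials and query below are placeholders):
# Hypothetical usage: credentials, database, and query are placeholders
warehouse = WarehouseConnection(
    DBConnection(db='analytics', user='etl_user', password='***', host='localhost')
)
with warehouse.managed_cursor() as cursor:
    cursor.execute('SELECT COUNT(*) FROM monthly_sales;')
    print(cursor.fetchone())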
3.2. Track pipeline progress (logging, Observer) with objects
When you need a system to keep track of the state of your pipeline, use objects. Logging and observability systems (e.g., tracking the number of rows in vs. out of a specific transformation function) have objects created for them before the pipeline starts.
Let’s look at the recommended way to use Prometheus (an observability tool).
from prometheus_client import Summary
import time

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Decorate function with metric.
@REQUEST_TIME.time()
def process_request(t):
    """A dummy function that takes some time."""
    time.sleep(t)
We create an object REQUEST_TIME of the Summary class.
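To actually expose the collected metric, the prometheus_client docs pair this with a small HTTP server that Prometheus can scrape; a minimal sketch along those lines:
from prometheus_client import start_http_server
import random

if __name__ == '__main__':
    # Expose metrics on localhost:8000/metrics for Prometheus to scrape.
    start_http_server(8000)
    while True:
        process_request(random.random())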
3.3. Use objects to store configurations of data systems (e.g., Spark, etc.)
Systems that require a lot of setup configuration use OOP: you create an object once, and subsequent operations on the data system execute via that object.
Some examples of this are:
- In Spark, dataframe operations require a SparkSession object.
- Great Expectations requires a context object to use.
These systems usually have a wide range of settings (e.g., Spark’s spark-defaults.conf) that define how the data system operates.
For example, when you create a SparkSession object, it will pull in config variables from multiple config files (e.g., spark-defaults.conf).
# See list of config files used when creating a SparkSession object
ls $SPARK_HOME/conf
metrics.properties
spark-defaults.conf
log4j2.properties
...
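For illustration, you can also set configs explicitly when constructing the SparkSession object; these are layered on top of whatever the config files provide (the app name and setting below are arbitrary examples):
from pyspark.sql import SparkSession

# Hypothetical example: explicit settings layered on top of spark-defaults.conf
spark = (
    SparkSession.builder
    .appName('sales_pipeline')
    .config('spark.sql.shuffle.partitions', '200')
    .getOrCreate()
)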
4. Classes let you define reusable code and pipeline patterns
In this section, we will see how you can reduce cognitive load for developers by encapsulating code (a fancy word for keeping related code in a single place) and enforcing data processing patterns using a base class.
4.1. Templatize data flow patterns with a Class
As your code grows, you will notice that most of your pipelines follow a similar pattern. To eliminate repeated code and enforce code uniformity, create a base class and have all the pipelines inherit from it.
With data pipelines, keep the methods in the class as functional as possible; this allows you to reap the benefits of encapsulation (having all code relevant to a use case in one place) while keeping transformations functional.
Let’s assume all our pipelines do the following: extract -> transform -> validate data -> write to output. We can templatize this as shown below:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional

# ETLDataSet and InValidDataException are defined elsewhere in the pipeline codebase
class TableETL(ABC):
    @abstractmethod
    def __init__(
        self,
        spark,
        run_upstream: bool = True,
        load_data: bool = True,
    ) -> None:
        self.spark = spark
        # Other parameters
        self.run_upstream = run_upstream
        self.load_data = load_data

    @abstractmethod
    def extract_upstream(self) -> List[ETLDataSet]:
        pass

    @abstractmethod
    def transform_upstream(
        self, upstream_datasets: List[ETLDataSet]
    ) -> ETLDataSet:
        pass

    def validate(self, data: ETLDataSet) -> bool:
        # Validation code
        ...

    def load(self, data: ETLDataSet) -> None:
        # Loading code
        ...

    def run(self) -> None:
        transformed_data = self.transform_upstream(self.extract_upstream())
        if not self.validate(transformed_data):
            raise InValidDataException(
                f"The {self.name} dataset did not pass validation, please"
                " check the metadata db for more information"
            )
        if self.load_data:
            self.load(transformed_data)

    @abstractmethod
    def read(
        self, partition_values: Optional[Dict[str, str]] = None
    ) -> ETLDataSet:
        pass
In the above code, we define a template for structuring a data pipeline. In our case, we define standard ways of validating and loading data into the destination.
We require any class that inherits from this base class to implement its own extraction and transformation logic (both of which are typically highly dependent on the specific data pipeline).
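For instance, a concrete pipeline might look like the sketch below; the OrdersETL class, its table names, and the ETLDataSet construction are hypothetical and only meant to show the shape of a subclass:
# Hypothetical subclass: names, tables, and logic are illustrative only
class OrdersETL(TableETL):
    def __init__(self, spark, run_upstream: bool = True, load_data: bool = True) -> None:
        super().__init__(spark, run_upstream, load_data)
        self.name = 'orders'

    def extract_upstream(self) -> List[ETLDataSet]:
        # Pull raw orders from the source table
        return [ETLDataSet(name='raw_orders', data=self.spark.table('raw.orders'))]

    def transform_upstream(self, upstream_datasets: List[ETLDataSet]) -> ETLDataSet:
        # Apply order-specific transformations
        raw_orders = upstream_datasets[0].data
        return ETLDataSet(name=self.name, data=raw_orders.dropDuplicates(['order_id']))

    def read(self, partition_values: Optional[Dict[str, str]] = None) -> ETLDataSet:
        return ETLDataSet(name=self.name, data=self.spark.table('clean.orders'))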
Full code that shows the class templatization pattern.
Read about abstract base classes and abstract methods here.
4.2. Co-locate common & related data transformations using classes
As your pipeline and its complexity grow, you’ll notice that certain patterns keep repeating, e.g., reading data from tables with one or more filters from a standard set, or the same complex joins between fact and dimension tables appearing multiple times in your code. Co-locating these common transformations as methods of a class gives everyone one obvious place to find (and fix) them, as the sketch below shows.
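A minimal sketch of this idea, with made-up table and column names:
import polars as pl

# Hypothetical example: column names and dataframes are illustrative
class SalesTransforms:
    """Co-locates the filters and joins that multiple pipelines reuse."""

    @staticmethod
    def filter_active_regions(df: pl.DataFrame, regions: pl.DataFrame) -> pl.DataFrame:
        # Standard filter reused across pipelines
        return df.join(regions.filter(pl.col('is_active')), on='region_id', how='inner')

    @staticmethod
    def join_product_dim(facts: pl.DataFrame, products: pl.DataFrame) -> pl.DataFrame:
        # The same fact-to-dimension join, defined once
        return facts.join(products, on='product_id', how='left')
Note that the methods stay functional (static, no hidden state); the class is only a namespace that keeps related transformations together.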
5. Functional code uses objects via Dependency Injection
Use dependency injection to pass an object as an input parameter to a function.
# inject spark into a transformation function
def run_code(spark):
    num_partitions = spark.sql(
        "select returnflag, count(*) from tpch.lineitem group by 1"
    ).rdd.getNumPartitions()
    print(
        f"Number of partitions for lineitem dataframe is {num_partitions} and"
        " the default spark partitions (spark.sql.shuffle.partitions) are set"
        f" to {spark.conf.get('spark.sql.shuffle.partitions')}"
    )
Here, a SparkSession object is “injected” into our transformation function. Dependency injection means that an object or function that your code “depends” on is “injected” (as an input parameter) into your code.
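At the call site, the caller builds the SparkSession and injects it; a minimal sketch (it assumes the tpch.lineitem table used above is available):
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # The caller owns the SparkSession and injects it into the transformation
    spark = SparkSession.builder.appName('di_example').getOrCreate()
    run_code(spark)
    spark.stop()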
Another approach is to define a global object. Loggers usually follow this pattern: you typically define a logger once when the code execution starts, and it will be used for the entirety of the code execution.
# From Python official docs at https://docs.python.org/3/library/logging.html
# myapp.py
import logging
import mylib

logger = logging.getLogger(__name__)  # Logger is a global variable

def main():
    logging.basicConfig(filename='myapp.log', level=logging.INFO)
    logger.info('Started')
    mylib.do_something()
    logger.info('Finished')

if __name__ == '__main__':
    main()
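The companion mylib.py module from the same docs example defines its own module-level logger:
# mylib.py
import logging

logger = logging.getLogger(__name__)  # one logger per module

def do_something():
    logger.info('Doing something')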
6. Conclusion
To recap, we went over the following:
- How function-oriented programming can help build maintainable data transformation code
- How objects enable state management
- How classes encapsulate logic and define pipeline templates
Writing and maintaining a data pipeline is tough. The next time you are building a pipeline from scratch, use the principles above to help you build easy-to-understand and maintainable code.
When you write code, make sure any engineer without too much context can understand it, and pay attention to how complex the code is to debug when it breaks. Your colleagues and future self will be grateful that you did.
Please let me know in the comment section below if you have any questions or comments.
7. Recommended reading