Should Data Pipelines in Python Be Function-Based or Object-Oriented (OOP)?

1. Introduction

As a data engineer, you have probably spent hours trying to figure out the right place to make a change in your repository—I know I have.

You think, "Why is it so difficult to make a simple change?"

You push a simple change (with tests, by the way), and suddenly, production issues start popping up!

Dealing with on-call issues when your repository is spaghetti code with multiple layers of abstracted logic is a special hell that makes data engineers age in dog years!

Messy code leads to delayed feature delivery and slow debug cycles, which lowers work satisfaction and delays promotions!

Bad code leads to a bad life

If this resonates with you, know that you are not alone. Every day, thousands of data engineers deal with bad code and, with the best intentions, write messy code.

Most data engineers want to write good code, but the common SWE patterns don’t translate easily to data processing patterns, and there aren’t many practical examples that illustrate how to write clean data pipelines.

Imagine a code base where every engineer knows where to look when something breaks, even if they have never worked on that part of the code base before. Imagine knowing intuitively where a piece of logic would be and quickly figuring out the source of any issue.

That is what this article helps you do!

In this post, I explain how to combine functions and OOP patterns in Python to write pipelines that are easy to maintain/debug. By the end of this post, you will have a clear picture of when and how to use functions and OOP effectively to make your (and your colleagues’) life easy.


Pre-read

This article assumes that you know the following:

  1. Classes and objects in Python.
  2. Principles of functional programming in Python.

Note: We follow some functional programming patterns, but pure functional programming is not entirely possible in Python. See this Reddit thread for discussions on this.

TL;DR: Data transformations as functions and connections to external systems as OOP.

FP or OOP

2. Data transformations as functions lead to maintainable code

Data transformations are a series of changes applied to input(s). Functions best capture this nature of data transformations. Each function should ideally be idempotent and atomic: it should not mutate its inputs, only apply a transformation and return the result.

Creating data transformations as functions makes the code:

  1. Easy to maintain and debug.
  2. Easy to test.
  3. Representative of reality: data is transformed in a series of steps (each represented as a function).
  4. Simpler to run in parallel with different inputs (if your transformations are independent of historical data); see the parallel sketch after the example below.

Let’s look at an example of a pipeline that accepts some data and transforms it with functions:

import polars as pl


def get_monthly_sales(orders: pl.DataFrame) -> pl.DataFrame:
    return (
        orders
        .with_columns(pl.col("order_date").dt.truncate("1mo").alias("month"))
        .group_by(["month", "product_id", "region_id"])
        .agg([
            pl.col("sales_amount").sum().alias("total_sales"),
            pl.col("order_id").count().alias("order_count")
        ])
    )

def get_regional_sales(monthly_sales: pl.DataFrame) -> pl.DataFrame:
    return (
        monthly_sales
        .group_by(["month", "region_id"])
        .agg([
            pl.col("total_sales").sum().alias("regional_sales"),
            pl.col("order_count").sum().alias("regional_orders")
        ])
    )

# Data transformations are applied as a series of individual transformations
monthly_sales = get_monthly_sales(orders)
regional_sales = get_regional_sales(monthly_sales)
# And so on
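
Because these functions do not mutate shared state, they can be mapped over independent inputs in parallel. Below is a minimal sketch, assuming an illustrative list orders_by_region of per-region orders DataFrames:

from concurrent.futures import ProcessPoolExecutor

# Pure transformation functions can be mapped over independent inputs in parallel.
# `orders_by_region` is an illustrative list of per-region orders DataFrames.
with ProcessPoolExecutor() as executor:
    monthly_sales_per_region = list(
        executor.map(get_monthly_sales, orders_by_region)
    )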

It would be simple to create tests for them, as shown below:

import datetime

import polars as pl
import pytest

# Plus an import of get_monthly_sales from your pipeline module


@pytest.fixture
def sample_orders():
    return pl.DataFrame({
        "order_id": [1, 2, 3, 4, 5],
        "order_date": pl.Series(
            [datetime.date(2024, 1, 5), datetime.date(2024, 1, 10), 
             datetime.date(2024, 2, 15), datetime.date(2024, 2, 20), 
             datetime.date(2024, 3, 25)], dtype=pl.Date
        ),
        "product_id": [101, 102, 101, 103, 104],
        "region_id": [1, 1, 2, 2, 3],
        "sales_amount": [500.0, 300.0, 700.0, 600.0, 400.0]
    })

def test_get_monthly_sales(sample_orders):
    result = get_monthly_sales(sample_orders)
    
    assert result.shape[0] > 0
    assert "month" in result.columns
    assert "total_sales" in result.columns
    assert "order_count" in result.columns

3. Objects help track things (aka state)

I recommend that you use OOP when faced with the scenarios below.

3.1. Track connections & configs when connecting to external systems

When your code needs to connect to an external system, use objects.

Let’s look at a few examples:

  1. DB connectors: Your code needs to track connection configs, open connections, ensure they are closed in case of exceptions, etc.
  2. API connectors: Your code needs to know what the URL of the API call should be, what query params are allowed, how to paginate through API calls, etc. (see the sketch after this list).
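
As a sketch of the second case, a hypothetical paginated API client might look like the following; the endpoint, auth header, and pagination scheme are all assumptions for illustration:

import requests


class OrdersAPIClient:
    """Hypothetical API connector; base URL, auth, and pagination are illustrative."""

    def __init__(self, base_url: str, api_key: str, page_size: int = 100):
        self.base_url = base_url
        self.page_size = page_size
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def fetch_orders(self):
        # Keep requesting pages until the API returns an empty page
        page = 1
        while True:
            resp = self.session.get(
                f"{self.base_url}/orders",
                params={"page": page, "page_size": self.page_size},
            )
            resp.raise_for_status()
            rows = resp.json()
            if not rows:
                break
            yield from rows
            page += 1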

Let’s look at a simple example of managing a DB connection:

from contextlib import contextmanager
from dataclasses import dataclass

import psycopg2


@dataclass
class DBConnection:
    db: str
    user: str
    password: str
    host: str
    port: int = 5432


class WarehouseConnection:
    def __init__(self, db_conn: DBConnection):
        self.conn_url = (
            f'postgresql://{db_conn.user}:{db_conn.password}@'
            f'{db_conn.host}:{db_conn.port}/{db_conn.db}'
        )
        self.conn = None

    @contextmanager
    def managed_cursor(self, cursor_factory=None):
        self.conn = psycopg2.connect(self.conn_url) if self.conn is None else self.conn
        self.conn.autocommit = True
        self.curr = self.conn.cursor(cursor_factory=cursor_factory)
        try:
            yield self.curr
        finally:
            self.curr.close()
            self.conn.close()
            self.conn = None  # reset so the next call opens a fresh connection
    # Additional methods to handle connection

In the code above, the WarehouseConnection class lets the user open and close connections as needed. If we had written this as a plain function instead, the caller would have to create a new connection for every interaction with the warehouse.
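
For illustration, here is a minimal usage sketch; the connection parameters and the query are placeholders:

# Connection parameters and the query below are placeholders for illustration.
db_conn = DBConnection(
    db="warehouse", user="etl_user", password="etl_password", host="localhost"
)
warehouse = WarehouseConnection(db_conn)

with warehouse.managed_cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM public.orders")
    row_count = cur.fetchone()[0]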

3.2. Track pipeline progress (logging, Observer) with objects

When you need a system to keep track of the state of your pipeline, use objects. Logging and observability systems (e.g., tracking the number of rows in vs. rows out of a specific transformation function) have objects created for them before the pipeline starts.

Let’s look at the recommended way to use Prometheus (an observability tool).

from prometheus_client import Summary
import time

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Decorate function with metric.
@REQUEST_TIME.time()
def process_request(t):
    """A dummy function that takes some time."""
    time.sleep(t)

We create an object, REQUEST_TIME, which is an instance of the Summary class.
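
To make the rows-in vs. rows-out idea concrete, here is a minimal hand-rolled sketch that reuses the functions from section 2; the class and method names are made up for illustration:

import polars as pl


class RowCountTracker:
    """Hypothetical observer that records row counts around each transformation."""

    def __init__(self):
        self.counts = {}

    def track(self, step_name: str, df_in: pl.DataFrame, df_out: pl.DataFrame) -> None:
        self.counts[step_name] = {"rows_in": df_in.height, "rows_out": df_out.height}


# Created once, before the pipeline starts
tracker = RowCountTracker()
monthly_sales = get_monthly_sales(orders)
tracker.track("get_monthly_sales", orders, monthly_sales)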

3.3. Use objects to store configurations of data systems (e.g., Spark)

Systems that require a lot of setup configuration typically expose an OOP interface: you create an object once, and operations on the data system execute via that object.

Some examples of this are:

  1. In Spark, DataFrame operations require a SparkSession object.
  2. Great Expectations requires a context object to use.

These systems usually have a wide range of settings (e.g., Spark's spark-defaults.conf) that define how the data system operates.

For example, when you create a SparkSession object, it pulls in config variables from multiple config files (e.g., spark-defaults.conf).

# See list of config files used when creating a SparkSession object
ls $SPARK_HOME/conf

metrics.properties
spark-defaults.conf
log4j2.properties
...
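
As a sketch, creating a SparkSession and overriding one of those settings at build time might look like this; the app name and setting value are illustrative:

from pyspark.sql import SparkSession

# Settings from spark-defaults.conf are applied automatically;
# .config() overrides an individual setting at build time.
spark = (
    SparkSession.builder
    .appName("monthly_sales_pipeline")  # illustrative app name
    .config("spark.sql.shuffle.partitions", "200")  # illustrative override
    .getOrCreate()
)

print(spark.conf.get("spark.sql.shuffle.partitions"))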

4. Classes let you define reusable code and pipeline patterns

In this section, we will see how you can reduce cognitive overload for developers by encapsulating code (a fancy word for keeping related code in a single place) and enforcing data processing patterns with a base class.

4.1. Templatize data flow patterns with a Class

As your code grows, you will notice that most of your pipelines follow a similar pattern. To eliminate repeated code and enforce code uniformity, create a base class and have all the pipelines inherit from it.

With data pipelines, keep the methods in the class as functional as possible; this allows you to reap the benefits of encapsulation (having all code relevant to a use case in one place) while keeping transformations functional.

Let’s assume all our pipelines do the following: extract -> transform -> validate data -> write to output. We can templatize this as shown below:

from abc import ABC, abstractmethod
from typing import Dict, List, Optional

# ETLDataSet and InValidDataException are assumed to be defined elsewhere in the project


class TableETL(ABC):
    @abstractmethod
    def __init__(
        self,
        spark,
        run_upstream: bool = True,
        load_data: bool = True,
    ) -> None:
        self.spark = spark
        
        # Other parameters
        self.run_upstream = run_upstream
        self.load_data = load_data

    @abstractmethod
    def extract_upstream(self) -> List[ETLDataSet]:
        pass

    @abstractmethod
    def transform_upstream(
        self, upstream_datasets: List[ETLDataSet]
    ) -> ETLDataSet:
        pass

    def validate(self, data: ETLDataSet) -> bool:
        # Validation code (elided)
        ...

    def load(self, data: ETLDataSet) -> None:
        # Loading code (elided)
        ...

    def run(self) -> None:
        transformed_data = self.transform_upstream(self.extract_upstream())
        if not self.validate(transformed_data):
            raise InValidDataException(
                f"The {self.name} dataset did not pass validation, please "
                "check the metadata db for more information"
            )
        if self.load_data:
            self.load(transformed_data)

    @abstractmethod
    def read(
        self, partition_values: Optional[Dict[str, str]] = None
    ) -> ETLDataSet:
        pass

In the above code, we define a template for structuring a data pipeline. In our case, we define standard ways of validating and loading data into the destination.

We require any pipeline that inherits this base class to implement its own extraction and transformation logic (both of which are typically highly specific to that pipeline).
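
For illustration, a concrete pipeline inheriting the template might look like the sketch below; the class name and dataset are hypothetical, and the method bodies are elided:

class RegionalSalesETL(TableETL):
    def __init__(self, spark, run_upstream: bool = True, load_data: bool = True) -> None:
        super().__init__(spark, run_upstream, load_data)
        self.name = "regional_sales"  # hypothetical dataset name used in run()

    def extract_upstream(self) -> List[ETLDataSet]:
        # Read the upstream dataset(s); elided here
        ...

    def transform_upstream(self, upstream_datasets: List[ETLDataSet]) -> ETLDataSet:
        # Pipeline-specific joins and aggregations; elided here
        ...

    def read(self, partition_values: Optional[Dict[str, str]] = None) -> ETLDataSet:
        # Read the published regional_sales dataset; elided here
        ...

# validate, load, and run are inherited from TableETL,
# so a caller only needs: RegionalSalesETL(spark).run()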

Full code that shows the class templatization pattern.

Read about abstract base classes and abstract methods here.

As your pipelines and their complexity grow, you’ll notice that certain patterns keep repeating, e.g., reading data from tables with one or more of a standard set of filters, or repeating the same complex joins between fact and dimension tables in multiple places in your code.
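
Such repeated logic is also a good candidate for a shared, functional helper. A minimal sketch, with made-up table and column names:

import polars as pl


def enrich_with_region(fact: pl.DataFrame, region_dim: pl.DataFrame) -> pl.DataFrame:
    # A join that many pipelines repeat, pulled into one reusable function;
    # the join key and column names are illustrative.
    return fact.join(
        region_dim.select(["region_id", "region_name"]), on="region_id", how="left"
    )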

5. Functional code uses objects via Dependency Injection

Use dependency injection to pass an object as an input parameter to a function.

# inject spark into a transformation function
def run_code(spark):

    num_partitions = spark.sql(
        "select returnflag, count(*) from tpch.lineitem group by 1"
    ).rdd.getNumPartitions()
    print(
        f"Number of partitions for lineitem dataframe is {num_partitions} and"
        " the default spark partitions (spark.sql.shuffle.partitions) are set"
        f" to {spark.conf.get('spark.sql.shuffle.partitions')}"
    )

Here, a SparkSession object is “injected” into our transformation function. Dependency injection means that an object/function your code “depends” on is “injected” into your code as an input parameter.

Another approach is to define a global object. Loggers usually follow this pattern. You typically define a logger once when the code execution starts, and it will be used for the entirety of the code execution.

# From Python official docs at https://docs.python.org/3/library/logging.html
# myapp.py
import logging
import mylib
logger = logging.getLogger(__name__) # Logger is a global variable

def main():
    logging.basicConfig(filename='myapp.log', level=logging.INFO)
    logger.info('Started')
    mylib.do_something()
    logger.info('Finished')

if __name__ == '__main__':
    main()

6. Conclusion

To recap, we went over the following:

  1. How function-oriented programming can help build maintainable data transformation code
  2. How objects enable state management
  3. How classes encapsulate logic and define pipeline templates

Writing and maintaining a data pipeline is tough. The next time you are building a pipeline from scratch, use the principles above to help you build easy-to-understand and maintainable code.

When you write code, make sure any engineer without too much context can understand it, and pay attention to how complex the code is to debug when it breaks. Your colleagues and future self will be grateful that you did.

Please let me know in the comment section below if you have any questions or comments.

  1. Data flow & code best practices
  2. Data flow patterns


