End-to-end data engineering project - batch edition

Objective

It can be difficult to know where to begin when starting a data engineering side project. If you have wondered

What data to use for your data project?

How to design your data project?

Then this post is for you. We will go over the key components and help you understand what you need to design and build your data projects, using a sample end-to-end data engineering project.

Setup

Let's assume that we work for an online store and have to get customer and order data ready for analysis with a data visualization tool. The data pipeline architecture is shown below.

Architecture

This is a standard ELT pattern. Since all of the code is in Python and SQL, you can modify it for your own data project.

Pre-requisites

  1. git
  2. GitHub account
  3. Terraform
  4. AWS account
  5. AWS CLI installed and configured
  6. Docker with at least 4GB of RAM and Docker Compose v1.27.0 or later

Read this post for information on setting up CI/CD, DB migrations, IaC (Terraform), make commands, and automated testing.

Run these commands to set up your project locally and in the cloud.

# Clone the code as shown below.
git clone https://github.com/josephmachado/online_store.git
cd online_store

# Local run & test
make up # starts the docker containers on your computer & runs migrations under ./migrations
make ci # Runs auto formatting, lint checks, & all the test files under ./tests

# Create AWS services with Terraform
make tf-init # Only needed on your first terraform run (or if you add new providers)
make infra-up # type in yes after verifying the changes TF will make

# Wait until the EC2 instance is initialized, you can check this via your AWS UI
# See "Status Check" on the EC2 console, it should be "2/2 checks passed" before proceeding

make cloud-metabase # forwards the Metabase port from the EC2 instance to your machine and opens it in your browser

make cloud-dagster # forwards the Dagster port from the EC2 instance to your machine and opens it in your browser

We use Dagster as our orchestrator and scheduler. In your cloud Dagster instance (opened via make cloud-dagster), turn on the schedule toggle near the top of the screen.

Dagster schedule

You can connect Metabase (opened via make cloud-metabase) to the warehouse with the following credentials:

WAREHOUSE_USER: sde
WAREHOUSE_PASSWORD: password
WAREHOUSE_DB: warehouse
WAREHOUSE_HOST: warehouse_db
WAREHOUSE_PORT: 5432

Create database migrations as shown below.

make db-migration # enter a description, e.g., create some schema
# make your changes to the newly created file under ./migrations
make warehouse-migration # to run the new migration on your warehouse

For continuous delivery to work, set up the infrastructure with Terraform and define the following repository secrets. You can add repository secrets by going to Settings > Secrets > Actions > New repository secret.

  1. SERVER_SSH_KEY: Get this by running terraform -chdir=./terraform output -raw private_key in the project directory and paste the entire output into a new Action secret called SERVER_SSH_KEY.
  2. REMOTE_HOST: Get this by running terraform -chdir=./terraform output -raw ec2_public_dns in the project directory.
  3. REMOTE_USER: The value for this is ubuntu.

Components

In this section, we will look at the main components of our data pipeline. The idea here is to use these as a starting point for your data project.

Source systems

In real data pipelines, you will use sources such as your internal databases, APIs, files, cloud storage systems, etc. We use a data generator that creates fake source data and a FastAPI server that serves fake customer risk scores. The tables are created when we spin up the docker containers, using the customer DB setup script and the warehouse tables setup script.
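
For illustration, a fake risk-score API can be tiny. The sketch below is a generic FastAPI example, not the project's actual server; the route and field names are hypothetical.

# fake_risk_api.py: a generic sketch of a FastAPI server serving fake customer
# risk scores. The route and field names are hypothetical, not the project's.
import random

from fastapi import FastAPI

app = FastAPI()


@app.get("/customer_risk_score/{customer_id}")
def customer_risk_score(customer_id: int) -> dict:
    # A real service would look the score up or compute it; here we fake it.
    return {"customer_id": customer_id, "risk_score": round(random.random(), 4)}

# Run locally with: uvicorn fake_risk_api:app --port 8000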

If you cannot find publicly available datasets for your problem of interest, generating fake data is a good alternative.
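
For example, a few lines with the Faker package can produce realistic-looking customer rows. This is a generic sketch; the column names are illustrative and not necessarily the project's schema.

# generate_fake_customers.py: a generic sketch of fake data generation with Faker.
# Column names are illustrative, not necessarily the project's schema.
from faker import Faker

fake = Faker()


def generate_customers(num_rows: int = 10) -> list[dict]:
    return [
        {
            "customer_id": i,
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "state": fake.state_abbr(),
            "datetime_created": fake.date_time_this_year().isoformat(),
        }
        for i in range(num_rows)
    ]


if __name__ == "__main__":
    for row in generate_customers(3):
        print(row)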

Schedule & Orchestrate

An orchestrator is the tool/framework that ensures tasks are executed in order, retries them on failure, stores metadata, and displays progress via a UI.

The scheduler is responsible for starting data pipelines at their scheduled frequency.

We use Dagster as our scheduler and orchestrator due to its ease of use and setup. A few key concepts to understand about Dagster are:

  1. ops: The units of code that do the computation. In our case, these are the extract, load, and transform (using the built-in dbt ops) code. Our ops are defined here.
  2. jobs: A job defines the order of execution of ops. Our data ingestion job is defined here.
  3. schedules: Schedules define the frequency of a job's runs. Our schedule is defined here.
  4. repository: Jobs and schedules are organized into repositories. Our repository is defined here.

Given below is the code that chains together ops to create our job.

# Imports shown for context: dbt_run_op and dbt_test_op are dagster_dbt's built-in ops,
# while the extract_* and load_* ops come from this project's ops module (linked above).
from dagster import job
from dagster_dbt import dbt_run_op, dbt_test_op


@job  # the full job definition (linked above) also wires up the resources the dbt ops need
def online_store_data_ingestion():
    # Each load op depends on its extract op; dbt run and dbt test execute after all loads.
    dbt_test_op(
        dbt_run_op(
            [
                load_customer_risk_score(extract_customer_risk_score()),
                load_customer_data(extract_customer_data()),
                load_orders_data(extract_orders_data()),
            ]
        )
    )

Notice how the Python job shown above creates the data pipeline shown below.

Dagster UI
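
To make the schedule and repository concepts concrete, here is a minimal sketch using Dagster's ScheduleDefinition and @repository APIs. The 2-minute cadence matches this pipeline's run frequency, but the import path and names are illustrative rather than the project's exact code.

# A sketch of wrapping the job in a schedule and a repository.
# The import path and names are illustrative, not the project's exact code.
from dagster import ScheduleDefinition, repository

from jobs import online_store_data_ingestion  # hypothetical module path

online_store_schedule = ScheduleDefinition(
    job=online_store_data_ingestion,
    cron_schedule="*/2 * * * *",  # run every 2 minutes
)


@repository
def online_store_repository():
    # Dagster discovers jobs and schedules through the repository.
    return [online_store_data_ingestion, online_store_schedule]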

Extract

Extractors pull data from the source systems. We extract customer data from our application's customer database, orders data from S3, and customer risk scores from an API endpoint.
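
As an example, an extract op for the orders file on S3 could look like the sketch below; the bucket and key are hypothetical, and the project's actual op may differ.

# A sketch of an extract op that pulls an orders CSV from S3 with boto3.
# The bucket and key names are hypothetical; the project's actual op may differ.
import csv
import io

import boto3
from dagster import op


@op
def extract_orders_data() -> list[dict]:
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket="app-orders-bucket", Key="orders/orders.csv")
    body = obj["Body"].read().decode("utf-8")
    return list(csv.DictReader(io.StringIO(body)))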

Load

Loaders load data into the destination system(s). We load our orders, customers, and customer risk score data into store.orders, store.customers, and store.customer_risk_score warehouse tables respectively.

We can also load data into other systems as needed, e.g., load customer data into an Elasticsearch cluster for text-based querying, orders into a graph database for graph-based querying, cache systems, etc.

The code for the extract and load components is available here, as the extract_* and load_* functions.
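
A load function in this style takes the extracted rows and inserts them into the warehouse table. Here is a hedged psycopg2 sketch; the column names shown are illustrative, and the project's actual loader may differ. The connection values reuse the warehouse credentials listed earlier.

# A sketch of a load op that writes extracted rows into store.orders with psycopg2.
# Column names are illustrative; the project's actual loader may differ.
import psycopg2
from dagster import op
from psycopg2.extras import execute_values


@op
def load_orders_data(orders: list[dict]) -> None:
    conn = psycopg2.connect(
        host="warehouse_db", port=5432, user="sde",
        password="password", dbname="warehouse",
    )
    with conn, conn.cursor() as cur:
        execute_values(
            cur,
            "INSERT INTO store.orders (order_id, customer_id, delivered_on) VALUES %s",
            [(r["order_id"], r["customer_id"], r["delivered_on"]) for r in orders],
        )
    conn.close()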

Transform

Transformers clean the data, apply business logic, and model it so that it is ready to be used. This transformed data is used by our stakeholders.

We use dbt to execute our transformation scripts. We de-duplicate the data, cast columns to the correct data types, join with other tables (customer risk score and states), and create the fct_orders and dim_customers views as shown below (based on Kimball modeling).

dim_customers Customer dimension

fct_orders Orders fact

Recommended reading:

  1. Benefits of dbt
  2. dbt tutorial

Data visualization

Data visualization tools enable business users, analysts, and engineers to generate aggregates at different levels, create shareable dashboards, write SQL queries, etc., without having to connect directly to our warehouse.

We use Metabase as our data visualization tool.

Choosing tools & frameworks

We have used open-source software to make development easy, free, and configurable. You may want to switch out tools as necessary, e.g., Fivetran instead of our custom extract and load code.

Recommended reading:

  1. How to choose your data tools

Future work & improvements

While we have a working data pipeline, there are a lot of possible improvements. Some of the crucial ones are listed below.

  1. Idempotence: In the customer load step, we insert all the data from the past 5 minutes into our warehouse. Since our pipeline runs every 2 minutes, this results in duplicate data. Making the load step idempotent will prevent duplicates (see the sketch after this list). Read this article on how to make your data pipelines idempotent for more details.
  2. Backfills: Backfilling is an inevitable part of data engineering. To learn more about backfills and how to make our data pipeline backfill-able, read this article.
  3. Change data capture & slowly changing dimensions: A key feature of a data warehouse is storing queryable historical data. This is commonly done with a modeling technique called slowly changing dimensions (SCD), which requires us to know every create, update, and delete operation happening on the source table. We can capture those operations with a technique called change data capture (CDC). Read how to implement CDC and how to create SCD2 tables with dbt.
  4. Testing: We have a unit test and a post-processing dbt test for our pipeline. Read how to add tests and how to set up CI tests to take our testing capabilities to the next level.
  5. Scale: When passing data between ops, Dagster stores it in a temporary file by default. In our load steps, we load the entire dataset into Python process memory and then insert it into the warehouse. While this works for small datasets, it will not work if the data is larger than the process memory. Read how to stream data in your Python process and how to scale your data pipeline for ideas on how to handle big data.
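
One common way to make such a load idempotent is to delete and rewrite the load window inside a single transaction, so re-running the pipeline for the same window never produces duplicates. A hedged sketch (illustrative table and column names) is shown below.

# A sketch of an idempotent customer load: rows for the load window are deleted
# and re-inserted in one transaction, so re-runs do not create duplicates.
# Table and column names are illustrative, not necessarily the project's.
import psycopg2
from psycopg2.extras import execute_values


def load_customers_idempotently(rows: list[dict], window_start, window_end) -> None:
    conn = psycopg2.connect(
        host="warehouse_db", port=5432, user="sde",
        password="password", dbname="warehouse",
    )
    with conn, conn.cursor() as cur:  # one transaction: delete + insert commit together
        cur.execute(
            "DELETE FROM store.customers WHERE datetime_created >= %s AND datetime_created < %s",
            (window_start, window_end),
        )
        execute_values(
            cur,
            "INSERT INTO store.customers (customer_id, first_name, last_name, datetime_created) VALUES %s",
            [
                (r["customer_id"], r["first_name"], r["last_name"], r["datetime_created"])
                for r in rows
            ],
        )
    conn.close()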

Please feel free to create an issue or open a PR here.

Conclusion

To recap, we saw:

  1. The key components of a data pipeline
  2. Generating fake data
  3. How to design your data project
  4. Future work & improvements

For your data project, choose the tools you know best or would like to learn. Use this project's infrastructure and architecture as a starting point to build your own data project.

Hope this article gives you a good understanding of the key components of a data pipeline and how you can set up your data project. If you have any questions, comments, or ideas please leave them in the comment section below.

Further reading

  1. Want to understand what the data pipeline components are and how to choose them? Read this article.
  2. Read this article for a quick introduction to data warehouses.
  3. Curious about ways to load data into a data warehouse? Check out this article.
  4. Want to do a similar project with Airflow? Check out this article.

References

  1. Dagster docs
  2. Metabase docs
  3. FastAPI docker
  4. Dagster docker setup
  5. dbt docs

If you found this article helpful, share it with a friend or colleague!

Comments

      Anonymous:

      If I want to use Airflow instead of Dagster as the scheduler and orchestrator, how can I do that? Or could you point me to one of your excellent tutorials that has already done this?

      Joseph Kevin Machado:

      You can use this docker compose to spin up Airflow containers.

      You will have to re-write the DAGs here https://github.com/josephmachado/online_store/tree/main/app to follow Airflow DAG definitions & remove all the dagster_ services in this project's definition https://github.com/josephmachado/online_store/blob/main/docker-compose.yml.

      Feel free to setup a PR with the Airflow version for this project https://github.com/josephmachado/online_store/pulls.

      Anonymous:

      MINGW64 ~/online_store/online_store (main) $ make up bash: make: command not found

      This is the error I keep getting, pls help

      Anonymous:

      I think the problem is that I have not installed Dagster. Do I need to install Python, or is there a way to install it in Git Bash? Please help, I am very new to data engineering.

      Joseph Kevin Machado:

      So sorry you are facing issues. Are you on Windows? Is Docker running? The make up command will spin up docker containers for dagster, so you don't need to install anything. Please let me know and I will try to help here.

      Anonymous:

      Thanks Joseph. Yes, I am on Windows and my Docker is running. My only confusion is where to run the make up command; from the tutorial, it looks like you need to get Dagster installed. If you don't mind, kindly help with a step-by-step process of where to run the git clone and make up commands. Thank you in advance.

      Joseph Kevin Machado:

      Sure thing. make up is a shortcut for the command docker compose up --build -d, as seen here.

      Could you please try the following

      1. Open a bash terminal on Windows. This link has some steps on how to open a bash terminal depending on your Windows version.
      2. Once you are inside the bash terminal, run git clone https://github.com/josephmachado/online_store.git, go to the online_store project directory using cd online_store, and run make up. If for some reason make does not work, please try running the command docker compose up --build -d.
      3. After the previous step is done, try docker ps; you should see the dagster containers running.
      4. If all the above steps are good, go to http://localhost:3000/ in your web browser.

      Anonymous:

      Thanks a lot Joseph, this works fine and I am able to open the localhost successfully on my web browser.

      Joseph Kevin Machado:

      Woohoo, congratulations :)

      Anonymous:

      "We use Dagster as our orchestrator and scheduler. Go to your local dagster instance at http://localhost:3000/ and turn on the schedule button near the top of the screen."

      Hi, I got a bit stuck here. Please help: how do I get to the instance using http://localhost:3000/?

      Joseph Kevin Machado:

      Hi, So the make up command will start all the docker containers. Going to http://localhost:3000 on your browser (firefox or chrome) will take you to the Dagster UI. Please let me know if this answers your question.

      Anonymous:

      Can I use pySpark for this project?

      Joseph Kevin Machado:

      Yes you can, but I'd make sure that there is a good need for it

      Anonymous:

      ERROR [dagster_code_runner_image 2/4] RUN pip install dagster==0 is the error I am getting while running make up. I tried running docker compose up --build -d manually too and got the same error. Please help.

      Joseph Kevin Machado:

      Can you please open a GitHub issue at https://github.com/josephmachado/online_store/issues with the exact steps you took to run the containers, and a docker ps check before running make up?

      Anonymous:

      Collecting snowflake-connector-python[secure-local-storage]<2.6.0,>=2.4.1 #0 20.04 Downloading snowflake-connector-python-2.5.1.tar.gz (344 kB) #0 20.04 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 344.9/344.9 kB 53.7 MB/s eta 0:00:00 #0 20.16 Installing build dependencies: started #0 26.17 Installing build dependencies: finished with status 'error' #0 26.20 error: subprocess-exited-with-error #0 26.20
      #0 26.20 × pip subprocess to install build dependencies did not run successfully.

      from here it's showing error

      Anonymous:

      on running makeup , i am geeting this error. due to which build is failing. please help Collecting snowflake-connector-python[secure-local-storage]<2.6.0,>=2.4.1 #0 22.80 Downloading snowflake-connector-python-2.5.1.tar.gz (344 kB) #0 22.82 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 344.9/344.9 KB 23.0 MB/s eta 0:00:00 #0 22.92 Installing build dependencies: started #0 28.68 Installing build dependencies: finished with status 'error' #0 28.83 error: subprocess-exited-with-error #0 28.83
      #0 28.83 × pip subprocess to install build dependencies did not run successfully. #0 28.83 │ exit code: 1 #0 28.83 ╰─> [628 lines of output]

      Cương Nguyễn Ngọc:

      Sorry, in your project I don't see a datetime_inserted field in store.customers or store.orders, but you use that field in staging. Can you explain the point of that field and where (what line of code) it gets inserted?

      Anonymous:

      The store.customers table is created in the warehouse during docker compose up (this script is executed when we start the docker containers: https://github.com/josephmachado/online_store/blob/main/warehouse_db_setup/1_create_customer_order_state_tables.sql). As for the store.orders staging table, we do not have a datetime_inserted column but a delivered_on column (https://github.com/josephmachado/online_store/blob/main/app/transform/models/staging/stg_el__orders.sql). Hope this helps. Let me know if you have more questions.