End-to-end data engineering project - batch edition
- Objective
- Setup
- Components
- Choosing tools & frameworks
- Future work & improvements
- Conclusion
- Further reading
- References
Objective
It can be difficult to know where to begin when starting a data engineering side project. If you have wondered:
- What data to use for your data project?
- How to design your data project?
Then this post is for you. We will go over the key components and help you understand what you need to design and build your data projects. We will do this using a sample end-to-end data engineering project.
Setup
Let’s assume that we work for an online store and have to get customer and order data ready for analysis using a data visualization tool. The data pipeline architecture is shown below.
This is a standard ELT pattern. Since all of the code is in Python and SQL, you can modify it for your data project.
Pre-requisites
- git
- Github account
- Terraform
- AWS account
- AWS CLI installed and configured
- Docker with at least 4GB of RAM and Docker Compose v1.27.0 or later
Read this post for information on setting up CI/CD, DB migrations, IaC (Terraform), “make” commands, and automated testing.
Run these commands to set up your project locally and on the cloud.
# Clone the code as shown below.
git clone https://github.com/josephmachado/online_store.git
cd online_store
# Local run & test
make up # starts the docker containers on your computer & runs migrations under ./migrations
make ci # Runs auto formatting, lint checks, & all the test files under ./tests
# Create AWS services with Terraform
make tf-init # Only needed on your first terraform run (or if you add new providers)
make infra-up # type in yes after verifying the changes TF will make
# Wait until the EC2 instance is initialized, you can check this via your AWS UI
# See "Status Check" on the EC2 console, it should be "2/2 checks passed" before proceeding
make cloud-metabase # this command forwards the Metabase port from EC2 to your machine and opens it in the browser
make cloud-dagster # this command forwards the Dagster port from EC2 to your machine and opens it in the browser
We use Dagster as our orchestrator and scheduler. In your cloud Dagster instance (open this using make cloud-dagster), turn on the schedule button near the top of the screen.
You can connect Metabase (open this using make cloud-metabase) to the warehouse with the following credentials:
WAREHOUSE_USER: sde
WAREHOUSE_PASSWORD: password
WAREHOUSE_DB: warehouse
WAREHOUSE_HOST: warehouse_db
WAREHOUSE_PORT: 5432
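If you want to reach the warehouse from your own scripts instead of Metabase, the same settings can be assembled into a connection URL. This is only a sketch: it assumes the warehouse is PostgreSQL (the 5432 port suggests so), and the warehouse_url helper is a hypothetical name, not part of the project.

```python
import os

# Defaults mirror the credentials listed above; environment variables override them.
DEFAULTS = {
    "WAREHOUSE_USER": "sde",
    "WAREHOUSE_PASSWORD": "password",
    "WAREHOUSE_DB": "warehouse",
    "WAREHOUSE_HOST": "warehouse_db",
    "WAREHOUSE_PORT": "5432",
}


def warehouse_url(env=None):
    """Build a PostgreSQL-style connection URL (hypothetical helper)."""
    env = os.environ if env is None else env
    cfg = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    return (
        f"postgresql://{cfg['WAREHOUSE_USER']}:{cfg['WAREHOUSE_PASSWORD']}"
        f"@{cfg['WAREHOUSE_HOST']}:{cfg['WAREHOUSE_PORT']}/{cfg['WAREHOUSE_DB']}"
    )


# Passing an empty mapping ignores the real environment and uses the defaults.
print(warehouse_url({}))
```

Keeping the settings in environment variables means the same code can point at a local or a cloud warehouse without edits.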
Create database migrations as shown below.
make db-migration # enter a description, e.g., create some schema
# make your changes to the newly created file under ./migrations
make warehouse-migration # to run the new migration on your warehouse
For the continuous delivery to work, set up the infrastructure with Terraform and define the following repository secrets. You can set up the repository secrets by going to Settings > Secrets > Actions > New repository secret.
- SERVER_SSH_KEY: Get this by running terraform -chdir=./terraform output -raw private_key in the project directory, and paste the entire content into a new Action secret called SERVER_SSH_KEY.
- REMOTE_HOST: Get this by running terraform -chdir=./terraform output -raw ec2_public_dns in the project directory.
- REMOTE_USER: The value for this is ubuntu.
Components
In this section, we will look at the main components of our data pipeline. The idea here is to use these as a starting point for your data project.
Source systems
In real data pipelines, you will use sources such as your internal database, APIs, files, cloud storage systems, etc. We use a data generator that creates fake source data, and a FastAPI server to serve fake customer risk scores. The tables are created when we spin up docker containers using the customer DB setup script and warehouse tables setup script.
If you cannot find publicly available datasets for your problem of interest, generating fake data is a good alternative.
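As a rough illustration of generating fake data, here is a minimal sketch that uses only the Python standard library. The column names, sample values, and function names are hypothetical and are not taken from the project's data generator.

```python
import random
from datetime import datetime, timedelta, timezone

FIRST_NAMES = ["Ada", "Grace", "Linus", "Edsger"]  # hypothetical sample values
STATE_CODES = ["CA", "NY", "TX", "WA"]
ITEMS = ["book", "laptop", "headphones"]


def fake_customer(rng, customer_id):
    """Return one fake customer row as a dict."""
    return {
        "customer_id": customer_id,
        "first_name": rng.choice(FIRST_NAMES),
        "state_code": rng.choice(STATE_CODES),
    }


def fake_order(rng, order_id, customer_id, now):
    """Return one fake order row that belongs to an existing customer."""
    delivered_on = now - timedelta(minutes=rng.randint(0, 60))
    return {
        "order_id": order_id,
        "customer_id": customer_id,
        "item_name": rng.choice(ITEMS),
        "delivered_on": delivered_on.isoformat(),
    }


def generate(n_customers, orders_per_customer, seed=42):
    """Generate customers and their orders; a fixed seed makes runs repeatable."""
    rng = random.Random(seed)
    now = datetime(2024, 1, 1, tzinfo=timezone.utc)
    customers = [fake_customer(rng, i) for i in range(1, n_customers + 1)]
    orders = []
    for customer in customers:
        for _ in range(orders_per_customer):
            orders.append(fake_order(rng, len(orders) + 1, customer["customer_id"], now))
    return customers, orders


customers, orders = generate(n_customers=3, orders_per_customer=2)
print(len(customers), len(orders))
```

Seeding the random number generator keeps the generated data stable between runs, which makes tests against it easier to write.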
Schedule & Orchestrate
An orchestrator is the tool or framework that ensures tasks are executed in order, retries them on failure, stores metadata, and displays progress via a UI.
The scheduler is responsible for starting data pipelines at their scheduled frequency.
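To make the orchestrator's responsibilities concrete, here is a toy sketch in plain Python that runs tasks in order, retries a failing task, and keeps a small run log. It is not how Dagster works internally; run_with_retries and run_pipeline are hypothetical names used only for illustration.

```python
import time


def run_with_retries(task, max_retries=2, delay_seconds=0.0):
    """Call task(); if it raises, retry up to max_retries more times."""
    for attempt in range(1, max_retries + 2):
        try:
            return task()
        except Exception:
            if attempt > max_retries:
                raise  # out of retries, surface the error to the caller
            time.sleep(delay_seconds)


def run_pipeline(tasks, max_retries=2):
    """Run (name, callable) pairs in order and record a status for each."""
    run_log = []
    for name, task in tasks:
        try:
            run_with_retries(task, max_retries=max_retries)
            run_log.append((name, "success"))
        except Exception as exc:
            run_log.append((name, f"failed: {exc}"))
            break  # later tasks depend on earlier ones, so stop here
    return run_log


calls = {"extract": 0}


def flaky_extract():
    """Fail on the first call only, to simulate a temporary source outage."""
    calls["extract"] += 1
    if calls["extract"] == 1:
        raise ConnectionError("source unavailable")


log = run_pipeline(
    [("extract", flaky_extract), ("load", lambda: None), ("transform", lambda: None)]
)
print(log)
```

A real orchestrator adds much more on top of this (persistent metadata, a UI, parallelism), which is why we rely on a framework instead of writing our own.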
We use Dagster as our scheduler and orchestrator due to its ease of use and setup. A few key concepts to understand about Dagster are:
- ops: These are the units of code that do the computation. In our case, this includes the extract, load, and transform (with the inbuilt dbt op) code. Our ops are defined here.
- jobs: We use a job to define the order of execution of ops. Our data ingestion job is defined here.
- schedules: Schedules define the frequency of a job run. Our schedule is defined here.
- repository: Jobs and schedules are organized into repositories. Our repository is defined here.
Given below is the code that chains together ops to create our job.
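def online_store_data_ingestion():
    # dbt tests run only after the dbt models have been built
    dbt_test_op(
        # dbt run waits for all three extract -> load chains to finish
        dbt_run_op(
            [
                load_customer_risk_score(extract_customer_risk_score()),
                load_customer_data(extract_customer_data()),
                load_orders_data(extract_orders_data()),
            ]
        )
    )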
Notice how our python job shown above creates the data pipeline shown below.
Extract
Extractors pull data from the source system. We extract customer data from our application’s customer database, orders data from S3, and customer risk scores from an API endpoint.
Load
Loaders load data into the destination system(s). We load our orders, customers, and customer risk score data into the store.orders, store.customers, and store.customer_risk_score warehouse tables, respectively.
We can also load data into other systems as needed. For example, we could load customer data into an Elasticsearch cluster for text-based querying, orders into a graph database for graph-based querying, or either into a cache system.
The code for the extract and load components is present here as extract_* and load_* functions.
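The extract and load steps can be sketched as a pair of small functions. This is a simplified illustration, not the project's code: in-memory SQLite databases stand in for the customer database and the warehouse, and the function names, table names, and columns are assumptions.

```python
import sqlite3


def extract_customers(source_conn):
    """Pull all customer rows from the (stand-in) source database."""
    cursor = source_conn.execute("SELECT customer_id, first_name FROM customers")
    return cursor.fetchall()


def load_customers(warehouse_conn, rows):
    """Insert the extracted rows into the (stand-in) warehouse table."""
    warehouse_conn.executemany(
        "INSERT INTO store_customers (customer_id, first_name) VALUES (?, ?)", rows
    )
    warehouse_conn.commit()
    return len(rows)


# Stand-in source system with two customers.
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE customers (customer_id INTEGER, first_name TEXT)")
source.executemany("INSERT INTO customers VALUES (?, ?)", [(1, "Ada"), (2, "Grace")])

# Stand-in warehouse with an empty target table.
warehouse = sqlite3.connect(":memory:")
warehouse.execute("CREATE TABLE store_customers (customer_id INTEGER, first_name TEXT)")

loaded = load_customers(warehouse, extract_customers(source))
print(loaded)
```

Keeping extract and load as separate functions lets an orchestrator retry or monitor each step on its own.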
Transform
Transformers clean the data, apply business logic, and model it so that it is ready to use. This transformed data is used by our stakeholders.
We use dbt
to execute our transformation scripts. We de-duplicate the data, cast columns to the correct data types, join with other tables (customer risk score and states), and create the fct_orders
and dim_customers
views as shown below (based on Kimball modeling).
dim_customers
fct_orders
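To show the kind of work a transformation does, here is a small sketch that de-duplicates, casts, and joins using plain SQL run through Python's sqlite3 module instead of dbt. The table layout, the datetime_updated column, and the sample rows are assumptions made for the example; the project's actual models may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Raw customer rows; customer 1 appears twice (hypothetical sample data).
    CREATE TABLE store_customers (
        customer_id TEXT, first_name TEXT, state_code TEXT, datetime_updated TEXT
    );
    INSERT INTO store_customers VALUES
        ('1', 'Ada',   'CA', '2024-01-01 10:00:00'),
        ('1', 'Ada',   'NY', '2024-01-01 10:05:00'),
        ('2', 'Grace', 'TX', '2024-01-01 10:01:00');

    CREATE TABLE states (state_code TEXT, state_name TEXT);
    INSERT INTO states VALUES
        ('CA', 'California'), ('NY', 'New York'), ('TX', 'Texas');

    -- Keep only the latest row per customer, cast the id, and join state names.
    CREATE VIEW dim_customers AS
    SELECT
        CAST(c.customer_id AS INTEGER) AS customer_id,
        c.first_name,
        s.state_name
    FROM store_customers AS c
    JOIN states AS s ON s.state_code = c.state_code
    WHERE c.datetime_updated = (
        SELECT MAX(d.datetime_updated)
        FROM store_customers AS d
        WHERE d.customer_id = c.customer_id
    );
    """
)

rows = conn.execute("SELECT * FROM dim_customers ORDER BY customer_id").fetchall()
print(rows)  # [(1, 'Ada', 'New York'), (2, 'Grace', 'Texas')]
```

In a dbt project the SELECT statement would live in its own model file, and dbt would take care of creating the view.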
Recommended reading:
Data visualization
Data visualization tools enable business users, analysts, and engineers to generate aggregates at different levels, create shareable dashboards, write SQL queries, etc. They do not have to connect directly to our warehouse.
We use Metabase as our data visualization tool.
Choosing tools & frameworks
We have used open-source software to make development easy, free, and configurable. You may want to switch out tools as necessary. For example, you could use Fivetran instead of our custom extract and load code.
Recommended reading:
Future work & improvements
While we have a working data pipeline, there are a lot of possible improvements. Some of the crucial ones are listed below.
- Idempotence: In the customer load step, we insert all the data for the past 5 minutes into our warehouse. Since our pipeline runs every 2 minutes, consecutive runs load overlapping time windows, which results in duplicate data. Making the load step idempotent will prevent duplicate data. Read this article on how to make your data pipelines idempotent for more details.
- Backfills: Backfilling is an inevitable part of data engineering. To learn more about backfills and how to make our data pipeline backfill-able, read this article.
- Change data capture & Slowly changing dimensions: A key feature of having a data warehouse is storing queryable historical data. This is commonly done with a modeling technique called slowly changing dimension, which requires us to know all the create, delete, and update operations happening on the source table. We can use a technique called change data capture (CDC) to capture these operations. Read how to implement CDC, and how to create SCD2 tables with dbt.
- Testing: We have a unit test and a post-processing dbt test for our pipeline. Read how to add tests, and how to set up CI tests to take our testing capabilities to the next level.
- Scale: When passing data between functions, Dagster stores it in a temporary file (by default). In our load steps, we load the entire dataset into the Python process memory and then insert it into the warehouse. While this works for small datasets, it will not work if the data is larger than the process memory. Read how to stream data in your Python process and how to scale your data pipeline for ideas on how to handle big data.
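The duplicate-data problem from overlapping load windows, and one way to avoid it, can be sketched as follows. This is an illustration under assumptions: SQLite stands in for the warehouse, the table and function names are hypothetical, and INSERT OR REPLACE is SQLite syntax (PostgreSQL offers INSERT ... ON CONFLICT for the same purpose).

```python
import sqlite3


def naive_load(conn, rows):
    """Plain insert: re-loading the same row creates a duplicate."""
    conn.executemany("INSERT INTO customers_naive VALUES (?, ?)", rows)


def idempotent_load(conn, rows):
    """Upsert on the primary key: re-loading a row overwrites it instead."""
    conn.executemany("INSERT OR REPLACE INTO customers_idempotent VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers_naive (customer_id INTEGER, first_name TEXT)")
conn.execute(
    "CREATE TABLE customers_idempotent (customer_id INTEGER PRIMARY KEY, first_name TEXT)"
)

# Two pipeline runs whose time windows overlap: customer 2 is in both batches.
run_1 = [(1, "Ada"), (2, "Grace")]
run_2 = [(2, "Grace"), (3, "Linus")]

for batch in (run_1, run_2):
    naive_load(conn, batch)
    idempotent_load(conn, batch)

naive_count = conn.execute("SELECT COUNT(*) FROM customers_naive").fetchone()[0]
idempotent_count = conn.execute("SELECT COUNT(*) FROM customers_idempotent").fetchone()[0]
print(naive_count, idempotent_count)  # 4 3
```

With the idempotent version, running the same batch any number of times leaves the table in the same state, which also makes re-runs and backfills safer.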
Please feel free to create an issue or open a PR here.
Conclusion
To recap, we saw:
- The key components of a data pipeline
- Generating fake data
- How to design your data project
- Future work & improvements
For your data project, choose the tools you know best or would like to learn. Use this project’s infrastructure and architecture as a starting point to build your data project.
Hope this article gives you a good understanding of the key components of a data pipeline and how you can set up your data project. If you have any questions, comments, or ideas please leave them in the comment section below.
Further reading
- Want to understand what the data pipeline components are and how to choose them? Read this article.
- Read this article for a quick introduction to data warehouses.
- Curious about ways to load data into a data warehouse? Check out this article.
- Want to do a similar project with Airflow? Check out this article.
References
If I want to use Airflow to replace Dagster as our scheduler and orchestrator , how can I do that? Or could you point to me one of your excellent tutorials that has already done this?
You can use this docker compose to spin up Airflow containers.
You will have to re-write the DAGs here https://github.com/josephmachado/online_store/tree/main/app to follow Airflow DAG definitions & remove all the dagster_ services in this project's definition https://github.com/josephmachado/online_store/blob/main/docker-compose.yml.
Feel free to set up a PR with the Airflow version for this project https://github.com/josephmachado/online_store/pulls.
MINGW64 ~/online_store/online_store (main)
$ make up
bash: make: command not found
This is the error I keep getting, pls help
I think the problem is because I have not installed Dagster, so do I need to install python or is there a way to install it on the Git Bash? please help I am very new to data engineering.
So sorry you are facing issues. Are you on Windows? Is Docker running? The make up command will spin up docker containers for dagster, you don't need to install anything. Please let me know and I will try to help here.
Thanks Joseph.. Yes I am on windows and my docker is running.. Please my only confusion is where to run the make up code..from the tutorial, it looks like you need to get dagster installed. Pls if you dont mind..kindly help with a step by step process of where to run the git clone and the make up code. Thank you in advance.
Sure thing. The make up is a shortcut to run the command docker compose up --build -d as seen here.
Could you please try the following: run the git clone https://github.com/josephmachado/online_store.git command, go to the online_store project directory using cd online_store, and run make up. If for some reason make does not work, could you please try running the command docker compose up --build -d. With docker ps you will see the dagster container running.
Thanks a lot Joseph, this works fine and I am able to open the localhost successfully on my web browser.
Woohoo, congratulations :)
"We use Dagster as our orchestrator and scheduler. Go to your local dagster instance at http://localhost:3000/ and turn on the schedule button near the top of the screen."
Hii I got a bit stuck here, pls help, how do I get to the instance using the http://localhost:3000/ ?
Hi, So the make up command will start all the docker containers. Going to http://localhost:3000 on your browser (firefox or chrome) will take you to the Dagster UI. Please let me know if this answers your question.
Can I use pySpark for this project?
Yes you can, but I'd make sure that there is a good need for it
ERROR [dagster_code_runner_image 2/4] RUN pip install dagster==0 , I am getting this error while running makeup , i tried to run docker compose up --build -d manually too , got the same .pls help
Can you please open a github issue at https://github.com/josephmachado/online_store/issues with the exact steps you took to run the containers, with a docker ps check before running make up.
Collecting snowflake-connector-python[secure-local-storage]<2.6.0,>=2.4.1
#0 20.04 Downloading snowflake-connector-python-2.5.1.tar.gz (344 kB)
#0 20.04 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 344.9/344.9 kB 53.7 MB/s eta 0:00:00
#0 20.16 Installing build dependencies: started
#0 26.17 Installing build dependencies: finished with status 'error'
#0 26.20 error: subprocess-exited-with-error
#0 26.20
#0 26.20 × pip subprocess to install build dependencies did not run successfully.
from here it's showing error
On running make up, I am getting this error, due to which the build is failing. Please help.
Collecting snowflake-connector-python[secure-local-storage]<2.6.0,>=2.4.1
#0 22.80 Downloading snowflake-connector-python-2.5.1.tar.gz (344 kB)
#0 22.82 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 344.9/344.9 KB 23.0 MB/s eta 0:00:00
#0 22.92 Installing build dependencies: started
#0 28.68 Installing build dependencies: finished with status 'error'
#0 28.83 error: subprocess-exited-with-error
#0 28.83
#0 28.83 × pip subprocess to install build dependencies did not run successfully.
#0 28.83 │ exit code: 1
#0 28.83 ╰─> [628 lines of output]
Sorry. In your project, I don't see the datetime_inserted field in store.customers or store.orders, but you use that field in staging. Can you explain the point of that field and where (what line of code) it is inserted?
The store.customers table is created in the warehouse during docker compose up (this script is executed when we start the docker containers: https://github.com/josephmachado/online_store/blob/main/warehouse_db_setup/1_create_customer_order_state_tables.sql). As for the store.orders staging table, we do not have a datetime_inserted column but a delivered_on column (https://github.com/josephmachado/online_store/blob/main/app/transform/models/staging/stg_el__orders.sql). Hope this helps. LMK if you have more questions.