Writing memory efficient data pipelines in Python
- Introduction
- 1. Using generators
- 2. Using distributed frameworks
- Conclusion
- Further reading
- References
Introduction
If you are
- wondering how to write memory efficient data pipelines in Python, or
- working with a dataset that is too large to fit into memory,
then this post is for you. We will go over how to write memory efficient data pipelines using generators and when to use distributed data processing frameworks.
1. Using generators
“Regular functions compute a value and return it, but generators return an iterator that returns a stream of values.” - Python Docs
Creating a generator is very similar to creating a function. You can use a generator expression with the () syntax, or define a function that uses yield instead of return.
Generators are usually used in a loop. For each iteration of the calling loop:
1. Control is passed from the calling loop to the generator function.
2. The generator yields the next value to the loop, and control is passed back to the loop.
3. Steps 1 and 2 are repeated until all the generated values are exhausted.
You can also get data directly from a generator using next(generator_obj).
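For instance, here is a minimal standalone sketch (unrelated to the dataset used below) showing both ways of creating a generator and both ways of consuming one:

# generator expression: squares of 0..4, computed lazily
squares = (i * i for i in range(5))

# equivalent generator function using yield
def gen_squares(count=5):
    for i in range(count):
        yield i * i

# consumed in a loop, one value at a time
for value in squares:
    print(value)  # 0, 1, 4, 9, 16

# or pulled directly with next()
gen = gen_squares()
next(gen)  # 0
next(gen)  # 1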
Let’s say our objective is to process the parking violations 2018 dataset with the following steps:
- Keep only the violations issued by police (denoted by P in the data) to vehicles with the make FORD in NJ.
- Replace P with police.
- Concat the house number, street name, and registration state fields into a single address field.
- Write the results into a csv file with the header vehicle_make,issuing_agency,address.
Using generator expressions
import csv
input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./nj_ford_trasnportation_issued_pv_2018.csv"
# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object) # csv reader produces a generator
# 2. keep only required fields
# field index => field;
# 2 => registration state, 7 => vehicle make, 8 => issuing agency, 23 => house number, 24 => street name
col_filtered_stream = ([row[2], row[7], row[8], row[23], row[24]] for row in extractor)
# 3. keep only violations issued by police, to vehicles with the make FORD in NJ
value_filtered_stream = filter(
    lambda x: all([x[0] == "NJ", x[1] == "FORD", x[2] == "P"]), col_filtered_stream
)
# 4. replace P with police
transformed_stream = (
    [stream[0], stream[1], "police", stream[3], stream[4]]
    for stream in value_filtered_stream
)
# 5. concat house number, street name, registration state into a single address field
final_stream = (
    [stream[1], stream[2], ", ".join([stream[3], stream[4], stream[0]])]
    for stream in transformed_stream
)  # stream[0] is the registration state
final_stream # this is a generator object and has not yet started generating data
# 6. write a header row for output data
write_file_object = open(output_file_name, "w")
loader = csv.writer(
    write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)
header = ["vehicle_make", "issuing_agency", "address"]
loader.writerow(header)
# 7. stream data into an output file
loader.writerows(final_stream) # loader asks for data from final_stream
# close the file objects to flush the output to disk
write_file_object.close()
read_file_object.close()
In the above example, we create generators with the () comprehension format and connect multiple generators to form a data pipeline; this technique is called chaining. The example is simple, and its logic can be easily expressed with lambda functions.
Let’s see an example where we use a more complex function. This function will calculate the number of days between the violation issue date and the vehicle expiration date.
import csv
from datetime import datetime
input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./pv_2018_w_days_until_violation.csv"
def get_days_until_expiration(row):
    issue_date_str = row[4]
    vehicle_expiration_date_float = row[12]
    issue_date = datetime.strptime(issue_date_str[:10], "%Y-%m-%d")
    try:
        vehicle_expiration_date = datetime.strptime(
            str(vehicle_expiration_date_float).split(".")[0], "%Y%m%d"
        )
        date_diff = (vehicle_expiration_date - issue_date).days
    except ValueError:
        # rows with malformed or missing expiration dates are flagged with -1
        date_diff = -1
    return date_diff
# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)
# skip header
next(extractor)
# 2. calculate days until expiration
final_stream = (row + [get_days_until_expiration(row)] for row in extractor)
# 3. stream data into an output file
write_file_object = open(output_file_name, "w")
loader = csv.writer(
    write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)
loader.writerows(final_stream)
# close the file objects to flush the output to disk
write_file_object.close()
read_file_object.close()
Using generator yield
You can also create your own generator as shown below.
def naive_generator(count=100000):
    for i in range(count):
        yield i

gen = naive_generator()  # calling the function returns a generator object
next(gen)  # 0
next(gen)  # 1
# ...
This is a simple example, but the principle remains the same for real use cases.
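For example, steps 1 and 2 of the earlier pipeline (streaming the file and keeping only the required fields) could be wrapped in a single yield-based generator. This is just a sketch of the same logic in function form:

import csv

def stream_required_fields(file_name):
    # stream rows from the csv file and yield only the fields we need,
    # one row at a time, without loading the whole file into memory
    with open(file_name, "r") as read_file_object:
        extractor = csv.reader(read_file_object)
        for row in extractor:
            # 2 => registration state, 7 => vehicle make, 8 => issuing agency,
            # 23 => house number, 24 => street name
            yield [row[2], row[7], row[8], row[23], row[24]]

col_filtered_stream = stream_required_fields(
    "./parking-violations-issued-fiscal-year-2018.csv"
)
next(col_filtered_stream)  # the header row's selected fields; later calls yield data rows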
Mini batching
Consider an example where your data pipeline has to call an external paid service. Let’s assume that this service charges per call and that the price is the same whether you call it with 1 row or 10,000 rows.
In this scenario, you can accumulate rows in memory and make the service call only once the batch reaches a specified size threshold.
import time
import csv
input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./pv_2018_ext_call_enriched.csv"
def service_call(rows):
    time.sleep(15)  # simulating an external service call
    return rows

def batched_service_transforms(rows, batch_size=10000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield from service_call(batch)
            batch = []
    if batch:  # avoid making a (paid) call with an empty final batch
        yield from service_call(batch)
# 1. stream data from input file
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)
# 2. make batched calls to the external service
final_stream = batched_service_transforms(extractor)
next(final_stream) # you will notice a 15s wait, simulating the external service call
[
    next(final_stream) for i in range(9999)
]  # you will notice almost no wait, since this data is held in process memory
# we have iterated through the first batch of 10,000
# the next call will invoke the service_call function, thus sleeping for 15s
next(final_stream) # you will notice a 15s wait
Note that yield from items is shorthand for
for i in items:
    yield i
Reading in batches from a database
When pulling a large dataset (either from a DB or an external service) into your Python process, you will need to make a tradeoff between
- Memory: Pulling the entire dataset at once will cause out of memory errors.
- Speed: Fetching one row at a time from the database will incur expensive network calls.
A good tradeoff would be to fetch data in batches. The size of a batch will depend on the memory available and speed requirements of your data pipeline.
In the following code snippet, we fetch data from the database in batches of 10,000 rows. Each batch of 10,000 rows is fetched when required downstream, kept in memory, and served one row at a time until it is exhausted. This process is repeated until the entire dataset is traversed.
import psycopg2
def generate_from_db(username, password, host, port, dbname, batch_size=10000):
    conn_url = f"postgresql://{username}:{password}@{host}:{port}/{dbname}"
    conn = psycopg2.connect(conn_url)
    cur = conn.cursor(name="get_large_data")  # a named cursor is a server-side cursor
    cur.execute(
        "SELECT c1,c2,c3 FROM big_table"
    )  # this gets the data ready on the db side
    while True:
        rows = cur.fetchmany(
            batch_size
        )  # this fetches data in batches from the ready data in the db
        if not rows:
            break
        yield from rows
    cur.close()
    conn.close()

next(generate_from_db("username", "password", "host", 5432, "database"))
Points to note in the above example:
- Opening and closing a db connection is expensive; hence, we keep a single connection open while we stream the batches.
- We use a server-side cursor (psycopg2's named cursor), which keeps the data ready to be served on the database side. Alternatively, you can use ORDER BY, LIMIT, and OFFSET to get the batches, as sketched below.
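A rough sketch of that alternative is shown below; it is only illustrative and reuses the big_table table and c1, c2, c3 columns from the example above. Keep in mind that OFFSET-based paging gets slower as the offset grows, since the database still has to skip over all the earlier rows.

def generate_with_offset(conn, batch_size=10000):
    # page through the table with ORDER BY / LIMIT / OFFSET;
    # a stable ORDER BY (ideally on an indexed column) is needed for consistent pages
    offset = 0
    with conn.cursor() as cur:  # a regular client-side cursor is enough here
        while True:
            cur.execute(
                "SELECT c1, c2, c3 FROM big_table ORDER BY c1 LIMIT %s OFFSET %s",
                (batch_size, offset),
            )
            rows = cur.fetchall()
            if not rows:
                break
            yield from rows
            offset += batch_size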
Pros & Cons
Pros
- No need to install or maintain external libraries.
- Native python modules have good documentation.
- Easy to use.
- Since most of the distributed data processing frameworks support Python, it’s relatively easy to port this code over if needed.
Cons
- Parallelizing data processing is an involved process.
- Sorting and aggregating will require you to keep the data in memory, as the sketch after this list illustrates.
- Joining multiple datasets will require complex patterns and careful handling of edge cases.
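For example, even a simple aggregation over a generator stream, such as counting violations per vehicle make, has to hold a running entry for every distinct group in memory. A minimal sketch, reusing the column index from the examples above:

import csv
from collections import Counter

def count_by_vehicle_make(file_name):
    with open(file_name, "r") as read_file_object:
        extractor = csv.reader(read_file_object)
        next(extractor)  # skip header
        counts = Counter()  # one entry per distinct vehicle make stays in memory
        for row in extractor:
            counts[row[7]] += 1  # 7 => vehicle make
        return counts

make_counts = count_by_vehicle_make("./parking-violations-issued-fiscal-year-2018.csv")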
2. Using distributed frameworks
Another option is to leverage distributed frameworks such as Spark, Flink, or Dask. While they are very powerful tools, you may not always need them. If you expect your data to grow significantly in size or complexity, or if your pipeline has demanding processing speed requirements, definitely consider using these tools.
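To make this concrete, here is a rough sketch of what the filtering pipeline from section 1 might look like in PySpark. This is not a drop-in implementation: the column names (Registration State, Vehicle Make, Issuing Agency, House Number, Street Name) are assumptions and should be checked against the file’s actual header row.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("nj-ford-police-violations").getOrCreate()

# read the csv with its header row; column names below are assumed, not verified
df = spark.read.csv("./parking-violations-issued-fiscal-year-2018.csv", header=True)

result = (
    df.filter(
        (F.col("Registration State") == "NJ")
        & (F.col("Vehicle Make") == "FORD")
        & (F.col("Issuing Agency") == "P")
    )
    .withColumn("issuing_agency", F.lit("police"))
    .withColumn(
        "address",
        F.concat_ws(
            ", ", F.col("House Number"), F.col("Street Name"), F.col("Registration State")
        ),
    )
    .select(F.col("Vehicle Make").alias("vehicle_make"), "issuing_agency", "address")
)

result.write.csv("./nj_ford_police_issued_pv_2018", header=True, mode="overwrite")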
Pros & Cons
Pros
- Most data processing functions are in-built.
- Can easily scale to large data sets.
- If you are in a Python ecosystem, it’s very easy to use any of these frameworks.
Cons
- Can be hard to install, setup clusters, and upgrade.
- If cluster resources are not allocated appropriately, the processing may fail.
- They have their own quirks and gotchas of which to be aware.
Conclusion
I hope this article gives you a good understanding of how to use generators to write memory efficient data pipelines. The next time you have to build a data pipeline to process a larger-than-memory dataset, try using generators.
As always, please leave any comments or questions in the comment section below.