Simple Data Pipelines
A data pipeline follows a simple pattern: read input, process it, write output. This pattern appears everywhere — from simple scripts to enterprise systems. Building your first pipeline combines all your file skills into something genuinely useful.
The Pipeline Pattern
Every data pipeline has three stages:
- Read — Load data from a source file
- Process — Transform, filter, or analyze the data
- Write — Save results to an output file
# 1. Read
with open("input.txt", "r") as f:
    data = f.read()

# 2. Process
processed = data.upper()

# 3. Write
with open("output.txt", "w") as f:
    f.write(processed)
This trivial example converts text to uppercase, but the structure scales to complex transformations.
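Because each stage is independent, the pattern also wraps neatly into a reusable function where the transformation is pluggable. Here is a minimal sketch; run_pipeline and the transform argument are illustrative names, not part of any library:

def run_pipeline(input_path, output_path, transform):
    # 1. Read
    with open(input_path, "r") as f:
        data = f.read()
    # 2. Process: apply whatever transformation was passed in
    processed = transform(data)
    # 3. Write
    with open(output_path, "w") as f:
        f.write(processed)

# The same pipeline now handles any string-to-string transformation:
run_pipeline("input.txt", "output.txt", str.upper)
run_pipeline("input.txt", "lowercase.txt", str.lower)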
A Real Example: CSV Processing
Let's build a pipeline that reads customer data, adds a computed field, and writes the results:
import csv

# 1. Read input
with open("customers.csv", "r") as infile:
    reader = csv.DictReader(infile)
    customers = list(reader)

# 2. Process: add full name field
for customer in customers:
    first = customer["first_name"]
    last = customer["last_name"]
    customer["full_name"] = f"{first} {last}"

# 3. Write output
fieldnames = ["first_name", "last_name", "email", "full_name"]
with open("customers_processed.csv", "w", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(customers)
The newline="" parameter prevents extra blank lines on Windows.
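To try the pipeline, you need an input file with the expected columns. The rows below are made-up sample data for a hypothetical customers.csv:

first_name,last_name,email
Ada,Lovelace,ada@example.com
Grace,Hopper,grace@example.com

Running the script produces customers_processed.csv with the same rows plus a fourth full_name column.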
Adding Error Handling
Production pipelines need to handle problems gracefully:
import csv
import os
import sys

input_file = "sales.csv"
output_file = "sales_summary.csv"

# Check that the input exists before doing any work
if not os.path.exists(input_file):
    print(f"Error: {input_file} not found")
    sys.exit(1)

try:
    # Read
    with open(input_file, "r") as f:
        reader = csv.DictReader(f)
        sales = list(reader)

    # Process: calculate totals
    for sale in sales:
        quantity = int(sale["quantity"])
        price = float(sale["price"])
        sale["total"] = str(quantity * price)

    # Write
    fieldnames = ["product", "quantity", "price", "total"]
    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sales)

    print(f"Processed {len(sales)} records")
except Exception as e:
    print(f"Pipeline failed: {e}")
Why Pipelines Matter
This pattern is the foundation of:
- Report generation
- Data cleaning and transformation
- Log analysis (a short sketch follows this list)
- Batch processing
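To make the log-analysis case concrete, that pipeline has exactly the same three-stage shape. The filename app.log and the "ERROR" marker here are assumptions about a hypothetical log format:

# 1. Read
with open("app.log", "r") as f:
    lines = f.readlines()

# 2. Process: keep only the lines that report errors
errors = [line for line in lines if "ERROR" in line]

# 3. Write
with open("errors.log", "w") as f:
    f.writelines(errors)

print(f"Found {len(errors)} error lines")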
Master this pattern, and you can automate countless tedious tasks.