Simple Data Pipelines

A data pipeline follows a simple pattern: read input, process it, write output. This pattern appears everywhere — from simple scripts to enterprise systems. Building your first pipeline combines all your file skills into something genuinely useful.

The Pipeline Pattern

Every data pipeline has three stages:

  1. Read — Load data from a source file
  2. Process — Transform, filter, or analyze the data
  3. Write — Save results to an output file

In its simplest form, the pattern looks like this:

# 1. Read
with open("input.txt", "r") as f:
    data = f.read()

# 2. Process
processed = data.upper()

# 3. Write
with open("output.txt", "w") as f:
    f.write(processed)

This trivial example converts text to uppercase, but the structure scales to complex transformations.
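
For instance, the process stage can chain several steps. A minimal sketch, reusing the same input.txt and output.txt, that strips whitespace, drops blank lines, and then applies the same uppercase step:

# 1. Read
with open("input.txt", "r") as f:
    lines = f.readlines()

# 2. Process: several chained transformations
lines = [line.strip() for line in lines]   # normalize whitespace
lines = [line for line in lines if line]   # drop blank lines
lines = [line.upper() for line in lines]   # uppercase, as before

# 3. Write one cleaned line per row
with open("output.txt", "w") as f:
    f.write("\n".join(lines) + "\n")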

A Real Example: CSV Processing

Let's build a pipeline that reads customer data, adds a computed field, and writes the results:

import csv

# 1. Read input
with open("customers.csv", "r") as infile:
    reader = csv.DictReader(infile)
    customers = list(reader)

# 2. Process: add full name field
for customer in customers:
    first = customer["first_name"]
    last = customer["last_name"]
    customer["full_name"] = f"{first} {last}"

# 3. Write output
fieldnames = ["first_name", "last_name", "email", "full_name"]
with open("customers_processed.csv", "w", newline="") as outfile:
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(customers)

The newline="" parameter prevents extra blank lines on Windows.
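
Loading every row into a list keeps the example simple, but it holds the whole file in memory. For large inputs, the same pipeline can stream one row at a time; a minimal sketch, assuming the same customers.csv columns:

import csv

fieldnames = ["first_name", "last_name", "email", "full_name"]
with open("customers.csv", "r") as infile, \
     open("customers_processed.csv", "w", newline="") as outfile:
    reader = csv.DictReader(infile)
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)
    writer.writeheader()
    for customer in reader:
        # Same transformation as above, applied one row at a time
        customer["full_name"] = f'{customer["first_name"]} {customer["last_name"]}'
        writer.writerow(customer)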

Adding Error Handling

Production pipelines need to handle problems gracefully:

import csv
import os
import sys

input_file = "sales.csv"
output_file = "sales_summary.csv"

# Check input exists
if not os.path.exists(input_file):
    print(f"Error: {input_file} not found")
    sys.exit(1)

try:
    # Read
    with open(input_file, "r") as f:
        reader = csv.DictReader(f)
        sales = list(reader)
    
    # Process: calculate totals
    for sale in sales:
        quantity = int(sale["quantity"])
        price = float(sale["price"])
        sale["total"] = f"{quantity * price:.2f}"  # keep two decimal places
    
    # Write
    fieldnames = ["product", "quantity", "price", "total"]
    with open(output_file, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(sales)
    
    print(f"Processed {len(sales)} records")
    
except Exception as e:
    print(f"Pipeline failed: {e}")
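
One broad except block aborts the whole run on the first bad value. A common refinement is to validate each row and skip the broken ones with a warning; a minimal sketch against the same sales.csv (output writing omitted):

import csv
import sys

good_rows = []
skipped = 0

with open("sales.csv", "r") as f:
    for row in csv.DictReader(f):
        try:
            quantity = int(row["quantity"])
            price = float(row["price"])
        except (KeyError, TypeError, ValueError) as e:
            # Report and skip a malformed row instead of failing the run
            print(f"Skipping bad row {row}: {e}", file=sys.stderr)
            skipped += 1
            continue
        row["total"] = f"{quantity * price:.2f}"
        good_rows.append(row)

print(f"Kept {len(good_rows)} rows, skipped {skipped}")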

Why Pipelines Matter

This pattern is the foundation of:

  • Report generation
  • Data cleaning and transformation
  • Log analysis
  • Batch processing

Master this pattern, and you can automate countless tedious tasks.
