10 Data pipelines

10.1 The cell that does everything

Return one last time to the monolithic cell from Chapter 1: the one that loads the data, cleans it, engineers features, trains a model, and draws a plot, all in a single 400-line block. It works, and while you’re exploring, it even feels efficient. Everything is in one place, and you can re-run it with a keystroke. The trouble starts when the work has to happen again. New data arrives and you must retrain, so you re-run all 400 lines, including the expensive load you didn’t need to repeat. One feature breaks and you re-read the whole cell to find it. A colleague wants just your cleaning logic and can’t take it without the training code welded to it.

A pipeline is the same logic, broken into stages that connect. Each stage does one part of the job, takes its input and produces its output explicitly, and can be run, tested, and reused on its own. Nothing about the computation changes; what changes is that the workflow becomes something you can reason about and re-run in pieces rather than a single indivisible block.

10.2 Stages that compose

A stage is just a function with one responsibility: load, validate, clean, engineer features, train. Made pure where possible (Chapter 6), each takes data in and returns data out, and the pipeline is their composition.

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})

def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Drop customers with no active days — they can't have a daily rate."""
    return df[df["active_days"] > 0].copy()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive spend per active day."""
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

def summarise(df: pd.DataFrame) -> pd.Series:
    """The model-ready summary the downstream step consumes."""
    return df["spend_per_day"].describe()

# The pipeline is the composition of the stages.
result = summarise(add_features(clean(raw)))
print(result[["count", "mean", "max"]])
count    997.000000
mean       0.807595
max       68.232761
Name: spend_per_day, dtype: float64

Each stage can be read in isolation, tested in isolation (Chapter 7), and reused in isolation — clean is now available to any other analysis without dragging the feature engineering along. Read top to bottom, the stage names alone document how raw data becomes a model-ready summary, which the monolithic cell never did.
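The nested call `summarise(add_features(clean(raw)))` reads inside-out. The sketch below shows two common ways to write the same composition left to right, in the order the data flows. The first uses pandas' `DataFrame.pipe`. The second folds a plain list of stages with `functools.reduce`. The stage functions repeat the ones above so that the block runs on its own. The name `STAGES` is illustrative and not part of any library.

```python
from functools import reduce

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})

def clean(df: pd.DataFrame) -> pd.DataFrame:
    return df[df["active_days"] > 0].copy()

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

def summarise(df: pd.DataFrame) -> pd.Series:
    return df["spend_per_day"].describe()

# Inside-out, as in the text above.
nested = summarise(add_features(clean(raw)))

# Left to right: each .pipe(f) call returns f(previous result).
piped = raw.pipe(clean).pipe(add_features).pipe(summarise)

# The pipeline as data: a list of stages applied in order to the raw frame.
STAGES = [clean, add_features, summarise]
reduced = reduce(lambda data, stage: stage(data), STAGES, raw)

print(piped.equals(nested), reduced.equals(nested))
```

The list form has a practical advantage: the pipeline itself becomes data. You can log it, reorder it, or run only a prefix of it (for example `STAGES[:2]`) without rewriting the call.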

Data Science Bridge

You already build pipelines — just not at this scale. Pipeline([("scale", StandardScaler()), ("model", LogisticRegression())]) chains transformers and an estimator into a single object so that preprocessing and fitting move together and, crucially, so that scaling fitted on the training fold can’t leak into validation. A data pipeline is that same idea scaled up from the modelling steps to the entire workflow: each stage is a step, and the datasets flowing between them are what the Pipeline passes implicitly between transformers.

Where it breaks down: an sklearn Pipeline lives inside one process and one .fit() call, holding everything in memory. A workflow pipeline usually spans processes, persisted files, and even schedules — the cleaning might run nightly and the training weekly — so it needs explicit intermediate artefacts and an orchestrator to run the stages in order, not just method chaining. The principle transfers; the machinery is heavier.
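For comparison, here is the modelling-scale version the Bridge describes, sketched on synthetic data. The dataset from `make_classification` and its parameters are illustrative choices. The whole `Pipeline` is passed to `cross_val_score`, so the `StandardScaler` is refitted on each training fold and only applied to the matching validation fold. That is the leakage protection described above.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data stands in for a real modelling table.
X, y = make_classification(n_samples=500, n_features=8, random_state=0)

# Two steps chained into one estimator: scale, then fit.
pipe = Pipeline([("scale", StandardScaler()), ("model", LogisticRegression())])

# Each of the 5 folds refits the scaler on its own training split only,
# so validation rows never influence the scaling parameters.
scores = cross_val_score(pipe, X, y, cv=5)
print(f"mean accuracy across folds: {scores.mean():.3f}")
```

Compare this with scaling all of `X` before calling `cross_val_score`: the scaler would then have seen every validation row. Everything here still happens inside one process and one call, which is exactly where the analogy stops.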

10.3 Idempotency and intermediate artefacts

Once stages are explicit, two properties make a pipeline cheap to live with. The first is idempotency: running a stage twice produces the same result as running it once, with no surprising side effects — so a re-run is always safe. The second is persisting intermediate artefacts: a stage writes its output to disk (under data/interim/ or data/processed/, per the previous chapter) so that downstream stages, and future re-runs, can read it instead of recomputing it.
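To make idempotency concrete, this minimal sketch contrasts two ways a stage might write its output. Overwriting the artefact is idempotent, because two runs leave the same file as one. Appending is not, because every re-run duplicates the rows. The helper names `write_overwrite` and `write_append`, the file names, and the tiny `batch` frame are all hypothetical. A temporary directory stands in for `data/interim/`.

```python
import tempfile
from pathlib import Path

import pandas as pd

out_dir = Path(tempfile.mkdtemp())   # stands in for data/interim/
batch = pd.DataFrame({"customer": [1, 2, 3], "spend": [10.0, 20.0, 5.0]})

def write_overwrite(df: pd.DataFrame, path: Path) -> None:
    """Idempotent: every run replaces the file, so two runs equal one."""
    df.to_csv(path, index=False)

def write_append(df: pd.DataFrame, path: Path) -> None:
    """Not idempotent: every run adds the same rows again."""
    df.to_csv(path, mode="a", header=not path.exists(), index=False)

for _ in range(2):   # simulate running the stage, then re-running it
    write_overwrite(batch, out_dir / "idempotent.csv")
    write_append(batch, out_dir / "appending.csv")

rows_idempotent = len(pd.read_csv(out_dir / "idempotent.csv"))  # 3
rows_appending = len(pd.read_csv(out_dir / "appending.csv"))    # 6
print(rows_idempotent, rows_appending)
```

Appending is a common way for a "safe" re-run to silently double-count data in every downstream stage. If a stage genuinely has to accumulate, one option is to de-duplicate on a key before writing, so that a repeated run converges to the same file.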

Together these make re-runs partial rather than all-or-nothing. If the raw load is expensive but unchanged, you cache it once and never pay for it again until the input changes.

import tempfile
from pathlib import Path

cache_dir = Path(tempfile.mkdtemp())

def build_features(raw: pd.DataFrame, cache: Path = cache_dir / "features.csv") -> pd.DataFrame:
    """Compute features once; reuse the cached artefact on later runs."""
    if cache.exists():
        print("loading cached features")
        return pd.read_csv(cache)
    print("computing features")
    out = add_features(clean(raw))
    out.to_csv(cache, index=False)
    return out

first = build_features(raw)    # computes and caches
second = build_features(raw)   # reads the cache, skips the work

# Compare the values, not the whole Series: a CSV round-trip doesn't
# preserve the original (gappy) index left by `clean`, so the cached
# and recomputed results match in value even though their indexes differ.
same = np.allclose(first["spend_per_day"].to_numpy(),
                   second["spend_per_day"].to_numpy())
print(f"same values from cache: {same}")
computing features
loading cached features
same values from cache: True

The first call computes and writes the artefact; the second reads it back and skips the work entirely. In a real pipeline an orchestrator handles this caching for you, invalidating a stage’s cache when its inputs or code change — but the principle is the one shown here: compute once, persist, reuse.
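The cache above never notices when `raw` changes: once `features.csv` exists, it is returned whatever the input. The sketch below shows the kind of invalidation an orchestrator performs. It assumes the cache is keyed on a fingerprint of the input rows, computed with `pandas.util.hash_pandas_object`, plus a hand-maintained `FEATURES_VERSION` string that stands in for "the stage's code". The version constant, the `make_features` and `cache_key` helpers, and the file-naming scheme are all hypothetical. Real orchestrators typically track code changes themselves, for example through file timestamps as make does.

```python
import hashlib
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd

CACHE_DIR = Path(tempfile.mkdtemp())   # stands in for data/interim/
FEATURES_VERSION = "v1"  # hypothetical: bump by hand whenever the stage's logic changes

def make_features(df: pd.DataFrame) -> pd.DataFrame:
    """clean + add_features from above, combined into one stage for brevity."""
    df = df[df["active_days"] > 0]
    return df.assign(spend_per_day=df["spend"] / df["active_days"])

def cache_key(df: pd.DataFrame, version: str) -> str:
    """Fingerprint the input rows (values and index) together with the stage version.

    Note: column names are not part of hash_pandas_object's row hashes.
    """
    row_hashes = pd.util.hash_pandas_object(df, index=True).to_numpy()
    digest = hashlib.sha256(row_hashes.tobytes())
    digest.update(version.encode())
    return digest.hexdigest()[:16]

def build_features(raw: pd.DataFrame) -> tuple[pd.DataFrame, bool]:
    """Return (features, was_cached); recompute only for an unseen fingerprint."""
    path = CACHE_DIR / f"features-{cache_key(raw, FEATURES_VERSION)}.csv"
    if path.exists():
        return pd.read_csv(path), True
    out = make_features(raw)
    out.to_csv(path, index=False)
    return out, False

rng = np.random.default_rng(42)
raw = pd.DataFrame({
    "spend": rng.exponential(50, 1_000),
    "active_days": rng.integers(0, 365, 1_000),
})

_, cached_first = build_features(raw)        # new fingerprint: computed and written
_, cached_again = build_features(raw)        # same input: read back from disk
changed = raw.assign(spend=raw["spend"] * 2)
_, cached_changed = build_features(changed)  # input changed: cache miss, recomputed
print(cached_first, cached_again, cached_changed)
```

Changing the data or bumping the version produces a new key, so a stale artefact is never read. Old artefacts do accumulate until something cleans them up, which is the trade-off this simple scheme accepts.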

10.4 Validation gates

The seams between stages are the right place to check that the data is what the next stage expects. A validation gate asserts the schema, ranges, and assumptions at a boundary — the right columns are present, no nulls where they’re forbidden, values in a plausible range, no target leaking into the features — and fails loudly when they don’t hold. Catching bad data at the boundary turns a cryptic error three stages downstream into a clear message at the point the problem entered.

# A productionised gate, using pandera to declare the contract as a schema
import pandera.pandas as pa

schema = pa.DataFrameSchema({
    "spend": pa.Column(float, pa.Check.ge(0)),
    "active_days": pa.Column(int, pa.Check.in_range(0, 366)),
})

def validate(df):
    return schema.validate(df)   # raises with a precise message on any breach

This is testing applied to data rather than code (Chapter 7): you’re not checking that the cleaning function is correct, but that what arrived at this stage satisfies the contract the next stage relies on. The same exit-code discipline from Chapter 4 applies — a gate that fails should stop the pipeline, not let bad data flow on. Lightweight gates can be plain assertions; pandera and Great Expectations formalise them into reusable, documented schemas.
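A lightweight gate needs no library. The sketch below is a plain-Python version of the contract in the pandera schema above. It uses an explicit `raise` rather than `assert`, because Python strips `assert` statements when run with the `-O` flag. The names `check_customers`, `main` and `REQUIRED` are hypothetical. In a real script you would call `sys.exit(main(df))`, so the returned 1 becomes the non-zero exit code that stops make or any other orchestrator that checks exit status.

```python
import sys

import pandas as pd

REQUIRED = {"spend", "active_days"}

def check_customers(df: pd.DataFrame) -> pd.DataFrame:
    """Validation gate: return df unchanged if it meets the contract, else raise."""
    missing = REQUIRED - set(df.columns)
    if missing:   # stop early: the checks below would fail with a KeyError
        raise ValueError(f"missing columns: {sorted(missing)}")
    problems = []
    if df[sorted(REQUIRED)].isna().any().any():
        problems.append("nulls in spend or active_days")
    n_negative = int((df["spend"] < 0).sum())
    if n_negative:
        problems.append(f"{n_negative} negative spend value(s)")
    if not df["active_days"].between(0, 366).all():
        problems.append("active_days outside 0..366")
    if problems:
        raise ValueError("; ".join(problems))
    return df

def main(df: pd.DataFrame) -> int:
    """Run the gate; report to stderr and return a non-zero exit code on failure."""
    try:
        check_customers(df)
    except ValueError as err:
        print(f"validation gate failed: {err}", file=sys.stderr)
        return 1
    return 0

good = pd.DataFrame({"spend": [10.0, 42.5], "active_days": [3, 120]})
bad = pd.DataFrame({"spend": [10.0, -5.0], "active_days": [3, 400]})
print(main(good), main(bad))   # 0 1 (plus the failure message on stderr)
```

Collecting every problem before raising means one failed run reports all the breaches at once, rather than making you fix and re-run them one by one.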

10.5 Orchestrating the stages

Something has to run the stages in the right order, and re-run only what’s needed. For a linear or simply branching workflow, the make from Chapter 4 is enough — each stage a target that depends on the artefacts of the stages before it:

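# A bare `make` builds the first target, so point it at the final artefact.
.PHONY: all
all: models/model.pkl

data/processed/features.csv: data/raw/customers.csv src/customer_value/features.py
	python -m customer_value.features

models/model.pkl: data/processed/features.csv src/customer_value/train.py
	python -m customer_value.train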

Because each target declares its dependencies, make rebuilds a stage only when its output is missing or older than one of its prerequisites (its input data or its code). This is the file-level version of the caching above. When pipelines grow complex, run on a schedule, or need retries and monitoring, purpose-built orchestrators (Snakemake, Prefect, Dagster, Airflow) take over, but the underlying model is the same directed graph of stages. We return to running these automatically in the Continuous integration chapter, and to running them on a schedule in the Deployment chapter. The companion volume, Thinking in Uncertainty, covers the statistical side of pipeline design, such as guarding against train–serve skew.
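Make's behaviour comes down to a directed graph plus a staleness rule. To show this, here is a toy sketch in Python. It orders stages with the standard library's `graphlib.TopologicalSorter`. It rebuilds a target only when the target is missing or older than a dependency, which is the rule make applies to file timestamps. The `run` and `is_stale` helpers, the `Stages` alias, the file names, and the stand-in build functions are all hypothetical. This is for intuition, not a replacement for make or a real orchestrator.

```python
import tempfile
from collections.abc import Callable
from graphlib import TopologicalSorter
from pathlib import Path

# Hypothetical stage table: target -> (dependency paths, function that writes the target).
Stages = dict[Path, tuple[list[Path], Callable[[Path], None]]]

def is_stale(target: Path, deps: list[Path]) -> bool:
    """make's rule: rebuild if the target is missing or older than any dependency."""
    if not target.exists():
        return True
    built_at = target.stat().st_mtime
    return any(dep.stat().st_mtime > built_at for dep in deps)

def run(stages: Stages) -> list[Path]:
    """Visit every node in dependency order; rebuild only stale targets."""
    graph = {target: deps for target, (deps, _build) in stages.items()}
    rebuilt = []
    for node in TopologicalSorter(graph).static_order():
        if node not in stages:   # a source file such as the raw CSV: nothing to build
            continue
        deps, build = stages[node]
        if is_stale(node, deps):
            build(node)
            rebuilt.append(node)
    return rebuilt

# A two-stage pipeline on throwaway files, mirroring the Makefile above.
workdir = Path(tempfile.mkdtemp())
raw_csv = workdir / "customers.csv"
features_csv = workdir / "features.csv"
model_file = workdir / "model.txt"
raw_csv.write_text("spend,active_days\n10.0,3\n")

def build_features(target: Path) -> None:
    target.write_text(raw_csv.read_text().upper())  # stand-in for real feature code

def train(target: Path) -> None:
    target.write_text(f"trained on {len(features_csv.read_text())} bytes")  # stand-in

pipeline: Stages = {
    features_csv: ([raw_csv], build_features),
    model_file: ([features_csv], train),
}

first = run(pipeline)    # nothing exists yet: both targets are built
second = run(pipeline)   # everything is up to date: nothing is rebuilt
print([p.name for p in first], [p.name for p in second])
```

Real orchestrators add what this toy omits: parallel execution of independent branches, retries, logging, and scheduling.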

Author’s Note

The monolithic cell blurs the stages together, and that blurring feels like a virtue while you’re exploring — it’s all in front of you, and you can change anything and re-run instantly. The cost is hidden until the work has to repeat: every run becomes all-or-nothing, every change risks the whole, and the only documentation of how data becomes a model is 400 lines you have to read in full.

Explicit stages invert this. The cost of a change becomes local — fix the cleaning, re-run from cleaning, keep the cached raw load — and the pipeline becomes legible, because the stage names and their dependencies are the documentation of the workflow. None of this argues against the exploratory monolith; that’s the right tool for discovery, when you don’t yet know what the stages are. The pipeline is what you build once you do — when the workflow has to run again, on new data, reliably, without you in the room to nurse it through 400 lines.

10.6 Summary

A pipeline turns a monolithic analysis into stages you can run, test, and reuse in pieces:

  1. Break the monolith into composable stages. Each stage is a single-responsibility function taking data in and returning data out; the pipeline is their composition, and every stage is independently testable and reusable.

  2. Make stages idempotent and cache their outputs. Re-running is then safe and cheap, and you recompute only the stages whose inputs or code have changed.

  3. Validate at the seams. A gate that checks schema, ranges, and assumptions between stages catches bad data where it enters, not three stages later — testing the data, not the code.

  4. Orchestrate the graph. make runs simple pipelines and rebuilds only what changed; dedicated orchestrators take over when workflows grow complex or need scheduling, retries, and monitoring.

The next chapter addresses the values that thread through every stage but should live in none of them: configuration and secrets.

10.7 Exercises

  1. Take a monolithic notebook cell or script of your own that does several things and break it into composable stage functions, each taking and returning data. Run them as a pipeline. Which stage turned out to be useful on its own, in a context you hadn’t anticipated?

  2. Add a validation gate between two stages: assert the schema and ranges the next stage expects (with plain assertions, or with pandera), and make it fail loudly on bad input. What specific bad data from a past project would this gate have caught at the boundary?

  3. Make one expensive stage idempotent and cached: persist its output to data/interim/, and skip the computation when the artefact already exists. Re-run the pipeline twice and confirm the second run skips the cached stage.

  4. Conceptual: The Data Science Bridge compares a workflow pipeline to an sklearn Pipeline. Give one way the analogy holds and one way it breaks down. What does a workflow pipeline need that an sklearn Pipeline does not?

  5. Conceptual: Not every analysis needs a pipeline framework. Describe a piece of work for which a single script (or even a notebook) is the right tool, and name the signal that tells you a workflow has outgrown it and needs explicit stages and orchestration.