Navigation

Python

How to Use Pipeline for ML Workflows

Streamline machine learning workflows with scikit-learn's Pipeline to combine preprocessing, feature selection, and modeling into clean, reproducible pipelines.

Table Of Contents

Clean Code, Better Models

Scattered preprocessing steps lead to messy code and data leakage. Pipelines package your entire ML workflow into a single, reliable object.

Basic Pipeline Creation

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Synthetic classification problem: 1000 samples with 10 features each
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)

# Hold out 20% of the rows for evaluation
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Scaling and classification chained into a single estimator
pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(random_state=42)),
])

# fit() trains every step on the training split and returns the pipeline
# itself, so the held-out score can be chained directly onto it
accuracy = pipeline.fit(X_train, y_train).score(X_test, y_test)

print(f"Pipeline accuracy: {accuracy:.3f}")

Multi-Step Preprocessing Pipeline

from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.decomposition import PCA

# Four chained stages, applied in order:
#   standardize -> keep the 8 highest-scoring features (f_classif)
#   -> project onto 5 principal components -> classify
complex_steps = [
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(f_classif, k=8)),
    ('pca', PCA(n_components=5)),
    ('classifier', LogisticRegression(random_state=42)),
]
complex_pipeline = Pipeline(complex_steps)

# Train every stage on the training split, then evaluate on the held-out split
complex_pipeline.fit(X_train, y_train)
complex_accuracy = complex_pipeline.score(X_test, y_test)

print(f"Complex pipeline accuracy: {complex_accuracy:.3f}")

ColumnTransformer for Mixed Data

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
import pandas as pd
import numpy as np

# Mixed data types
# A seeded generator keeps the synthetic frame, and therefore the reported
# accuracy, identical from run to run.
n_rows = 1000
rng = np.random.default_rng(42)
mixed_data = pd.DataFrame({
    'numeric1': rng.standard_normal(n_rows),
    'numeric2': rng.standard_normal(n_rows),
    'category1': rng.choice(['A', 'B', 'C'], n_rows),
    'category2': rng.choice(['X', 'Y'], n_rows),
    # Drawn independently of the features, so accuracy near 0.5 is expected
    'target': rng.integers(0, 2, n_rows),
})

# Separate the feature columns from the label column
X_mixed = mixed_data.drop('target', axis=1)
y_mixed = mixed_data['target']

# Define preprocessing for different column types:
# numeric columns are standardized, categorical columns are one-hot encoded
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), ['numeric1', 'numeric2']),
        ('cat', OneHotEncoder(drop='first'), ['category1', 'category2'])
    ]
)

# Complete pipeline: column-wise preprocessing followed by the classifier
mixed_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(random_state=42))
])

# Split and fit (80/20 split with a fixed seed)
X_mixed_train, X_mixed_test, y_mixed_train, y_mixed_test = train_test_split(
    X_mixed, y_mixed, test_size=0.2, random_state=42
)

mixed_pipeline.fit(X_mixed_train, y_mixed_train)
mixed_accuracy = mixed_pipeline.score(X_mixed_test, y_mixed_test)
print(f"Mixed data pipeline accuracy: {mixed_accuracy:.3f}")

Pipeline with Cross-Validation

from sklearn.model_selection import cross_val_score, GridSearchCV

# 5-fold cross-validation of the whole pipeline on the training split
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)
print(f"CV scores: {cv_scores.round(3)}")
print(f"Mean CV score: {cv_scores.mean():.3f}")

# Estimator to tune. The liblinear solver is chosen because the grid below
# includes the l1 penalty, which the solver must support.
pipeline_l1 = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(solver='liblinear', random_state=42)),
])

# Search space; each key is "<step name>__<parameter name>"
param_grid = {
    'scaler__with_std': [True, False],
    'classifier__C': [0.1, 1, 10],
    'classifier__penalty': ['l1', 'l2'],
}

# Exhaustive search over the grid with 3-fold CV, ranked by accuracy
grid_search = GridSearchCV(
    estimator=pipeline_l1,
    param_grid=param_grid,
    cv=3,
    scoring='accuracy',
)
grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.3f}")

Custom Transformers

from sklearn.base import BaseEstimator, TransformerMixin

class LogTransformer(BaseEstimator, TransformerMixin):
    """Transformer that returns ``log(X + add_constant)``.

    Parameters
    ----------
    add_constant : number, default=1
        Offset added to the input before the natural logarithm is taken.
    """

    def __init__(self, add_constant=1):
        self.add_constant = add_constant

    def fit(self, X, y=None):
        """Learn nothing from the data and return ``self``."""
        return self

    def transform(self, X):
        """Return the natural log of ``X`` shifted by ``add_constant``."""
        shifted = X + self.add_constant
        return np.log(shifted)

# Log-transform first, then standardize, then classify
custom_pipeline = Pipeline(steps=[
    ('log_transform', LogTransformer()),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(random_state=42)),
])

# Absolute values keep every input non-negative before the log step
X_positive = np.abs(X_train)

# Train on the non-negative training data and score on the test split,
# which gets the same absolute-value treatment
custom_score = custom_pipeline.fit(X_positive, y_train).score(
    np.abs(X_test), y_test
)
print(f"Custom transformer pipeline: {custom_score:.3f}")

Pipeline Inspection

# List every (name, estimator) pair in the order the pipeline applies them
print("Pipeline steps:")
for name, step in pipeline.steps:
    print(f"{name}: {step}")

# Feature names as seen after the scaler; guarded because the method may not
# exist on the installed estimator
scaler_step = pipeline.named_steps['scaler']
if hasattr(scaler_step, 'get_feature_names_out'):
    feature_names = scaler_step.get_feature_names_out()
    print(f"Feature names: {feature_names[:5]}...")

# Learned weights of the final estimator (first row of coef_)
classifier_step = pipeline.named_steps['classifier']
coefficients = classifier_step.coef_[0]
print(f"Model coefficients: {coefficients[:5].round(3)}...")

Conditional Steps with FunctionTransformer

from sklearn.preprocessing import FunctionTransformer

def conditional_log(X):
    """Return ``log(1 + x)`` for positive entries and ``x`` unchanged otherwise.

    Parameters
    ----------
    X : array-like
        Numeric input of any shape.

    Returns
    -------
    numpy.ndarray
        Array with the same shape as ``X``. Entries where ``X > 0`` hold
        ``log(1 + X)``; all other entries are passed through untouched.
    """
    X = np.asarray(X)
    positive = X > 0
    # np.where evaluates both branches on the full array, so non-positive
    # entries are replaced with 0 before the log. Without this, values <= -1
    # would trigger invalid-value / divide-by-zero warnings even though their
    # results are discarded.
    safe = np.where(positive, X, 0)
    # log1p(x) computes log(1 + x) and stays accurate for small x
    return np.where(positive, np.log1p(safe), X)

# FunctionTransformer wraps the plain function so it can act as a pipeline step
log_step = FunctionTransformer(conditional_log)

func_pipeline = Pipeline(steps=[
    ('conditional_log', log_step),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(random_state=42)),
])

# Train on the training split, then report accuracy on the held-out split
func_score = func_pipeline.fit(X_train, y_train).score(X_test, y_test)
print(f"Function transformer pipeline: {func_score:.3f}")

Pipeline Persistence

import joblib

# One name for the artifact location, shared by the save and the load
pipeline_path = 'ml_pipeline.pkl'

# Write the fitted pipeline (all steps together) to disk
joblib.dump(pipeline, pipeline_path)

# Read it back. NOTE: joblib files are pickle-based, so only load files that
# come from a trusted source.
loaded_pipeline = joblib.load(pipeline_path)

# The restored object predicts directly on the first five test rows
loaded_predictions = loaded_pipeline.predict(X_test[:5])
print(f"Loaded pipeline predictions: {loaded_predictions}")

Multiple Algorithms Pipeline

from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

# Base estimators for the ensemble. SVC is built with probability=True so it
# can supply class probabilities to the soft-voting ensemble.
models = [
    ('lr', LogisticRegression(random_state=42)),
    ('svm', SVC(probability=True, random_state=42)),
    ('rf', RandomForestClassifier(random_state=42)),
]

# Soft-voting ensemble over the three models, fed by one shared scaler
ensemble = VotingClassifier(estimators=models, voting='soft')
voting_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('voting', ensemble),
])

# Train on the training split, then report accuracy on the held-out split
voting_score = voting_pipeline.fit(X_train, y_train).score(X_test, y_test)
print(f"Voting classifier pipeline: {voting_score:.3f}")

Real-World Example

# Text classification pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

# Six short reviews; labels[i] is the class of texts[i]
texts = [
    "This movie is great",
    "Terrible film",
    "Love this movie",
    "Worst movie ever",
    "Amazing acting",
    "Boring story",
]
labels = [1, 0, 1, 0, 1, 0]

# Raw strings -> TF-IDF features (at most 1000 terms) -> naive Bayes classifier
text_steps = [
    ('tfidf', TfidfVectorizer(max_features=1000)),
    ('classifier', MultinomialNB()),
]
text_pipeline = Pipeline(text_steps)

text_pipeline.fit(texts, labels)

# Unseen strings go through the fitted vectorizer before classification
new_texts = ["Great movie", "Bad acting"]
predictions = text_pipeline.predict(new_texts)
print(f"Text predictions: {predictions}")

Pipeline Best Practices

# Best practices demonstration
def create_robust_pipeline(
    numeric_features=('numeric1', 'numeric2'),
    categorical_features=('category1', 'category2'),
    k_best=2,
):
    """Build an unfitted preprocessing + logistic-regression pipeline.

    Numeric columns are standardized and then filtered with
    ``SelectKBest(f_classif)``; categorical columns are one-hot encoded.
    A ``ColumnTransformer`` combines both branches and feeds the result to a
    ``LogisticRegression``.

    Parameters
    ----------
    numeric_features : sequence of str, default=('numeric1', 'numeric2')
        Names of the columns routed through the numeric branch.
    categorical_features : sequence of str, default=('category1', 'category2')
        Names of the columns routed through the one-hot encoder.
    k_best : int or 'all', default=2
        Forwarded to ``SelectKBest`` as ``k`` for the numeric branch.

    Returns
    -------
    Pipeline
        The assembled, not yet fitted pipeline.
    """
    # Numeric branch: scale first so feature selection sees standardized values
    numeric_branch = Pipeline([
        ('scaler', StandardScaler()),
        ('selector', SelectKBest(f_classif, k=k_best)),
    ])

    # Categorical branch.
    # NOTE(review): combining drop='first' with handle_unknown='ignore' is
    # believed to be rejected by older scikit-learn releases; confirm the
    # minimum supported version.
    categorical_branch = Pipeline([
        ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore')),
    ])

    # Copy the column names into fresh lists so the tuple defaults are never
    # shared with, or mutated through, the transformer
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_branch, list(numeric_features)),
            ('cat', categorical_branch, list(categorical_features)),
        ]
    )

    # Complete pipeline; max_iter is raised to 1000 from the library default
    robust_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', LogisticRegression(
            random_state=42,
            max_iter=1000,
        )),
    ])

    return robust_pipeline

# Build the pipeline with the factory, train it on the mixed-type training
# split, and report accuracy on the held-out split
robust_pipeline = create_robust_pipeline()
robust_score = robust_pipeline.fit(X_mixed_train, y_mixed_train).score(
    X_mixed_test, y_mixed_test
)
print(f"Robust pipeline accuracy: {robust_score:.3f}")

Pipeline Benefits

  • Prevents data leakage by fitting every transform on the training data only and reapplying it unchanged to validation and test data
  • Simplifies code with single fit/predict interface
  • Enables easy hyperparameter tuning across all steps
  • Facilitates model deployment as single object
  • Ensures reproducibility of preprocessing steps

Master Advanced ML

Explore automated machine learning, learn model deployment strategies, and discover MLOps best practices.

Share this article

Add Comment

No comments yet. Be the first to comment!

More from Python