# Data Pipelines & ETL: Moving Data Reliably at Scale
A data pipeline extracts data from sources (CRM, databases, APIs), transforms it (clean, join, aggregate), and loads it into a warehouse.
Sounds simple. In practice:
- API rate limits change; pipeline breaks
- Source schema changes; transforms fail
- Data quality degrades; bad data loads
- Pipeline takes 8 hours; exceeds SLA
- Network timeout at 2 AM; data missing
Reliable pipelines require: idempotency, monitoring, alerting, retries, and validation.
## Pipeline Architecture
```
Source System (CRM, database, API)
        ↓
Extract (API call, database query, log read)
        ↓
Transform (clean, validate, join, aggregate)
        ↓
Validate (check row counts, test data quality)
        ↓
Load (insert into warehouse)
        ↓
Monitor (track success/failure, performance)
```
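In code, the same flow can be a handful of small functions. A minimal sketch in plain Python; the function names and bodies are illustrative placeholders, not a specific framework's API:

```python
# Minimal pipeline skeleton mirroring the stages above.
# Real pipelines would page through an API, run SQL transforms,
# and write to a warehouse client instead of these placeholders.

def extract() -> list[dict]:
    # e.g. call the CRM API or query the source database
    return [{"name": "Alice", "email": "ALICE@example.com"}]

def transform(rows: list[dict]) -> list[dict]:
    # clean, join, aggregate
    return [{**row, "email": row["email"].lower()} for row in rows]

def validate(rows: list[dict]) -> None:
    # fail fast before loading bad data
    if not rows:
        raise ValueError("No rows extracted")

def load(rows: list[dict]) -> None:
    # batch insert into the warehouse
    print(f"Loaded {len(rows)} rows")

def run_pipeline() -> None:
    rows = transform(extract())
    validate(rows)
    load(rows)
    # monitoring hook: record row count, duration, success/failure here

if __name__ == "__main__":
    run_pipeline()
```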
## ETL Tools
**Orchestration (scheduler):**
- Apache Airflow: Most popular; powerful; steep learning curve
- dbt: SQL-based; great for transformations; built-in testing
- Prefect: More user-friendly than Airflow
- Dagster: Modern; strong testing/monitoring
**Extraction:**
- Custom Python scripts (REST APIs)
- Database connectors (Fivetran, Stitch)
- Log collectors (Fluentd, Logstash)
**Transformation:**
- SQL (dbt, Looker, Dataform)
- Python/Spark (PySpark for big data)
- Custom code
## Pipeline Best Practices

### 1. Idempotency
Running the pipeline twice produces the same result (no duplicates).
**Bad (not idempotent):**
```sql
INSERT INTO customers (name, email)
SELECT name, email FROM crm_source;

-- Run twice → 2x rows
```
**Good (idempotent):**
```sql
MERGE INTO customers c
USING crm_source s
  ON c.email = s.email
WHEN MATCHED THEN
  UPDATE SET name = s.name
WHEN NOT MATCHED THEN
  INSERT (name, email) VALUES (s.name, s.email);

-- Run twice → same result
```
### 2. Incremental Loading
Don't reload the entire dataset; load only new or changed records.
**Naive (full reload):**
```
Extract all 100M customer records
Transform all
Load all
Time: 30 minutes
```
**Incremental:**
```
Extract customers modified since last run (100K)
Transform those
Load those
Time: 2 minutes
```
Track: `last_modified_date` or `sync_cursor`
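A minimal sketch of cursor-based incremental extraction. The `sync_state.json` cursor file, the `crm_source` table, and its `last_modified_date` column are illustrative stand-ins:

```python
import datetime
import json
import pathlib

STATE_FILE = pathlib.Path("sync_state.json")  # illustrative cursor store

def read_cursor() -> str:
    # First run: default to a very old timestamp (one-time full load)
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())["last_synced_at"]
    return "1970-01-01T00:00:00"

def write_cursor(value: str) -> None:
    STATE_FILE.write_text(json.dumps({"last_synced_at": value}))

cursor = read_cursor()
# Only rows modified since the last successful sync
# (use bind parameters instead of string formatting in real code)
query = f"SELECT * FROM crm_source WHERE last_modified_date > '{cursor}'"

# ...run `query` against the source, then transform, validate, load...

# Advance the cursor only after a successful load; in practice, store the
# max last_modified_date actually extracted to avoid gaps.
write_cursor(datetime.datetime.now(datetime.timezone.utc).isoformat())
```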
### 3. Error Handling & Retries
Network timeout? Retry. Rate-limited? Backoff and retry. Transient failure? Retry.
```python
import time
import requests

def extract_from_api(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            return response.json()
        except requests.exceptions.Timeout:
            if attempt == max_retries - 1:
                raise  # Out of retries; surface the failure
            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
            time.sleep(wait_time)
```
### 4. Data Validation
Test data quality before loading.
```python
import pandas as pd

class DataQualityError(Exception):
    pass

# Validation checks run before loading (df is the extracted DataFrame)
checks = {
    "No rows extracted": df.shape[0] > 0,
    "Too many rows (possible extraction error)": df.shape[0] <= 1_000_000,
    "95%+ emails required": df["email"].notna().sum() / len(df) > 0.95,
    "No future dates": (df["created_at"] <= pd.Timestamp.now()).all(),
}

if not all(checks.values()):
    failed = [msg for msg, ok in checks.items() if not ok]
    raise DataQualityError(f"Validation failed: {failed}")
```
### 5. Monitoring & Alerting
Track pipeline health continuously.
**Metrics:**
- Row count: previous run vs. current (a sudden drop signals an issue)
- Latency: how long the pipeline takes (an increase points to a slow query)
- Success rate: % of pipeline runs succeeding
- Data freshness: how recent the warehouse data is
**Alerts:**
- Row count differs >10% from baseline (a minimal check is sketched below)
- Pipeline takes >2x normal time
- Pipeline fails 3+ consecutive runs
- Data >6 hours old (stale)
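To make the first alert concrete, here is a minimal sketch of a row-count drift check; `send_alert` is a placeholder for whatever alerting channel (Slack, PagerDuty, email) you actually use:

```python
def send_alert(message: str) -> None:
    # Illustrative stand-in for a real alerting integration
    print(f"ALERT: {message}")

def check_row_count_drift(current: int, baseline: int, threshold: float = 0.10) -> None:
    """Alert if the current row count differs from the baseline by more than 10%."""
    if baseline == 0:
        raise ValueError("Baseline row count is zero; cannot compute drift")
    drift = abs(current - baseline) / baseline
    if drift > threshold:
        send_alert(
            f"Row count drift {drift:.0%} exceeds {threshold:.0%} "
            f"(baseline={baseline}, current={current})"
        )

# Example: baseline 1,000,000 rows, today only 850,000 → 15% drop → alert
check_row_count_drift(current=850_000, baseline=1_000_000)
```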
### 6. Data Lineage
Track where the data came from, which transforms were applied, and when it was loaded.
```
crm_customers → clean_emails → deduplicate → dim_customers
                                   ↓
                   2000 rows removed (invalid emails)

crm_orders → calculate_totals → add_customer_context → fact_orders
                                   ↓
                     joined with dim_customers
```
Lineage enables:
- Debugging (trace issue to source)
- Impact analysis (if source changes, what's affected?)
- Compliance (audit trail)
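One lightweight way to get lineage is to write a small metadata record alongside every load. A minimal sketch; the field names and the JSONL log file are illustrative, not a specific lineage tool:

```python
import datetime
import json

# Illustrative lineage record for one load of dim_customers
lineage_record = {
    "target_table": "dim_customers",
    "sources": ["crm_customers"],
    "transforms": ["clean_emails", "deduplicate"],
    "rows_in": 1_002_000,
    "rows_out": 1_000_000,
    "rows_removed": 2_000,  # e.g. invalid emails
    "loaded_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
}

# Append to a lineage log (a warehouse table or object store in a real setup)
with open("lineage_log.jsonl", "a") as f:
    f.write(json.dumps(lineage_record) + "\n")
```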
## Real-World Pipeline Scenarios

### Scenario 1: Real-Time Data Pipeline
Stock trading: Need latest prices in warehouse within 10 seconds.
**Batch pipeline (too slow):**
- Runs hourly
- Data is 30+ minutes stale
- Traders miss opportunities
**Streaming pipeline:**
- Event stream (Kafka) receives price updates
- Real-time transform and load
- Warehouse updated in <1 second
- Traders can react immediately
**Implementation:**
```python
import json

# Kafka consumer reads price updates
# (kafka_consumer and warehouse are assumed to be configured elsewhere)
for message in kafka_consumer:
    price = json.loads(message.value)  # Transform
    warehouse.upsert(price)            # Load to warehouse
    # Latency: <100ms
```
### Scenario 2: The Broken Pipeline
A pipeline extracted from the CRM daily. One day, the source API returned a different schema (a new field was added).
**Without robust error handling:**
- Transform fails (unexpected column)
- Pipeline crashes silently
- Warehouse data stale for 3 days
- Analysts report "data missing"
**With robust error handling:**
- Schema validation detects the new field (see the sketch below)
- Pipeline logs the issue; sends an alert
- Data team investigates immediately (same day)
- Schema updated; pipeline resumes
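The schema check itself can be a simple column comparison. A minimal sketch, assuming the extracted data arrives as a pandas DataFrame and the expected columns are known (the column names here are illustrative):

```python
EXPECTED_COLUMNS = {"id", "name", "email", "created_at"}  # illustrative

def validate_schema(df) -> None:
    """Fail loudly, and alert, when the source schema drifts."""
    actual = set(df.columns)
    unexpected = actual - EXPECTED_COLUMNS
    missing = EXPECTED_COLUMNS - actual
    if unexpected or missing:
        message = (
            f"Schema drift detected: unexpected={sorted(unexpected)}, "
            f"missing={sorted(missing)}"
        )
        print(f"ALERT: {message}")  # placeholder for a real alert
        raise ValueError(message)
```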
### Scenario 3: Incremental vs. Full Load
Company syncs 100M customer records from Salesforce daily.
**Full load (naive):**
- Extract all 100M
- 2 hour extract time
- API rate limits hit halfway
- Sync fails; retry next day
- Data frequently stale (>24 hours)
**Incremental load:**
- Track: `last_synced_at`
- Extract only records modified since the last sync (5K records)
- 2 minute extract time
- Near real-time freshness (1-2 minutes)
- High reliability
## Pipeline Testing
Test pipelines before production.
**Unit tests:**
```python
def test_clean_email():
    assert clean_email("alice@example.com") == "alice@example.com"
    assert clean_email("ALICE@EXAMPLE.COM") == "alice@example.com"
    assert clean_email("invalid-email") is None
```
**Integration tests:**
```python
def test_extraction():
    # Extract from test database
    df = extract_from_db(test_db)
    assert len(df) > 0
    assert all(col in expected_columns for col in df.columns)
```
**Smoke tests (production):**
```python
def smoke_test():
    # Run the pipeline with a tiny test dataset
    # Verify end-to-end in the production environment
    # Catch environmental issues (permissions, network)
    ...
```
## Pipeline Performance
**Optimization:**
- Partition source reads (parallel extraction)
- Batch inserts (faster than row-by-row; see the sketch below)
- Index warehouse tables (faster merge/upsert lookups during loads)
- Archive old data (reduce storage/scan time)
**Real example:**
```
Before optimization:
  Extract:   20 minutes (single-threaded)
  Transform: 10 minutes (serial joins)
  Load:       5 minutes (row-by-row insert)
  Total:     35 minutes

After:
  Extract:    5 minutes (10 parallel threads)
  Transform:  3 minutes (vectorized joins)
  Load:       1 minute  (batch insert)
  Total:      9 minutes
```
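Much of the load-time win above comes from batching inserts instead of inserting row by row. A minimal sketch using the Python DB-API's `executemany`; SQLite stands in for the warehouse here, and the table is illustrative:

```python
import sqlite3

# Illustrative: a local SQLite database stands in for the warehouse
conn = sqlite3.connect("warehouse.db")
conn.execute("CREATE TABLE IF NOT EXISTS customers (name TEXT, email TEXT)")

rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]

# Slow: one round trip per row
# for name, email in rows:
#     conn.execute("INSERT INTO customers (name, email) VALUES (?, ?)", (name, email))

# Fast: one batched call (real warehouses also offer bulk COPY/LOAD paths)
conn.executemany("INSERT INTO customers (name, email) VALUES (?, ?)", rows)
conn.commit()
conn.close()
```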
## The Bottom Line
Reliable pipelines are unsexy but essential. They're the plumbing of analytics.
Build them right: idempotent, validated, monitored, tested.
Done right, analytics scales from "ask IT" (weeks) to "query warehouse" (seconds).