
# Data Pipelines & ETL: Moving Data Reliably at Scale

13 May 2026 · 14 min read · Senthil Kumar

A data pipeline extracts data from sources (CRM, databases, APIs), transforms it (clean, join, aggregate), and loads it into a warehouse.

Sounds simple. In practice:

- API rate limits change; pipeline breaks
- Source schema changes; transforms fail
- Data quality degrades; bad data loads
- Pipeline takes 8 hours; exceeds SLA
- Network timeout at 2 AM; data missing

Reliable pipelines require: idempotency, monitoring, alerting, retries, and validation.

## Pipeline Architecture

```
Source System (CRM, database, API)
        ↓
Extract (API call, database query, log read)
        ↓
Transform (clean, validate, join, aggregate)
        ↓
Validate (check row counts, test data quality)
        ↓
Load (insert into warehouse)
        ↓
Monitor (track success/failure, performance)
```
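To make the flow concrete, here's a minimal sketch of how these stages might be wired together in Python. All five helpers (`extract`, `transform`, `validate_df`, `load`, `report_status`) are hypothetical placeholders standing in for your own implementations:

```python
# Minimal pipeline skeleton (sketch); all five helpers are
# hypothetical placeholders for your own implementations.
def run_pipeline(source, warehouse):
    try:
        raw = extract(source)         # API call, database query, log read
        clean = transform(raw)        # clean, join, aggregate
        validate_df(clean)            # row counts, data quality checks
        load(warehouse, clean)        # insert into warehouse
        report_status("success")      # monitoring hook
    except Exception as exc:
        report_status("failure", error=exc)
        raise
```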

## ETL Tools

**Orchestration (scheduler):**

- Apache Airflow: most popular; powerful; steep learning curve
- dbt: SQL-based; great for transformations; built-in testing (dbt Cloud adds scheduling)
- Prefect: more user-friendly than Airflow
- Dagster: modern; strong testing/monitoring

**Extraction:**

- Custom Python scripts (REST APIs)
- Database connectors (Fivetran, Stitch)
- Log collectors (Fluentd, Logstash)

**Transformation:**

- SQL (dbt, Looker, Dataform)
- Python/Spark (PySpark for big data)
- Custom code

## Pipeline Best Practices

### 1. Idempotency

Running the pipeline twice = same result (no duplicates).

**Bad (not idempotent):**

```sql
INSERT INTO customers (name, email)
SELECT name, email FROM crm_source;

-- Run twice → 2x rows
```

**Good (idempotent):**

```sql
MERGE INTO customers c
USING crm_source s
  ON c.email = s.email
WHEN MATCHED THEN
  UPDATE SET name = s.name
WHEN NOT MATCHED THEN
  INSERT (name, email) VALUES (s.name, s.email);

-- Run twice → same result
```

### 2. Incremental Loading

Don't reload the entire dataset; load only new or changed records.

**Naive (full reload):**

```
Extract all 100M customer records
Transform all
Load all
Time: 30 minutes
```

**Incremental:**

```
Extract customers modified since last run (100K)
Transform those
Load those
Time: 2 minutes
```

Track: `last_modified_date` or `sync_cursor`
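As a sketch, watermark-based incremental extraction might look like the following; `conn` is a DB-API style connection and `state_store` is a hypothetical dict-like store that persists the cursor between runs:

```python
# Incremental extraction driven by a stored watermark (sketch).
# `state_store` is a hypothetical dict-like store persisted between runs.
def extract_incremental(conn, state_store):
    last_sync = state_store.get("last_modified_date", "1970-01-01")
    rows = conn.execute(
        "SELECT id, name, email, last_modified_date FROM crm_source "
        "WHERE last_modified_date > ?",
        (last_sync,),
    ).fetchall()
    if rows:
        # Advance the watermark only after a successful extract
        state_store["last_modified_date"] = max(r[3] for r in rows)
    return rows
```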

### 3. Error Handling & Retries

Network timeout? Retry. Rate-limited? Back off and retry. Transient failure? Retry.

```python
import time

import requests

def extract_from_api(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            return response.json()
        except requests.exceptions.Timeout:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, ...
            time.sleep(wait_time)
```

### 4. Data Validation

Test data quality before loading.

```python
import pandas as pd

class DataQualityError(Exception):
    pass

# Validation checks before loading (df is the extracted DataFrame)
checks = {
    "No rows extracted": df.shape[0] > 0,
    "Too many rows (possible extraction error)": df.shape[0] <= 1_000_000,
    "95%+ emails required": df["email"].notna().sum() / len(df) > 0.95,
    "No future dates": (df["created_at"] <= pd.Timestamp.now()).all(),
}

for message, passed in checks.items():
    if not passed:
        raise DataQualityError(f"Validation failed: {message}")
```

### 5. Monitoring & Alerting

Track pipeline health continuously.

**Metrics:**

- Row count: previous run vs. current (sudden drop = issue)
- Latency: how long the pipeline takes (increase = slow query)
- Success rate: % of runs succeeding
- Data freshness: how recent the warehouse data is

**Alerts:**

- Row count differs >10% from baseline (see the sketch below)
- Pipeline takes >2x normal time
- Pipeline fails 3+ consecutive runs
- Data >6 hours old (stale)
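As a sketch, the row-count alert might compare each run against a stored baseline; `get_row_count` and `send_alert` are hypothetical hooks into your warehouse and paging system:

```python
# Row-count drift alert (sketch); get_row_count() and send_alert()
# are hypothetical hooks into the warehouse and paging system.
def check_row_count(table, baseline):
    current = get_row_count(table)
    drift = abs(current - baseline) / baseline
    if drift > 0.10:  # row count differs >10% from baseline
        send_alert(f"{table}: row count {current} is {drift:.0%} off baseline {baseline}")
```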

### 6. Data Lineage

Track: where the data came from, which transforms were applied, and when it was loaded.

```
crm_customers → clean_emails → deduplicate → dim_customers
                     ↓
      2000 rows removed (invalid emails)

crm_orders → calculate_totals → add_customer_context → fact_orders
                     ↓
        joined with dim_customers
```
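One lightweight way to capture these records is to log an entry per transform step; a sketch (in practice the entries would land in a metadata table, not an in-memory list):

```python
from datetime import datetime, timezone

# Record one lineage entry per transform step (sketch).
lineage_log = []

def record_lineage(source, transform, destination, rows_in, rows_out):
    lineage_log.append({
        "source": source,
        "transform": transform,
        "destination": destination,
        "rows_in": rows_in,
        "rows_out": rows_out,
        "loaded_at": datetime.now(timezone.utc).isoformat(),
    })

# Example: the invalid emails dropped in the flow above
record_lineage("crm_customers", "clean_emails", "dim_customers", 102_000, 100_000)
```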

Lineage enables:

- Debugging (trace issue to source)
- Impact analysis (if source changes, what's affected?)
- Compliance (audit trail)

## Real-World Pipeline Scenarios

### Scenario 1: Real-Time Data Pipeline

Stock trading: Need latest prices in warehouse within 10 seconds.

**Batch pipeline (too slow):**

- Runs hourly
- Data is 30+ minutes stale
- Traders miss opportunities

**Streaming pipeline:**

- Event stream (Kafka) receives price updates
- Real-time transform and load
- Warehouse updated in <1 second
- Traders can react immediately

**Implementation:**

```python
import json

# Kafka consumer reads price updates; kafka_consumer and warehouse
# are assumed to be configured elsewhere in the pipeline
for message in kafka_consumer:
    price = json.loads(message.value)  # Transform
    warehouse.upsert(price)            # Load to warehouse

# Latency: <100ms
```

### Scenario 2: The Broken Pipeline

The pipeline extracted from the CRM daily. One day, the source API returned a different schema (a new field was added).

**Without robust error handling:**

- Transform fails (unexpected column)
- Pipeline crashes silently
- Warehouse data stale for 3 days
- Analysts report "data missing"

**With robust error handling:**

- Schema validation detects the new field (see the sketch below)
- Pipeline logs the issue; sends an alert
- Data team investigates immediately (same day)
- Schema updated; pipeline resumes
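A minimal version of that schema check, assuming an `EXPECTED_COLUMNS` set maintained alongside the pipeline (hypothetical names):

```python
import logging

EXPECTED_COLUMNS = {"id", "name", "email", "created_at"}  # hypothetical CRM schema

def validate_schema(df):
    extra = set(df.columns) - EXPECTED_COLUMNS
    missing = EXPECTED_COLUMNS - set(df.columns)
    if extra or missing:
        # Fail loudly instead of crashing silently downstream
        logging.error("Schema drift: extra=%s, missing=%s", extra, missing)
        raise ValueError(f"Schema drift: extra={extra}, missing={missing}")
```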

### Scenario 3: Incremental vs Full Load

Company syncs 100M customer records from Salesforce daily.

**Full load (naive):**

- Extract all 100M
- 2 hour extract time
- API rate limits hit halfway
- Sync fails; retry next day
- Data frequently stale (>24 hours)

**Incremental load:**

- Track `last_synced_at`
- Extract only records modified since last sync (5K records)
- 2 minute extract time
- Near real-time freshness (1-2 minutes)
- High reliability

## Pipeline Testing

Test pipelines before production.

**Unit tests:**

```python
def test_clean_email():
    assert clean_email("alice@example.com") == "alice@example.com"
    assert clean_email("ALICE@EXAMPLE.COM") == "alice@example.com"
    assert clean_email("invalid-email") is None
```
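These tests assume a `clean_email` helper; one possible implementation consistent with them (a sketch, not the article's actual code):

```python
import re

# Loose email shape check; real-world validation is stricter
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def clean_email(raw):
    # Normalize case and whitespace; return None for invalid addresses
    email = raw.strip().lower()
    return email if EMAIL_RE.match(email) else None
```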

**Integration tests:**

```python
def test_extraction():
    # Extract from a test database
    df = extract_from_db(test_db)
    assert len(df) > 0
    assert all(col in expected_columns for col in df.columns)
```

**Smoke tests (production):**

```python
def smoke_test():
    # Run the pipeline with a tiny test dataset
    # Verify end-to-end in the production environment
    # Catch environmental issues (permissions, network)
    ...
```

## Pipeline Performance

**Optimization:**

- Partition source reads (parallel extraction; see the sketch below)
- Batch inserts (faster than row-by-row)
- Index warehouse tables (faster loads)
- Archive old data (reduce storage/scan time)
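For instance, partitioned reads can fan out across a thread pool (a sketch; `extract_partition` is a hypothetical per-partition read):

```python
from concurrent.futures import ThreadPoolExecutor

# Parallel extraction across source partitions (sketch);
# extract_partition(pid) is a hypothetical per-partition read.
def extract_parallel(partition_ids, workers=10):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(extract_partition, partition_ids))
```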

**Real example:**

```
Before optimization:
  Extract:   20 minutes (single-threaded)
  Transform: 10 minutes (serial joins)
  Load:       5 minutes (row-by-row insert)
  Total:     35 minutes

After:
  Extract:    5 minutes (10 parallel threads)
  Transform:  3 minutes (vectorized joins)
  Load:       1 minute  (batch insert)
  Total:      9 minutes
```
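The batch-insert change on the load side can be as simple as switching to the DB-API's `executemany`; the table and `cur` connection cursor here are hypothetical:

```python
# Row-by-row (slow): one round trip per record
for row in rows:
    cur.execute("INSERT INTO customers (name, email) VALUES (?, ?)", row)

# Batched (fast): one round trip per chunk
cur.executemany("INSERT INTO customers (name, email) VALUES (?, ?)", rows)
```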

## The Bottom Line

Reliable pipelines are unsexy but essential. They're the plumbing of analytics.

Build them right: idempotent, validated, monitored, tested.

Done right, analytics scales from "ask IT" (weeks) to "query warehouse" (seconds).
