Shuffle, skew, and the small-files problem

Three reasons a Spark job is slow, how to read the UI to know which one bit you, and the canonical fix for each.

The 99% executor and the 1% holdout

A Spark job advertised to take 30 minutes had been running for 4 hours. The Spark UI Stage view showed 199 of 200 tasks done in 5 minutes. The 200th task had been running for 3 hours 55 minutes.

Classic data skew. The job grouped events by user_id, and one bot had emitted 40% of all events. That partition was 200× bigger than the rest. The fix — salt the key (user_id || floor(rand()*16)) so the bot's events spread across 16 partitions instead of 1 — dropped the job to 35 minutes.

Distributed SQL is fast when work is balanced and slow when it isn't. The Spark UI is your microscope; this lesson is the field guide for what to look for in it.

Skew, in two histograms

Before salting — partition row counts after groupBy("user_id"):

  partition →  0   1   2   3   4   5   …  31
              ┃   ┃   ┃   ┃   ┃   ┃       ┃
  rows        ██  ██  ██  ██  ██  ██      ████████████████████████
  (M, log)    ██                          ████████████████████████
              ██                          ████████████████████████
                                          ████████████████████████
                                          ████████████████████████
                                          ████████████████████████

Partition 31 = one bot user_id, ≈ 99 % of all rows. The other 31 tasks finish in minutes; one task runs for hours. Stage wall-time = slowest task.

After salting — groupBy("user_id", floor(rand()*16)):

  partition →  0   1   2   3   4   5   6   7   …  31
              ┃   ┃   ┃   ┃   ┃   ┃   ┃   ┃       ┃
  rows        ███ ███ ███ ███ ███ ███ ███ ███     ███
  (uniform)   ███ ███ ███ ███ ███ ███ ███ ███     ███
              ███ ███ ███ ███ ███ ███ ███ ███     ███
              ███ ███ ███ ███ ███ ███ ███ ███     ███

All 32 tasks finish at the same time. Wall-time ≈ 1/16 of the skewed run. ✓

Spread the hot key across N synthetic sub-keys so the shuffle distributes work evenly, then re-aggregate at the end. Cost: the un-salted side of any join must be expanded N-ways too — usually worth it when the alternative is a 4-hour job.
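
As a concrete sketch, here is the two-stage salting pattern in Spark SQL, assuming a hypothetical events table with a user_id column and a 16-way salt:

-- Stage 1: aggregate on (user_id, salt) so the hot key spreads across 16 shuffle partitions.
WITH salted AS (
  SELECT user_id,
         CAST(floor(rand() * 16) AS INT) AS salt
  FROM events
),
partial AS (
  SELECT user_id, salt, COUNT(*) AS cnt
  FROM salted
  GROUP BY user_id, salt
)
-- Stage 2: re-aggregate the partial counts into the final per-user totals.
SELECT user_id, SUM(cnt) AS event_count
FROM partial
GROUP BY user_id;

The second GROUP BY is cheap: each user contributes at most 16 partial rows to it.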

Three reasons a Spark job is slow

  1. Shuffle volume — the join or aggregation moves a lot of data across the network. Look for Shuffle Read / Shuffle Write sizes in the UI. Mitigate: broadcast the small side; pre-aggregate before the shuffle.
  2. Skew — one or two partitions are 10–100× larger than the rest. Look for the long red bar in the Stage view's Tasks histogram. Mitigate: salting, or spark.sql.adaptive.skewJoin.enabled=true (AQE detects and splits skewed partitions automatically).
  3. Small files — the input is millions of 1KB Parquet files, and opening them costs more than reading the data they hold. Check Input Size / Records against the number of files read; fewer than ~100 rows per file means a small-files problem. Mitigate: compaction (OPTIMIZE on Delta, or a periodic coalesce job). All three mitigations are sketched below.
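
A minimal sketch of those mitigations as Spark SQL statements; fct_orders and dim_customers are hypothetical stand-in tables:

-- 1. Shuffle volume: hint Spark to broadcast the small side instead of shuffling both sides.
SELECT /*+ BROADCAST(c) */ o.order_id, c.segment
FROM fct_orders AS o
JOIN dim_customers AS c ON o.customer_id = c.customer_id;

-- 2. Skew: let Adaptive Query Execution split oversized shuffle partitions at runtime.
SET spark.sql.adaptive.enabled = true;
SET spark.sql.adaptive.skewJoin.enabled = true;

-- 3. Small files: compact a Delta table and co-locate rows by a common filter column.
OPTIMIZE fct_orders ZORDER BY (customer_id);
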
Delta MERGE INTO — the lakehouse upsert

-- Delta Lake brings ACID transactions to S3-backed tables.
-- MERGE INTO is the SQL-standard upsert; works the same way in Snowflake and BigQuery.

MERGE INTO marts.fct_orders AS target
USING staging.orders_today AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.status != target.status THEN
  UPDATE SET target.status = source.status,
             target.loaded_at = current_timestamp()
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, status, total_cents, loaded_at)
  VALUES (source.order_id, source.customer_id, source.status, source.total_cents, current_timestamp());

-- The whole MERGE is one transaction; readers see either the before or the after, never partial.
-- Compact periodically: OPTIMIZE marts.fct_orders ZORDER BY (customer_id);

Demonstrate the salting fix conceptually. Imagine customer_id = 1 (Ada) generates 80% of orders. Salting adds a random suffix to spread her rows across N partitions during the shuffle. Return one row per order with two columns, customer_id and salted_key, where salted_key is customer_id::text || '_' || (order_id % 4)::text (a 4-way salt). Use only Ada's rows. Order by order_id.
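
One possible answer, sketched under the assumption of an orders table with order_id and customer_id columns (Postgres-style casts, as in the prompt):

-- Hypothetical orders table; Ada is customer_id = 1.
SELECT customer_id,
       customer_id::text || '_' || (order_id % 4)::text AS salted_key
FROM orders
WHERE customer_id = 1          -- only Ada's rows
ORDER BY order_id;

Each of Ada's orders lands in one of 4 sub-keys (1_0 … 1_3), so her work spreads across 4 partitions instead of 1.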

Takeaway: distributed SQL is row-based parallelism — fast when work is balanced. The three failure modes are shuffle, skew, and small files; the Spark UI tells you which one bit. Delta MERGE INTO is the lakehouse upsert. AQE auto-fixes some skew; salting fixes the rest.