Shuffle, skew, and the small-files problem
Three reasons a Spark job is slow, how to read the UI to know which one bit you, and the canonical fix for each.
The 99% executor and the 1% holdout
A Spark job had been running 4 hours, advertised to take 30 minutes. The Spark UI Stage view showed 199 of 200 tasks done in 5 minutes. The 200th task had been running for 3 hours 55 minutes.
Classic data skew. The job grouped events by user_id, and one bot had emitted 40% of all events. That partition was 200× bigger than the rest. The fix — salt the key (user_id || floor(rand()*16)) so the bot's events spread across 16 partitions instead of 1 — dropped the job to 35 minutes.
Distributed SQL is fast when work is balanced and slow when it isn't. The Spark UI is your microscope; this lesson is the field guide for what to look for in it.
Skew, in two histograms
Before salting — partition row counts after groupBy("user_id"):
partition → 0 1 2 3 4 5 … 31
┃ ┃ ┃ ┃ ┃ ┃ ┃
rows ██ ██ ██ ██ ██ ██ ████████████████████████
(M, log) ██ ████████████████████████
██ ████████████████████████
████████████████████████
████████████████████████
████████████████████████
Partition 31 holds the bot's user_id and dwarfs the rest — in the opening story it carried ≈ 40 % of all rows, ~200× a typical partition. Every other task finishes in minutes; that one runs for hours. Stage wall-time = slowest task. ☢
After salting — groupBy("user_id", floor(rand()*16)):
partition → 0 1 2 3 4 5 6 7 … 31
┃ ┃ ┃ ┃ ┃ ┃ ┃ ┃ ┃
rows ███ ███ ███ ███ ███ ███ ███ ███ ███
(uniform) ███ ███ ███ ███ ███ ███ ███ ███ ███
███ ███ ███ ███ ███ ███ ███ ███ ███
███ ███ ███ ███ ███ ███ ███ ███ ███
All 32 tasks finish at the same time. Wall-time ≈ 1/16 of the skewed run. ✓
Spread the hot key across N synthetic sub-keys so the shuffle distributes work evenly, then re-aggregate at the end. Cost: the un-salted side of any join must be expanded N-ways too — usually worth it when the alternative is a 4-hour job.
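The mechanics are easy to see without a cluster. A minimal sketch in plain Python — the event counts, key names, and partition count are invented for illustration, and `hash(k) % n` stands in for Spark's shuffle partitioner:

```python
import random
from collections import Counter

random.seed(42)

# Synthetic workload: one "bot" key owns 40% of rows,
# 100 normal keys share the rest.
rows = ["bot"] * 40_000 + [f"user_{i}" for i in range(100) for _ in range(600)]

N_PARTITIONS = 32
SALT_BUCKETS = 16

def partition_sizes(keys, n=N_PARTITIONS):
    """Rows per partition under simple hash partitioning
    (a stand-in for Spark's shuffle)."""
    return Counter(hash(k) % n for k in keys)

# Without salt: every "bot" row hashes to the same partition.
skewed = partition_sizes(rows)

# With salt: "bot" becomes "bot_0" … "bot_15", spreading its rows around.
salted = partition_sizes(f"{k}_{random.randrange(SALT_BUCKETS)}" for k in rows)

# Stage wall-time tracks the *largest* partition, so compare the maxima.
print("skewed max partition:", max(skewed.values()))
print("salted max partition:", max(salted.values()))
```

The skewed maximum is at least the bot's 40,000 rows; the salted maximum drops to roughly the even-split size. In a real job you'd then aggregate once per (key, salt) and a second time per key to undo the split.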
Three reasons a Spark job is slow
- Shuffle volume — the join or aggregation moves a lot of data across the network. Look for Shuffle Read / Shuffle Write sizes in the UI. Mitigate: broadcast the small side; pre-aggregate before the shuffle.
- Skew — one or two partitions are 10–100× larger than the rest. Look for the long red bar in the Stage view's Tasks histogram. Mitigate: salting, or `spark.sql.adaptive.skewJoin.enabled=true` (AQE auto-detects).
- Small files — input is millions of 1 KB Parquet files; reading them costs more than the data. Check the Input Size / Records column and divide records by the number of input files: fewer than 100 rows per file = small-files problem. Mitigate: compaction (`OPTIMIZE` on Delta, or write a coalesce job).
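The small-files check is just division. A throwaway helper (the function names and threshold default are mine, not a Spark API) makes the rule of thumb concrete:

```python
def rows_per_file(total_records: int, num_files: int) -> float:
    """Average records per input file, from the Spark UI's Input figures."""
    return total_records / num_files

def has_small_files_problem(total_records: int, num_files: int,
                            threshold: int = 100) -> bool:
    # Below ~100 rows per file, the job spends more time opening files
    # and reading footers than reading actual data.
    return rows_per_file(total_records, num_files) < threshold

# 50M records across 2M files → 25 rows/file → compact before anything else.
print(has_small_files_problem(50_000_000, 2_000_000))  # True
```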
-- Delta Lake brings ACID transactions to S3-backed tables.
-- MERGE INTO is the SQL-standard upsert; works the same way in Snowflake and BigQuery.
MERGE INTO marts.fct_orders AS target
USING staging.orders_today AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source.status != target.status THEN
UPDATE SET target.status = source.status,
target.loaded_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, status, total_cents, loaded_at)
VALUES (source.order_id, source.customer_id, source.status, source.total_cents, current_timestamp());
-- The whole MERGE is one transaction; readers see either the before or the after, never partial.
-- Compact periodically: OPTIMIZE marts.fct_orders ZORDER BY (customer_id);

Exercise: demonstrate the *salting* fix conceptually. Imagine `customer_id = 1` (Ada) generates 80% of orders. Salting adds a random suffix to spread her rows across N partitions during the shuffle. Return one row per order, two columns: `customer_id`, `salted_key`, where `salted_key` is `customer_id::text || '_' || (order_id % 4)::text` (4-way salt). Use only Ada's rows. Order by `order_id`.
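One way the query might look, runnable locally against SQLite for experimentation — the sample orders are invented, and SQLite's `||` concatenation (which coerces integers to text) stands in for the `::text` casts:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER)")
# Invented sample: Ada (customer_id = 1) dominates; customer 2 has one order.
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 1), (2, 1), (3, 2), (4, 1), (5, 1)],
)

rows = conn.execute(
    """
    SELECT customer_id,
           customer_id || '_' || (order_id % 4) AS salted_key
    FROM orders
    WHERE customer_id = 1        -- Ada's rows only
    ORDER BY order_id
    """
).fetchall()

for r in rows:
    print(r)
```

Each of Ada's orders lands in one of four salt buckets (`1_0` … `1_3`), so a downstream shuffle keyed on `salted_key` spreads her hot partition four ways instead of one.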
Takeaway: distributed SQL is row-based parallelism — fast when work is balanced. The three failure modes are shuffle, skew, and small files; the Spark UI tells you which one bit. Delta MERGE INTO is the lakehouse upsert. AQE auto-fixes some skew; salting fixes the rest.