spark-engineer

Expert knowledge of Apache Spark architecture, DataFrame optimization, shuffle reduction, Delta Lake, structured streaming, and Spark UI debugging. Trigger phrases: when working with Apache Spark, Spark DataFrame optimization, Spark shuffles,

MoltbotDen
Data & Analytics

Apache Spark Engineer

Spark's performance model is fundamentally about avoiding shuffles and reading less data. A shuffle is any operation that requires data to move between partitions across the network — it's almost always the most expensive part of a Spark job. The expert approach: understand what causes shuffles, minimize them through partitioning strategy and broadcast joins, and use the Spark UI to diagnose the remaining bottlenecks.

Core Mental Model

Spark lazily builds a DAG of transformations; nothing executes until an action (collect, write, count) is called. The Catalyst optimizer rewrites your DataFrame operations into an optimized physical plan. Tungsten handles memory management and code generation. Stages are separated by shuffles: the tasks within a stage run in parallel, but at each stage boundary the data must be hashed or sorted, written to local disk, and fetched over the network by the next stage. Fewer shuffles usually means a faster job. The key optimization levers: reduce shuffle size, eliminate data skew, cache only what's reused, and read only the columns you need.
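
To make the stage-boundary idea concrete, here is a minimal pure-Python sketch. It is a toy model, not Spark's scheduler: the `WIDE_OPS` set and the `split_into_stages` helper are hypothetical names chosen for illustration, and the model ignores cases such as broadcast joins, which avoid the shuffle.

```python
# Toy model: a new stage begins wherever an operation needs a shuffle.
# WIDE_OPS is an illustrative, incomplete list, not a Spark API.
WIDE_OPS = {"groupBy", "join", "distinct", "orderBy", "repartition"}


def split_into_stages(ops):
    """Group operation names into stages; each wide op starts a new stage."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: begin the next stage
        stages[-1].append(op)
    return stages


plan = ["read", "filter", "select", "groupBy", "agg", "orderBy", "write"]
stages = split_into_stages(plan)
for i, stage in enumerate(stages):
    print(f"Stage {i}: {stage}")
```

In this model the plan has two wide operations and therefore three stages; removing either shuffle removes a stage.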


Architecture Overview

Driver Program
  ├── SparkContext / SparkSession
  ├── DAGScheduler (splits each job into stages at shuffle boundaries)
  └── TaskScheduler (distributes tasks)

Cluster Manager (YARN, Kubernetes, Standalone)
  └── Executors (JVM processes on worker nodes)
        ├── Task Threads (cpu cores)
        └── Memory (heap + off-heap)

Execution hierarchy:
  Job     → triggered by one action (collect, write, count)
  Stage   → separated by shuffles
  Task    → one per partition, runs on one executor core

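Memory regions (executor heap, unified memory model):
  Reserved:         Spark internal (300MB, fixed)
  Unified memory:   spark.memory.fraction (default 0.6) of (heap - 300MB),
                    shared by execution and storage
    Execution:      shuffle, join, sort, and aggregation buffers
    Storage:        persist/cache; spark.memory.storageFraction (default 0.5)
                    of unified memory is protected from eviction by execution
  User memory:      the remaining 40% of (heap - 300MB): UDF objects, user data structures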
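
As a worked example of the executor memory arithmetic, the sketch below estimates on-heap region sizes from the heap size. It assumes the default values of `spark.memory.fraction` (0.6) and `spark.memory.storageFraction` (0.5) and the fixed 300MB reservation; the function name `executor_memory_regions` is hypothetical, and off-heap memory and container overhead are left out.

```python
RESERVED_MB = 300  # fixed reservation for Spark internals


def executor_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate on-heap region sizes (in MB) under the unified memory model."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction              # execution + storage, shared
    protected_storage = unified * storage_fraction  # cached data execution cannot evict
    user = usable - unified                         # UDF objects, user data structures
    return {
        "reserved": RESERVED_MB,
        "unified": unified,
        "protected_storage": protected_storage,
        "user": user,
    }


regions = executor_memory_regions(heap_mb=8192)
for name, size in regions.items():
    print(f"{name:>18}: {size:,.0f} MB")
```

Execution can borrow all of the unified region when nothing is cached, so the storage figure is a protected floor for cached blocks rather than a hard split.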

DataFrame vs RDD

# ❌ RDD: unoptimized, no Catalyst, no Tungsten, no predicate pushdown
rdd = sc.textFile("data.csv").map(parse_line).filter(lambda r: r['amount'] > 100)

# ✅ DataFrame: Catalyst optimizer + Tungsten code gen + column pruning and predicate pushdown
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Parentheses instead of backslashes: a comment after a trailing "\" is a syntax error
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.adaptive.enabled", "true")  # Adaptive Query Execution
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

df = spark.read.parquet("s3://bucket/data/")
result = df.filter(F.col("amount") > 100).groupBy("category").agg(F.sum("amount"))

Shuffle Reduction

Broadcast Join (Most Impactful Optimization)

from pyspark.sql.functions import broadcast

# ❌ Sort-merge join: shuffles BOTH tables
large_orders.join(large_customers, "customer_id")

# ✅ Broadcast join: small table broadcast to all executors (no shuffle of large table)
small_products = spark.read.parquet("s3://bucket/products/")  # < 10MB
large_orders   = spark.read.parquet("s3://bucket/orders/")

result = large_orders.join(broadcast(small_products), "product_id")
# Threshold for auto-broadcast (default 10MB):
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50MB

# Check the execution plan to verify broadcast was used
result.explain(mode="formatted")
# Should show: BroadcastHashJoin, not SortMergeJoin
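
A rough back-of-the-envelope comparison shows why the broadcast join wins when one side is small. The sketch below is a simplified cost model, not something Spark computes: `estimate_network_bytes` and `fits_auto_broadcast` are hypothetical helpers, and the sizes are made up. Spark compares its own size estimate of the table, which can differ from the compressed size on disk, against the threshold.

```python
MB = 1024 * 1024


def estimate_network_bytes(large_bytes, small_bytes, num_executors):
    """Rough bytes moved over the network for each join strategy."""
    return {
        # Sort-merge join: both sides are shuffled by the join key
        "sort_merge": large_bytes + small_bytes,
        # Broadcast join: the small side is copied to every executor
        "broadcast": small_bytes * num_executors,
    }


def fits_auto_broadcast(small_bytes, threshold_bytes=10 * MB):
    """True if the table is at or under the auto-broadcast threshold (10MB default)."""
    return small_bytes <= threshold_bytes


costs = estimate_network_bytes(
    large_bytes=100_000 * MB, small_bytes=8 * MB, num_executors=20
)
print({name: f"{size // MB} MB" for name, size in costs.items()})
print("auto-broadcast:", fits_auto_broadcast(8 * MB))
```

The broadcast cost grows with the number of executors, which is one reason to be careful when raising the threshold on a large cluster.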

Partition Strategy

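# repartition vs coalesce:
#   repartition(n): full shuffle into n partitions of roughly equal size
#                   (repartition(n, col) hash-partitions by col instead)
#                   Use when: increasing partitions, or evening out skew
#   coalesce(n):    narrow transformation, no shuffle, only reduces partitions
#                   Use when: reducing partitions before write (avoids shuffle);
#                   it also lowers the parallelism of the stage that feeds it

# Rule of thumb: output partition count = total executor cores × 2-4
# 40 cores × 2 = 80 output partitions
# Hash-partitioning by "category" leaves partitions empty if there are fewer
# than 80 distinct categories; use df.repartition(80) for an even spread
df.repartition(80, "category").write.parquet("output/")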

# coalesce to reduce small files on write
df.coalesce(16).write.parquet("output/")

# Partition by column for downstream partition pruning
df.write.partitionBy("year", "month").parquet("output/")
# Future reads: spark.read.parquet("output/").filter("year = 2024 AND month = 1")
# Only reads the 2024/1 partition — massive speedup

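# Adaptive Query Execution (Spark 3.0+, on by default since 3.2): auto-coalesces shuffle partitions
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# minPartitionSize (Spark 3.2+) is a lower bound for coalesced partitions; the target
# size is spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB)
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")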
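
The two sizing rules (cores × 2-4 and a target size per partition) can disagree, so it helps to compute both before picking a number. The helper below is a hypothetical sketch; the 128MB default target is an assumption chosen from the low end of the 100MB-1GB range, not a Spark setting.

```python
import math

MB = 1024 * 1024


def partition_count_hints(data_bytes, total_cores, target_partition_bytes=128 * MB):
    """Return the size-based and core-based rule-of-thumb estimates side by side."""
    return {
        "by_size": max(1, math.ceil(data_bytes / target_partition_bytes)),
        "by_cores_min": total_cores * 2,
        "by_cores_max": total_cores * 4,
    }


hints = partition_count_hints(data_bytes=100 * 1024 * MB, total_cores=40)  # 100GB
print(hints)
small = partition_count_hints(data_bytes=1024 * MB, total_cores=40)  # 1GB
print(small)
```

For small inputs the size-based count is far below the core-based one, and following the core rule there would produce many tiny partitions; for large inputs the size-based count dominates.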

PySpark Column Operations vs UDFs

# ❌ Python UDF: Python process, serialization overhead, no optimization (10x+ slower)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def classify_amount(amount):
    if amount > 1000: return "high"
    elif amount > 100: return "medium"
    return "low"

df.withColumn("tier", classify_amount(F.col("amount")))

# ✅ Native Spark expressions: runs in JVM, Catalyst-optimized
df.withColumn("tier",
    F.when(F.col("amount") > 1000, "high")
     .when(F.col("amount") > 100, "medium")
     .otherwise("low")
)

# ✅ Pandas UDF (vectorized): transfers batches via Apache Arrow, typically much faster than a row-at-a-time UDF
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def classify_amount_vectorized(amounts: pd.Series) -> pd.Series:
    return amounts.map(lambda x: "high" if x > 1000 else ("medium" if x > 100 else "low"))

# Only use UDFs when native expressions genuinely can't express the logic

# ✅ Column expressions: string ops, date ops, math, all native
from pyspark.sql import Window

category_window = Window.partitionBy("category")

df.withColumn("email_domain", F.split(F.col("email"), "@").getItem(1)) \
  .withColumn("created_date", F.to_date(F.col("created_at"))) \
  .withColumn("month", F.month(F.col("created_at"))) \
  .withColumn("log_amount", F.log1p(F.col("amount"))) \
  .withColumn("amount_z_score",
              (F.col("amount") - F.mean("amount").over(category_window)) /
              F.stddev("amount").over(category_window))

Spark UI for Performance Diagnosis

Key tabs and what to look for:

Jobs tab:
  - Duration per job → which jobs are slowest?
  - Failed jobs → click for error details

Stages tab:
  - Shuffle Read/Write size → large shuffle = optimization opportunity
  - GC Time > 10% of task time → memory pressure, increase executor memory
  - Spill (memory/disk) > 0 → not enough memory for shuffle, increase partition count

Stage detail page (click a stage in the Stages tab):
  - Task duration distribution: if some tasks are 10x longer than median → DATA SKEW
  - Input Size/Records per task: extremely uneven → skew
  - Shuffle read: some tasks reading 100x more than others → skew on shuffle key

SQL tab:
  - Exchange node = shuffle (each Exchange = stage boundary)
  - BroadcastHashJoin = good (no shuffle)
  - SortMergeJoin = shuffle both sides
  - FileScan (PhotonScan on Databricks) with non-empty PushedFilters = predicate pushdown working

Storage tab:
  - Cached DataFrames: size in memory, fraction cached
  - If "Fraction Cached" is below 100% → some partitions did not fit or were evicted, and are recomputed or read from disk (increase memory or cache less)
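
The skew check described for task durations can be sketched in a few lines of plain Python. The durations are a made-up sample such as you might copy from a stage's task table; `skew_report` is a hypothetical helper, and the 10x ratio is the rule of thumb from the list above rather than a Spark setting.

```python
from statistics import median


def skew_report(task_durations_s, ratio_threshold=10.0):
    """Compare the slowest task in a stage with the median task."""
    med = median(task_durations_s)
    slowest = max(task_durations_s)
    ratio = slowest / med if med > 0 else float("inf")
    return {
        "median_s": med,
        "max_s": slowest,
        "ratio": ratio,
        "skewed": ratio >= ratio_threshold,
    }


durations = [12, 11, 13, 12, 14, 11, 190, 12]  # seconds, made-up sample
report = skew_report(durations)
print(report)
```

The same comparison works for shuffle read size per task, which often points at the skewed key more directly than duration does.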

Handling Data Skew

# Symptom: one partition takes 10x longer than others (visible in Stages > Tasks)
# Cause: hot key (one value has millions of rows)

# Solution 1: Salting — add random key to distribute hot key
from pyspark.sql import functions as F

NUM_BUCKETS = 50

# Add salt to skewed side (explicit string casts so concat works for numeric ids)
orders_salted = large_orders.withColumn(
    "salted_key",
    F.concat(
        F.col("customer_id").cast("string"),
        F.lit("_"),
        (F.rand() * NUM_BUCKETS).cast("int").cast("string"),
    ),
)

# Explode salt on small side (replicate N times); drop customer_id here so the
# joined result keeps a single customer_id column, the one from the orders side
customers_exploded = small_customers \
    .withColumn("bucket", F.explode(F.array([F.lit(i) for i in range(NUM_BUCKETS)]))) \
    .withColumn("salted_key",
                F.concat(F.col("customer_id").cast("string"), F.lit("_"), F.col("bucket").cast("string"))) \
    .drop("customer_id", "bucket")

result = orders_salted.join(customers_exploded, "salted_key").drop("salted_key")

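# Solution 2: AQE skew join (Spark 3.0+, automatic; requires spark.sql.adaptive.enabled=true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# AQE splits a partition when it exceeds this threshold and is also a multiple of the
# median partition size (spark.sql.adaptive.skewJoin.skewedPartitionFactor)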
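
To see what salting does to the key distribution, the simulation below counts rows per join key before and after a random salt is appended. It is plain Python on made-up data (one hot customer plus 100 ordinary ones), not a Spark job; since all rows with the same key land in the same shuffle partition, the largest key is a lower bound on the largest partition.

```python
import random
from collections import Counter

NUM_BUCKETS = 50
rng = random.Random(42)  # fixed seed so the run is repeatable

# Made-up data: one hot customer with 10,000 orders, 100 customers with 10 each
customer_ids = ["hot"] * 10_000 + [f"c{i}" for i in range(100) for _ in range(10)]

rows_per_key = Counter(customer_ids)
rows_per_salted_key = Counter(
    f"{cid}_{rng.randrange(NUM_BUCKETS)}" for cid in customer_ids
)

print("largest key before salting:", max(rows_per_key.values()))
print("largest key after salting: ", max(rows_per_salted_key.values()))
```

The hot key's rows end up spread over about NUM_BUCKETS salted keys. The price is that the other side of the join must be replicated NUM_BUCKETS times, so salting only pays off when that side is small.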

Delta Lake

# ACID transactions on data lake
from delta.tables import DeltaTable

# Write as Delta
df.write.format("delta").mode("overwrite").save("s3://bucket/delta/orders")

# Read Delta
orders = spark.read.format("delta").load("s3://bucket/delta/orders")

# MERGE (upsert): most common Delta operation
delta_orders = DeltaTable.forPath(spark, "s3://bucket/delta/orders")

delta_orders.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set={
    "status":     "source.status",
    "updated_at": "source.updated_at",
    "amount":     "source.amount"
}).whenNotMatchedInsertAll() \
  .execute()

# Time travel: query a historical version by number
orders_v5 = spark.read.format("delta") \
    .option("versionAsOf", "5") \
    .load("s3://bucket/delta/orders")

# Or by timestamp
orders_last_week = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-08") \
    .load("s3://bucket/delta/orders")

# Optimize (compact small files) + Z-Order (co-locate related data)
# where() must be a predicate on partition columns; date_col is assumed to be one
delta_orders.optimize() \
    .where("date_col >= '2024-01-01'") \
    .executeZOrderBy("customer_id", "created_at")
# Z-Order clusters rows on several columns at once, so file-level min/max statistics
# can skip more files for queries filtering on customer_id and/or created_at

# Vacuum: delete data files no longer referenced by the table and older than the
# retention period (default 7 days); time travel past that point stops working
delta_orders.vacuum(retentionHours=168)  # 7 days

# Schema evolution
df_new_schema.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("s3://bucket/delta/orders")
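
The MERGE semantics used above (update listed columns on a match, insert the whole row otherwise) can be modeled on plain Python dicts. This is only an illustration of the logic, not how Delta executes it: `merge_upsert` is a hypothetical helper, the rows are made up, and the sketch does not model Delta's rejection of merges where several source rows match the same target row.

```python
def merge_upsert(target, updates, key="order_id",
                 update_cols=("status", "updated_at", "amount")):
    """Upsert `updates` into `target` (both lists of dicts); returns a new list."""
    by_key = {row[key]: dict(row) for row in target}
    for src in updates:
        if src[key] in by_key:
            # whenMatchedUpdate: overwrite only the listed columns
            for col in update_cols:
                by_key[src[key]][col] = src[col]
        else:
            # whenNotMatchedInsertAll: insert the source row as-is
            by_key[src[key]] = dict(src)
    return list(by_key.values())


target = [
    {"order_id": 1, "status": "new", "updated_at": "2024-01-01", "amount": 50, "channel": "web"},
]
updates = [
    {"order_id": 1, "status": "shipped", "updated_at": "2024-01-02", "amount": 50, "channel": "app"},
    {"order_id": 2, "status": "new", "updated_at": "2024-01-02", "amount": 75, "channel": "app"},
]
merged = merge_upsert(target, updates)
for row in merged:
    print(row)
```

Order 1 keeps its original `channel` because that column is not in the update set, while order 2 is inserted with every source column.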

Structured Streaming

# Read from Kafka stream
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "10000") \
    .load() \
    .select(
        F.col("timestamp").alias("event_time"),
        F.from_json(F.col("value").cast("string"), order_schema).alias("data")
    ) \
    .select("event_time", "data.*")

from pyspark.sql.functions import window

# Windowed aggregation with watermark for late data (tolerates 10 min late arrivals)
agg_stream = stream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "1 hour", "15 minutes"),  # 1h window, 15m slide
        "category"
    ) \
    .agg(
        F.sum("amount").alias("total_revenue"),
        F.count("*").alias("order_count")
    )

# Write to Delta Lake (micro-batch)
query = agg_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://bucket/checkpoints/order_agg") \
    .trigger(processingTime="1 minute") \
    .start("s3://bucket/delta/order_agg")

query.awaitTermination()
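
With a 1 hour window sliding every 15 minutes, each event belongs to four overlapping windows. The sketch below reproduces that assignment in plain Python, assuming windows are aligned to the Unix epoch with no start offset; `sliding_windows` is a hypothetical helper, and the watermark line is a simplification of how the engine tracks the maximum event time across batches.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window=timedelta(hours=1), slide=timedelta(minutes=15)):
    """Return the [start, end) windows that contain event_time, oldest first."""
    latest_start = event_time - (event_time - EPOCH) % slide
    windows = []
    start = latest_start
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return windows[::-1]


event = datetime(2024, 1, 8, 12, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(event):
    print(start.strftime("%H:%M"), "to", end.strftime("%H:%M"))

# Watermark: latest event time seen so far minus the allowed lateness
watermark = event - timedelta(minutes=10)
print("watermark:", watermark.strftime("%H:%M"))
```

In append mode a window's row is written only once the watermark has moved past the window's end, so results for the 12:00 to 13:00 window appear some time after 13:10 worth of event time has been observed.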

Anti-Patterns

# ❌ collect() on large DataFrames (brings all data to driver → OOM)
all_rows = df.collect()  # 1 billion rows → driver crashes
# ✅ Use write() to save, or take(n) for small samples
df.write.parquet("s3://output/")
sample = df.take(100)

# ❌ Calling df.count() multiple times to monitor progress
n1 = df.filter(...).count()  # triggers full scan
n2 = df.filter(...).count()  # another full scan
# ✅ Cache if you need to reuse, or use one action
df_filtered = df.filter(...).cache()
n = df_filtered.count()

# ❌ Too many small partitions (scheduler overhead)
df.repartition(10000)  # 10000 tasks for 1GB = 100KB each
# ✅ Target 100MB-1GB per partition
df.repartition(10)  # 1GB / 100MB target = 10 partitions

# ❌ Caching everything (wastes memory, evicts useful cached data)
df.cache()  # used only once
# ✅ Only cache DataFrames used 2+ times
df.cache(); df.count()  # materialize cache
df.join(other_df, ...).show()
df.groupBy(...).agg(...).show()
df.unpersist()  # free when done

# ❌ Python UDFs in hot path (10x+ overhead)
# ✅ Native Spark functions or Pandas UDFs

Quick Reference

Shuffle Reduction:
  Small table (fits in memory)    → broadcast join (automatic below 10MB by default)
  Even distribution needed        → repartition(n, key)
  Reducing partitions             → coalesce(n)
  Auto-tune shuffle partitions    → AQE enabled (Spark 3.0+)
  Hot key skew                    → salting or AQE skew join

Output Partition Count:
  Target: 100MB - 1GB per partition
  Rule:   total executor cores × 2-4

UDF Priority:
  1. Native Spark functions (F.when, F.split, F.to_date...)
  2. Pandas (vectorized) UDF
  3. Java/Scala UDF
  4. Python UDF (last resort)

Delta Lake:
  ACID writes                     → .format("delta")
  Upsert                          → DeltaTable.merge()
  Historical query                → .option("versionAsOf", N)
  Compact + cluster               → optimize().executeZOrderBy(cols)

Streaming:
  Late data tolerance             → withWatermark("time_col", "10 minutes")
  Micro-batch interval            → .trigger(processingTime="1 minute")
  Checkpoint required             → .option("checkpointLocation", "path")
  Output modes: append (default), update (aggregations), complete (full results)

Spark UI Red Flags:
  GC Time > 10%                   → increase executor memory
  Spill > 0                       → more memory or more partitions
  One task >> median duration     → data skew
  Large Exchange (shuffle)        → candidate for broadcast or repartition
