跳转至

第八章:性能优化

8.1 内存优化

内存配置

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

内存管理

┌─────────────────────────────────────────────────────────────┐
│                                                              │
│   Spark 内存管理                                            │
│                                                              │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              Executor Memory (8g)                   │   │
│   │  ┌───────────────────────────────────────────────┐  │   │
│   │  │         Reserved Memory (300MB)               │  │   │
│   │  └───────────────────────────────────────────────┘  │   │
│   │  ┌───────────────────────────────────────────────┐  │   │
│   │  │         Spark Memory (80%)                    │  │   │
│   │  │  ┌─────────────────┬─────────────────────┐   │  │   │
│   │  │  │ Storage Memory  │ Execution Memory    │   │  │   │
│   │  │  │ (缓存数据)      │ (Shuffle, Join)     │   │  │   │
│   │  │  └─────────────────┴─────────────────────┘   │  │   │
│   │  └───────────────────────────────────────────────┘  │   │
│   │  ┌───────────────────────────────────────────────┐  │   │
│   │  │         User Memory (20%)                     │  │   │
│   │  └───────────────────────────────────────────────┘  │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

8.2 并行度优化

# 设置 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")

# 根据数据量调整
# 小数据集: 50-100
# 中等数据集: 200-500
# 大数据集: 1000+

# RDD 并行度
rdd = sc.parallelize(data, numSlices=100)

# 自动调整(Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

8.3 数据倾斜处理

from pyspark.sql.functions import col, rand, when

# 检测数据倾斜
df.groupBy("key").count().orderBy(col("count").desc()).show(10)

# 方案1:加盐(Salting)
salted = df.withColumn("salted_key", 
    when(col("key") == "hot_key", 
         concat(col("key"), lit("_"), (rand() * 10).cast("int")))
    .otherwise(col("key"))
)

# 方案2:广播 Join
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")

# 方案3:倾斜 Join 提示
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

8.4 缓存策略

# 缓存 DataFrame
df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)

# 查看缓存状态
spark.catalog.cacheTable("my_table")
spark.catalog.uncacheTable("my_table")

# 缓存级别
# MEMORY_ONLY: 只存内存
# MEMORY_AND_DISK: 内存不足时存磁盘
# DISK_ONLY: 只存磁盘
# MEMORY_ONLY_SER: 序列化存储

# 清除缓存
df.unpersist()
spark.catalog.clearCache()

8.5 执行计划分析

# 查看执行计划
df.explain(True)

# 查看物理计划
df.explain()

# 查看 Spark UI
# http://localhost:4040

# 常见问题
# 1. 全表扫描: 使用分区裁剪
# 2. Shuffle 过多: 减少 join 操作
# 3. 数据倾斜: 使用广播或加盐

8.6 最佳实践

数据格式

# 推荐:Parquet(列式存储)
df.write.parquet("data.parquet")

# 推荐:分区
df.write.partitionBy("year", "month").parquet("data.parquet")

# 避免:大量小文件
# 使用 coalesce 合并
df.coalesce(1).write.parquet("output")

资源配置

# 提交任务
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 8g \
    --executor-cores 4 \
    --num-executors 10 \
    --driver-memory 4g \
    --conf spark.sql.shuffle.partitions=500 \
    --conf spark.sql.adaptive.enabled=true \
    myapp.py

小结

  1. 合理配置内存,避免 OOM
  2. 根据数据量调整并行度
  3. 处理数据倾斜:广播、加盐
  4. 使用缓存加速重复计算
  5. 分析执行计划优化查询

参考资料