第八章:性能优化
8.1 内存优化
内存配置
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "8g") \
.config("spark.memory.fraction", "0.8") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()
内存管理
┌─────────────────────────────────────────────────────────────┐
│ │
│ Spark 内存管理 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Executor Memory (8g) │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Reserved Memory (300MB) │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Spark Memory (80%) │ │ │
│ │ │ ┌─────────────────┬─────────────────────┐ │ │ │
│ │ │ │ Storage Memory │ Execution Memory │ │ │ │
│ │ │ │ (缓存数据) │ (Shuffle, Join) │ │ │ │
│ │ │ └─────────────────┴─────────────────────┘ │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ User Memory (20%) │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
8.2 并行度优化
# 设置 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")
# 根据数据量调整
# 小数据集: 50-100
# 中等数据集: 200-500
# 大数据集: 1000+
# RDD 并行度
rdd = sc.parallelize(data, numSlices=100)
# 自动调整(Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
8.3 数据倾斜处理
from pyspark.sql.functions import col, rand, when
# 检测数据倾斜
df.groupBy("key").count().orderBy(col("count").desc()).show(10)
# 方案1:加盐(Salting)
salted = df.withColumn("salted_key",
when(col("key") == "hot_key",
concat(col("key"), lit("_"), (rand() * 10).cast("int")))
.otherwise(col("key"))
)
# 方案2:广播 Join
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")
# 方案3:倾斜 Join 提示
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
8.4 缓存策略
# 缓存 DataFrame
df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)
# 查看缓存状态
spark.catalog.cacheTable("my_table")
spark.catalog.uncacheTable("my_table")
# 缓存级别
# MEMORY_ONLY: 只存内存
# MEMORY_AND_DISK: 内存不足时存磁盘
# DISK_ONLY: 只存磁盘
# MEMORY_ONLY_SER: 序列化存储
# 清除缓存
df.unpersist()
spark.catalog.clearCache()
8.5 执行计划分析
# 查看执行计划
df.explain(True)
# 查看物理计划
df.explain()
# 查看 Spark UI
# http://localhost:4040
# 常见问题
# 1. 全表扫描: 使用分区裁剪
# 2. Shuffle 过多: 减少 join 操作
# 3. 数据倾斜: 使用广播或加盐
8.6 最佳实践
数据格式
# 推荐:Parquet(列式存储)
df.write.parquet("data.parquet")
# 推荐:分区
df.write.partitionBy("year", "month").parquet("data.parquet")
# 避免:大量小文件
# 使用 coalesce 合并
df.coalesce(1).write.parquet("output")
资源配置
# 提交任务
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--driver-memory 4g \
--conf spark.sql.shuffle.partitions=500 \
--conf spark.sql.adaptive.enabled=true \
myapp.py
小结
- 合理配置内存,避免 OOM
- 根据数据量调整并行度
- 处理数据倾斜:广播、加盐
- 使用缓存加速重复计算
- 分析执行计划优化查询
参考资料