第五章:Spark SQL¶
5.1 Spark SQL 概述¶
Spark SQL 是 Spark 的结构化数据处理模块,支持 SQL 查询和 DataFrame API。
支持的数据源¶
| 数据源 | 说明 |
|---|---|
| Parquet | 列式存储,推荐格式 |
| ORC | Hive 优化格式 |
| JSON | 半结构化数据 |
| CSV | 文本格式 |
| JDBC | 关系数据库 |
| Hive | Hive 表 |
| Delta | Delta Lake |
5.2 SQL 查询¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SparkSQLDemo") \
.enableHiveSupport() \
.getOrCreate()
# 创建表
spark.sql("""
CREATE TABLE IF NOT EXISTS users (
id INT,
name STRING,
age INT,
department STRING
)
USING parquet
""")
# 插入数据
spark.sql("""
INSERT INTO users VALUES
(1, '张三', 25, '研发部'),
(2, '李四', 30, '市场部'),
(3, '王五', 28, '研发部')
""")
# 查询
spark.sql("SELECT * FROM users WHERE age > 25").show()
# 聚合查询
spark.sql("""
SELECT department, COUNT(*) as count, AVG(age) as avg_age
FROM users
GROUP BY department
""").show()
# 窗口函数
spark.sql("""
SELECT
name,
age,
RANK() OVER (ORDER BY age DESC) as rank
FROM users
""").show()
5.3 UDF(用户定义函数)¶
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# 注册 UDF
@udf(returnType=StringType())
def upper_case(s):
return s.upper() if s else None
# 使用 UDF
spark.udf.register("upper_case", upper_case)
df.withColumn("name_upper", upper_case(df.name)).show()
# SQL 中使用
spark.sql("SELECT upper_case(name) FROM users").show()
# Pandas UDF(更高效)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()
df.withColumn("name_upper", to_upper(df.name)).show()
5.4 性能优化¶
# 缓存表
spark.sql("CACHE TABLE users")
# 清除缓存
spark.sql("UNCACHE TABLE users")
# 查看执行计划
spark.sql("SELECT * FROM users").explain(True)
# 设置并行度
spark.conf.set("spark.sql.shuffle.partitions", "200")
# 启用自适应查询执行
spark.conf.set("spark.sql.adaptive.enabled", "true")
小结¶
- Spark SQL 支持 SQL 查询和 DataFrame API
- 支持多种数据源和格式
- UDF 扩展 SQL 功能
- 缓存和 AQE 提升性能