跳转至

第五章:Spark SQL

5.1 Spark SQL 概述

Spark SQL 是 Spark 的结构化数据处理模块,支持 SQL 查询和 DataFrame API。

支持的数据源

数据源 说明
Parquet 列式存储,推荐格式
ORC Hive 优化格式
JSON 半结构化数据
CSV 文本格式
JDBC 关系数据库
Hive Hive 表
Delta Delta Lake

5.2 SQL 查询

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQLDemo") \
    .enableHiveSupport() \
    .getOrCreate()

# 创建表
spark.sql("""
    CREATE TABLE IF NOT EXISTS users (
        id INT,
        name STRING,
        age INT,
        department STRING
    )
    USING parquet
""")

# 插入数据
spark.sql("""
    INSERT INTO users VALUES
    (1, '张三', 25, '研发部'),
    (2, '李四', 30, '市场部'),
    (3, '王五', 28, '研发部')
""")

# 查询
spark.sql("SELECT * FROM users WHERE age > 25").show()

# 聚合查询
spark.sql("""
    SELECT department, COUNT(*) as count, AVG(age) as avg_age
    FROM users
    GROUP BY department
""").show()

# 窗口函数
spark.sql("""
    SELECT 
        name,
        age,
        RANK() OVER (ORDER BY age DESC) as rank
    FROM users
""").show()

5.3 UDF(用户定义函数)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# 注册 UDF
@udf(returnType=StringType())
def upper_case(s):
    return s.upper() if s else None

# 使用 UDF
spark.udf.register("upper_case", upper_case)
df.withColumn("name_upper", upper_case(df.name)).show()

# SQL 中使用
spark.sql("SELECT upper_case(name) FROM users").show()

# Pandas UDF(更高效)
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

df.withColumn("name_upper", to_upper(df.name)).show()

5.4 性能优化

# 缓存表
spark.sql("CACHE TABLE users")

# 清除缓存
spark.sql("UNCACHE TABLE users")

# 查看执行计划
spark.sql("SELECT * FROM users").explain(True)

# 设置并行度
spark.conf.set("spark.sql.shuffle.partitions", "200")

# 启用自适应查询执行
spark.conf.set("spark.sql.adaptive.enabled", "true")

小结

  1. Spark SQL 支持 SQL 查询和 DataFrame API
  2. 支持多种数据源和格式
  3. UDF 扩展 SQL 功能
  4. 缓存和 AQE 提升性能

参考资料