跳转至

第三章:RDD 编程

3.1 RDD 概述

RDD(Resilient Distributed Dataset)是 Spark 的核心数据抽象,是一个不可变、分区的元素集合。

RDD 特点

特点 说明
不可变 创建后不可修改
分区 数据分布在多个节点
弹性 自动容错恢复
惰性求值 只有 Action 时才计算

3.2 创建 RDD

from pyspark import SparkContext

sc = SparkContext("local[*]", "RDDDemo")

# 从列表创建
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# 从文件创建
rdd2 = sc.textFile("hdfs://path/to/file.txt")

# 从其他 RDD 转换
rdd3 = rdd1.map(lambda x: x * 2)

# 查看分区数
print(rdd1.getNumPartitions())

3.3 Transformation 操作

# map: 对每个元素应用函数
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped = rdd.map(lambda x: x * 2)
# [2, 4, 6, 8, 10]

# filter: 过滤元素
filtered = rdd.filter(lambda x: x > 2)
# [3, 4, 5]

# flatMap: 扁平化映射
rdd2 = sc.parallelize(["hello world", "spark python"])
flatMapped = rdd2.flatMap(lambda x: x.split(" "))
# ["hello", "world", "spark", "python"]

# distinct: 去重
rdd3 = sc.parallelize([1, 1, 2, 2, 3, 3])
distinct = rdd3.distinct()
# [1, 2, 3]

# groupByKey: 按键分组
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey()
# [("a", [1, 3]), ("b", [2])]

# reduceByKey: 按键归约
reduced = pairs.reduceByKey(lambda a, b: a + b)
# [("a", 4), ("b", 2)]

# join: 连接
rdd_a = sc.parallelize([("a", 1), ("b", 2)])
rdd_b = sc.parallelize([("a", "x"), ("b", "y")])
joined = rdd_a.join(rdd_b)
# [("a", (1, "x")), ("b", (2, "y"))]

3.4 Action 操作

rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect: 收集所有元素
print(rdd.collect())  # [1, 2, 3, 4, 5]

# count: 计数
print(rdd.count())  # 5

# reduce: 归约
print(rdd.reduce(lambda a, b: a + b))  # 15

# first: 第一个元素
print(rdd.first())  # 1

# take: 取前 n 个
print(rdd.take(3))  # [1, 2, 3]

# foreach: 遍历
rdd.foreach(lambda x: print(x))

# saveAsTextFile: 保存
rdd.saveAsTextFile("hdfs://path/to/output")

3.5 键值对操作

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])

# reduceByKey
result = pairs.reduceByKey(lambda a, b: a + b)
# [("a", 4), ("b", 6)]

# groupByKey
result = pairs.groupByKey().mapValues(list)
# [("a", [1, 3]), ("b", [2, 4])]

# sortByKey
sorted_rdd = pairs.sortByKey()

# mapValues: 只对值操作
result = pairs.mapValues(lambda x: x * 2)
# [("a", 2), ("b", 4), ("a", 6), ("b", 8)]

# keys / values
print(pairs.keys().collect())   # ["a", "b", "a", "b"]
print(pairs.values().collect()) # [1, 2, 3, 4]

小结

  1. RDD 是 Spark 的核心数据抽象
  2. Transformation 是惰性的,Action 触发计算
  3. 常用 Transformation:map、filter、reduceByKey
  4. 常用 Action:collect、count、reduce

参考资料