下面是一篇风格、结构与《Spark(七十五)》完全一致的完整教程文章,可直接作为你 Spark 系列的第 七十四篇 发布 👇
大数据 Spark(七十四):Transformation 转换算子 aggregateByKey 和 combineByKey 使用案例
在 Spark 中,Key-Value 类型 RDD 的聚合操作是面试与实战中的重点内容。
本篇将系统讲解两个“看起来复杂、但非常强大”的 Transformation 转换算子:
aggregateByKey
combineByKey
理解它们,你就真正掌握了 Spark 的聚合模型。
一、为什么需要 aggregateByKey 和 combineByKey?
在 Key-Value 聚合中,我们常见的算子有:
- reduceByKey
- foldByKey
- aggregateByKey
- combineByKey
其中:
reduceByKey:分区内 + 分区间逻辑必须一致aggregateByKey:分区内和分区间逻辑可以不同combineByKey:最底层、最通用的聚合算子
👉 Spark 中所有 Key 聚合算子,底层都基于 combineByKey。
二、aggregateByKey 算子详解
1️⃣ 算子定义
def aggregateByKey[U](
zeroValue: U
)(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
参数说明
| 参数 | 说明 |
|---|---|
| zeroValue | 初始值(每个分区都会用一次) |
| seqOp | 分区内聚合逻辑 |
| combOp | 分区间聚合逻辑 |
2️⃣ 执行流程图解(文字版)
- 每个分区创建一个
zeroValue - 使用
seqOp在分区内聚合 - 使用
combOp在分区间合并结果
3️⃣ 使用案例一:求每个 key 的最大值(并加 10)
val conf = new SparkConf().setMaster("local[*]").setAppName("AggregateByKeyDemo")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(
List(
("a", 1), ("a", 3), ("a", 2),
("b", 2), ("b", 4)
), 2
)
val result = rdd.aggregateByKey(0)(
(x, y) => math.max(x, y), // 分区内
(x, y) => x + y // 分区间
)
result.collect().foreach(println)
sc.stop()
执行逻辑说明:
- 分区内取最大值
- 分区间对最大值求和
4️⃣ 使用案例二:求平均值(常见面试题)
val rdd = sc.parallelize(
List(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 4)), 2
)
val result = rdd.aggregateByKey((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val avg = result.mapValues {
case (sum, count) => sum.toDouble / count
}
avg.collect().foreach(println)
三、combineByKey 算子详解
1️⃣ 算子定义
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): RDD[(K, C)]
参数说明
| 参数 | 说明 |
|---|---|
| createCombiner | 第一个 value 变成初始结构 |
| mergeValue | 分区内合并 |
| mergeCombiners | 分区间合并 |
2️⃣ combineByKey 执行核心思想
- 每个 key 的第一个 value 触发 createCombiner
- 后续 value 使用 mergeValue
- 不同分区结果用 mergeCombiners 合并
👉 没有 zeroValue,避免无意义初始值。
3️⃣ 使用案例:求每个 key 的平均值
val rdd = sc.parallelize(
List(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 4)), 2
)
val result = rdd.combineByKey(
v => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) =>
(acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val avg = result.mapValues {
case (sum, count) => sum.toDouble / count
}
avg.collect().foreach(println)
四、aggregateByKey vs combineByKey 对比
| 维度 | aggregateByKey | combineByKey |
|---|---|---|
| 初始值 | 有 zeroValue | 无 |
| 灵活性 | 高 | 最高 |
| 使用难度 | 中 | 高 |
| 底层实现 | 基于 combineByKey | 原生 |
五、使用建议与最佳实践
✅ 什么时候用 aggregateByKey?
- 需要不同的分区内 / 分区间逻辑
- 需要统一初始值
- 写法相对清晰
✅ 什么时候用 combineByKey?
- 聚合逻辑复杂
- 初始值不能随意给
- 需要完全掌控聚合过程
六、总结
reduceByKey 是简化版,aggregateByKey 是进阶版,combineByKey 是终极版。
真正理解这三个算子,你就真正理解了 Spark 的 Shuffle + 聚合机制。