菜鸟-创作你的创作

大数据Spark(七十四):Transformation转换算子aggregateByKey和combineByKey使用案例

下面是一篇风格、结构与《Spark(七十五)》完全一致的完整教程文章,可直接作为你 Spark 系列的第 七十四篇 发布 👇


大数据 Spark(七十四):Transformation 转换算子 aggregateByKey 和 combineByKey 使用案例

在 Spark 中,Key-Value 类型 RDD 的聚合操作是面试与实战中的重点内容。
本篇将系统讲解两个“看起来复杂、但非常强大”的 Transformation 转换算子

aggregateByKey
combineByKey

理解它们,你就真正掌握了 Spark 的聚合模型。


一、为什么需要 aggregateByKey 和 combineByKey?

在 Key-Value 聚合中,我们常见的算子有:

其中:

👉 Spark 中所有 Key 聚合算子,底层都基于 combineByKey


二、aggregateByKey 算子详解

1️⃣ 算子定义

def aggregateByKey[U](
  zeroValue: U
)(
  seqOp: (U, V) => U,
  combOp: (U, U) => U
): RDD[(K, U)]

参数说明

参数说明
zeroValue初始值(每个分区都会用一次)
seqOp分区内聚合逻辑
combOp分区间聚合逻辑

2️⃣ 执行流程图解(文字版)

  1. 每个分区创建一个 zeroValue
  2. 使用 seqOp 在分区内聚合
  3. 使用 combOp 在分区间合并结果

3️⃣ 使用案例一:求每个 key 的最大值(并加 10)

val conf = new SparkConf().setMaster("local[*]").setAppName("AggregateByKeyDemo")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(
  List(
    ("a", 1), ("a", 3), ("a", 2),
    ("b", 2), ("b", 4)
  ), 2
)

val result = rdd.aggregateByKey(0)(
  (x, y) => math.max(x, y),   // 分区内
  (x, y) => x + y             // 分区间
)

result.collect().foreach(println)

sc.stop()

执行逻辑说明:


4️⃣ 使用案例二:求平均值(常见面试题)

val rdd = sc.parallelize(
  List(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 4)), 2
)

val result = rdd.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val avg = result.mapValues {
  case (sum, count) => sum.toDouble / count
}

avg.collect().foreach(println)


三、combineByKey 算子详解

1️⃣ 算子定义

def combineByKey[C](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C
): RDD[(K, C)]

参数说明

参数说明
createCombiner第一个 value 变成初始结构
mergeValue分区内合并
mergeCombiners分区间合并

2️⃣ combineByKey 执行核心思想

👉 没有 zeroValue,避免无意义初始值。


3️⃣ 使用案例:求每个 key 的平均值

val rdd = sc.parallelize(
  List(("a", 1), ("a", 2), ("a", 3), ("b", 2), ("b", 4)), 2
)

val result = rdd.combineByKey(
  v => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) =>
    (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val avg = result.mapValues {
  case (sum, count) => sum.toDouble / count
}

avg.collect().foreach(println)


四、aggregateByKey vs combineByKey 对比

维度aggregateByKeycombineByKey
初始值有 zeroValue
灵活性最高
使用难度
底层实现基于 combineByKey原生

五、使用建议与最佳实践

✅ 什么时候用 aggregateByKey?


✅ 什么时候用 combineByKey?


六、总结

reduceByKey 是简化版,aggregateByKey 是进阶版,combineByKey 是终极版。

真正理解这三个算子,你就真正理解了 Spark 的 Shuffle + 聚合机制

退出移动版