下面是一篇完整、可直接发布的技术教程文章,标题与风格符合你之前的 Spark 系列内容 👇


大数据 Spark(七十五):Action 行动算子 foreachPartition 和 count 使用案例

在 Spark 中,算子大体分为两类:

  • Transformation(转换算子):返回新的 RDD,延迟执行
  • Action(行动算子):触发真正计算,返回结果或执行副作用

本篇我们重点讲解两个常用但用途完全不同的 Action 行动算子

foreachPartitioncount


一、count 算子详解

1️⃣ count 是什么?

count 是最基础、最常见的 Action 算子之一,用于:

统计 RDD 中元素的数量

def count(): Long

它会触发 Spark 作业执行,并返回一个 Long 类型的结果。


2️⃣ 使用示例

val conf = new SparkConf().setMaster("local[*]").setAppName("CountDemo")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(1, 2, 3, 4, 5))

val result = rdd.count()
println(result)

sc.stop()

输出结果:

5


3️⃣ 底层执行逻辑说明

  • count 会遍历 所有分区
  • 每个分区本地统计元素数量
  • Driver 端汇总所有分区结果

⚠️ 注意:
count 不会返回数据内容,只返回数量,因此非常适合:

  • 统计数据规模
  • 验证数据是否为空
  • 调试管道是否生效

二、foreachPartition 算子详解

1️⃣ foreachPartition 是什么?

def foreachPartition(f: Iterator[T] => Unit): Unit

foreachPartition 是一个 以分区为单位执行的 Action 算子

foreach 的最大区别在于:

算子执行粒度
foreach每条数据
foreachPartition每个分区

2️⃣ foreach vs foreachPartition 对比

foreach 示例(不推荐用于外部系统)

rdd.foreach(x => {
  println(x)
})

问题:

  • 每条数据都会执行一次逻辑
  • 如果涉及数据库 / 网络 IO,开销巨大

foreachPartition 示例(推荐)

rdd.foreachPartition(iter => {
  while (iter.hasNext) {
    println(iter.next())
  }
})

优势:

  • 每个分区只执行一次
  • 非常适合 批量写入外部系统

3️⃣ 典型使用场景:写入数据库

❌ 错误示例(foreach)

rdd.foreach(data => {
  val conn = getConnection()
  insert(data)
  conn.close()
})

问题:

  • 每条数据创建一次连接
  • 极易压垮数据库

✅ 正确示例(foreachPartition)

rdd.foreachPartition(iter => {
  val conn = getConnection()
  iter.foreach(data => {
    insert(data)
  })
  conn.close()
})

✔ 每个分区一个连接
✔ 极大减少资源消耗
✔ Spark 官方推荐写法


三、综合案例:count + foreachPartition

需求说明

  • 统计数据条数
  • 按分区输出数据内容

示例代码

val conf = new SparkConf().setMaster("local[*]").setAppName("ActionDemo")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List("a", "b", "c", "d", "e"), 2)

// 统计数据量
val cnt = rdd.count()
println(s"数据总量:$cnt")

// 按分区处理
rdd.foreachPartition(iter => {
  println("====== 分区开始 ======")
  iter.foreach(println)
})

sc.stop()


执行结果示意

数据总量:5
====== 分区开始 ======
a
b
====== 分区开始 ======
c
d
e


四、注意事项与最佳实践

⚠️ 1️⃣ foreachPartition 是 Action

  • 会触发 Spark 作业执行
  • 与 count 一样属于最终算子

⚠️ 2️⃣ foreachPartition 无返回值

foreachPartition(...): Unit

👉 不能用于结果计算,只能用于副作用操作


⚠️ 3️⃣ 外部连接一定要在分区内创建

  • 数据库
  • Redis
  • Kafka Producer
  • HTTP 客户端

都应放在 foreachPartition 内部初始化。


五、总结

算子作用特点
count统计元素数量简单、高效
foreachPartition按分区执行逻辑适合 IO 操作

一句话记忆

count 用来“数数据”,foreachPartition 用来“干活”。