下面是一篇完整、可直接发布的技术教程文章,标题与风格符合你之前的 Spark 系列内容 👇
大数据 Spark(七十五):Action 行动算子 foreachPartition 和 count 使用案例
在 Spark 中,算子大体分为两类:
- Transformation(转换算子):返回新的 RDD,延迟执行
- Action(行动算子):触发真正计算,返回结果或执行副作用
本篇我们重点讲解两个常用但用途完全不同的 Action 行动算子:
foreachPartition 和 count
一、count 算子详解
1️⃣ count 是什么?
count 是最基础、最常见的 Action 算子之一,用于:
统计 RDD 中元素的数量
def count(): Long
它会触发 Spark 作业执行,并返回一个 Long 类型的结果。
2️⃣ 使用示例
val conf = new SparkConf().setMaster("local[*]").setAppName("CountDemo")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val result = rdd.count()
println(result)
sc.stop()
输出结果:
5
3️⃣ 底层执行逻辑说明
count会遍历 所有分区- 每个分区本地统计元素数量
- Driver 端汇总所有分区结果
⚠️ 注意:count 不会返回数据内容,只返回数量,因此非常适合:
- 统计数据规模
- 验证数据是否为空
- 调试管道是否生效
二、foreachPartition 算子详解
1️⃣ foreachPartition 是什么?
def foreachPartition(f: Iterator[T] => Unit): Unit
foreachPartition 是一个 以分区为单位执行的 Action 算子。
与 foreach 的最大区别在于:
| 算子 | 执行粒度 |
|---|---|
| foreach | 每条数据 |
| foreachPartition | 每个分区 |
2️⃣ foreach vs foreachPartition 对比
foreach 示例(不推荐用于外部系统)
rdd.foreach(x => {
println(x)
})
问题:
- 每条数据都会执行一次逻辑
- 如果涉及数据库 / 网络 IO,开销巨大
foreachPartition 示例(推荐)
rdd.foreachPartition(iter => {
while (iter.hasNext) {
println(iter.next())
}
})
优势:
- 每个分区只执行一次
- 非常适合 批量写入外部系统
3️⃣ 典型使用场景:写入数据库
❌ 错误示例(foreach)
rdd.foreach(data => {
val conn = getConnection()
insert(data)
conn.close()
})
问题:
- 每条数据创建一次连接
- 极易压垮数据库
✅ 正确示例(foreachPartition)
rdd.foreachPartition(iter => {
val conn = getConnection()
iter.foreach(data => {
insert(data)
})
conn.close()
})
✔ 每个分区一个连接
✔ 极大减少资源消耗
✔ Spark 官方推荐写法
三、综合案例:count + foreachPartition
需求说明
- 统计数据条数
- 按分区输出数据内容
示例代码
val conf = new SparkConf().setMaster("local[*]").setAppName("ActionDemo")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List("a", "b", "c", "d", "e"), 2)
// 统计数据量
val cnt = rdd.count()
println(s"数据总量:$cnt")
// 按分区处理
rdd.foreachPartition(iter => {
println("====== 分区开始 ======")
iter.foreach(println)
})
sc.stop()
执行结果示意
数据总量:5
====== 分区开始 ======
a
b
====== 分区开始 ======
c
d
e
四、注意事项与最佳实践
⚠️ 1️⃣ foreachPartition 是 Action
- 会触发 Spark 作业执行
- 与 count 一样属于最终算子
⚠️ 2️⃣ foreachPartition 无返回值
foreachPartition(...): Unit
👉 不能用于结果计算,只能用于副作用操作
⚠️ 3️⃣ 外部连接一定要在分区内创建
- 数据库
- Redis
- Kafka Producer
- HTTP 客户端
都应放在 foreachPartition 内部初始化。
五、总结
| 算子 | 作用 | 特点 |
|---|---|---|
| count | 统计元素数量 | 简单、高效 |
| foreachPartition | 按分区执行逻辑 | 适合 IO 操作 |
一句话记忆
count 用来“数数据”,foreachPartition 用来“干活”。
发表回复