Spark Source Code Reading 03: The Purpose and Mechanism of withScope

Preface

When reading the Spark source code, withScope shows up everywhere. Take the simplest map, for example:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

My immediate question at the time: what is this actually for?

What withScope does

The official comment on the method explains it as follows:

Execute the given body such that all RDDs created in this body will have the same scope.
If nesting is allowed, any subsequent calls to this method in the given body will instantiate child scopes that are nested within our scope. Otherwise, these calls will take no effect.
Additionally, the caller of this method may optionally ignore the configurations and scopes set by the higher level caller. In this case, this method will ignore the parent caller’s intention to disallow nesting, and the new scope instantiated will not have a parent. This is useful for scoping physical operations in Spark SQL, for instance.
Note: Return statements are NOT allowed in body.

The comment is accurate but a little dense, so here is a more digestible explanation (adapted from explanations found online):

In Apache Spark, withScope is mainly used to build and track the scope hierarchy of RDDs (Resilient Distributed Datasets). It is used throughout Spark's internal implementation to record the history of RDD operations and their parent-child lineage relationships, which matters for debugging, performance tuning, and monitoring.

The main things withScope provides:

  1. A scope hierarchy:
    • withScope guarantees that all RDDs created inside a given code block share the same scope.
    • If nesting is allowed, child scopes can be created to represent finer-grained operations.
  2. A record of the operation sequence:
    • It records the sequence of RDD operations, which helps trace where an RDD came from and how it was transformed.
  3. Dependency analysis:
    • It helps in understanding and analyzing the dependencies between RDDs.
  4. The Spark UI visualization:
    • It backs the DAG visualization in the Spark UI, letting users see a graphical view of the data flow (see the small example after this list).
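
To get a concrete feel for point 4, the snippet below (assuming a spark-shell session where sc is already defined, so this is just an illustrative usage, not part of the withScope source) runs a few transformations; each labeled box you then see in the DAG Visualization of the Spark UI corresponds to one of these scopes.

// Run a few transformations, then open the Spark UI (http://localhost:4040 by default).
// The DAG Visualization on the job page draws one labeled box per scope
// ("parallelize", "map", "reduceByKey", ...).
val nums = sc.parallelize(1 to 100)      // scope "parallelize"
val pairs = nums.map(n => (n % 10, 1))   // scope "map"
val counts = pairs.reduceByKey(_ + _)    // scope "reduceByKey"
counts.collect()                         // triggers a job that shows up in the UI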

To be honest, it still didn't fully click for me, but that's fine; let's go look at the source code first.

The withScope source code

What does withScope actually do? The withScope called in map is defined on RDD itself and is just a thin wrapper; following the call down, everything ends up in RDDOperationScope.withScope.
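
For reference, the wrapper in RDD.scala looks roughly like this (quoted from memory, so the exact form may differ slightly between Spark versions). An intermediate overload of RDDOperationScope.withScope also derives the scope name from the calling method's name on the stack and passes allowNesting = false by default, which is why the scopes in the logs later are named "textFile", "flatMap", "map", and so on.

// In RDD.scala (approximate): delegate to the real implementation,
// letting it figure out the scope name from the caller.
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

The implementation it finally reaches is the following: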

private[spark] def withScope[T](
    sc: SparkContext,
    name: String,
    allowNesting: Boolean,
    ignoreParent: Boolean)(body: => T): T = {
  // Save the old scope to restore it later
  val scopeKey = SparkContext.RDD_SCOPE_KEY
  val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
  val oldScopeJson = sc.getLocalProperty(scopeKey)
  val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
  val oldNoOverride = sc.getLocalProperty(noOverrideKey)
  try {
    if (ignoreParent) {
      // Ignore all parent settings and scopes and start afresh with our own root scope
      // i.e. discard whatever the parent set up and start over with a brand-new root scope
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
    } else if (sc.getLocalProperty(noOverrideKey) == null) {
      // Otherwise, set the scope only if the higher level caller allows us to do so
      // i.e. the old scope is handed down to the new scope as its parent
      sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
    }
    // Optionally disallow the child body to override our scope
    // i.e. a child body will not be able to update this local property
    if (!allowNesting) {
      sc.setLocalProperty(noOverrideKey, "true")
    }
    body
  } finally {
    // Remember to restore any state that was modified before exiting
    // Once the body has run, the previous scope is restored
    sc.setLocalProperty(scopeKey, oldScopeJson)
    sc.setLocalProperty(noOverrideKey, oldNoOverride)
  }
}

In short, withScope stores a JSON string for an RDDOperationScope object in the SparkContext local properties; the object carries the scope's name and its parent scope.
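
The other half of the mechanism, which the method above does not show, is how an RDD picks that value up: any RDD constructed inside the body reads the same local property and deserializes it back into an RDDOperationScope. From memory, the relevant field in RDD.scala looks roughly like the sketch below, so treat it as an approximation rather than an exact quote; the Spark UI then reads this field off each RDD to group operators into the boxes of the DAG visualization.

// In RDD.scala (approximate): the scope of the operation that created this RDD,
// read from the local property that withScope just set.
@transient private[spark] val scope: Option[RDDOperationScope] = {
  Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}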

What does the value written by withScope look like?

Let's take a WordCount program as an example:

// scalastyle:off println
package my.test

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object day01 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()

    /**
     * If this parameter is not set, Spark assumes you are running in cluster mode.
     * Setting it to "local" runs the job in local mode.
     */
    conf.setMaster("local[2]")
    // Set the application name
    conf.setAppName("WordCount")
    // Create the Spark Core entry point
    val sc = new SparkContext(conf)
    // Read the file into an RDD
    val file: RDD[String] = sc.textFile("file:///D:\\git\\spark_learning\\in\\word.txt")
    // Split each line on ","
    val word: RDD[String] = file.flatMap(_.split(","))
    // Pair each word with a count of 1
    val wordOne: RDD[(String, Int)] = word.map((_, 1))
    // Count the words
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
    // Sort by word count in descending order
    val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2, false)
    // Save the final result
    val outputDir = new Path("file:///D:/result.txt")
    val fs = FileSystem.get(new Configuration())
    if (fs.exists(outputDir)) {
      fs.delete(outputDir, true) // true means delete the directory recursively
    }
    sortRdd.saveAsTextFile("file:///D:/result.txt")

    sc.stop()
  }
}

At the same time, modify the withScope source to add some log output:

// Optionally disallow the child body to override our scope
if (!allowNesting) {
  sc.setLocalProperty(noOverrideKey, "true")
}
log.info("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
log.info(sc.getLocalProperty(scopeKey))
body

Run the code; the scope-related log output looks like this:

24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"0","name":"textFile"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO SparkContext: Created broadcast 0 from textFile at day01.scala:24
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"0","name":"textFile"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"1","name":"flatMap"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"2","name":"map"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"3","name":"reduceByKey"}
24/08/08 18:52:52 INFO FileInputFormat: Total input files to process : 1
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"3","name":"reduceByKey"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"3","name":"reduceByKey"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:52 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:52 INFO SparkContext: Starting job: sortBy at day01.scala:32

24/08/08 18:52:53 INFO DAGScheduler: Job 0 finished: sortBy at day01.scala:32, took 0.736760 s
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"5","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"6","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"7","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"8","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"9","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"10","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"11","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"12","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"4","name":"sortBy"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"13","name":"saveAsTextFile"}
24/08/08 18:52:53 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir

24/08/08 18:52:53 INFO DAGScheduler: Job 1 finished: runJob at SparkHadoopWriter.scala:83, took 0.083756 s
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"14","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"15","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"16","name":"checkpoint"}
24/08/08 18:52:53 INFO RDDOperationScope: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
24/08/08 18:52:53 INFO RDDOperationScope: {"id":"17","name":"checkpoint"}
24/08/08 18:52:53 INFO SparkHadoopWriter: Start to commit write Job job_20240808185253177739536364190249_0010.
24/08/08 18:52:53 INFO SparkHadoopWriter: Write Job

As you can see, what gets written is very simple: an id and a name, and that's it. There is no parent.

Why is there no parent?

  body
} finally {
  // Remember to restore any state that was modified before exiting
  sc.setLocalProperty(scopeKey, oldScopeJson)
  sc.setLocalProperty(noOverrideKey, oldNoOverride)
}

Looking back at the source above: once the body finishes, the finally block restores the previous scope.
In other words, a scope only gets a parent when withScope is entered again while an outer scope is still active (and nesting is allowed).
In our WordCount, the bodies of map and flatMap never call withScope again, so every scope is written as a fresh root with no parent. (The repeated {"id":"4","name":"sortBy"} and {"id":"13","name":"saveAsTextFile"} lines come from withScope calls made internally by those operations; since nesting is not allowed for them, the no-override flag is set and they simply keep the existing scope instead of creating a child.)
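
To make that concrete, here is a small self-contained toy (hypothetical names such as ScopeNestingDemo, not Spark code) that mimics the save/restore logic above, with a plain mutable map standing in for the SparkContext local properties. Sequential calls each start a fresh root scope, while a nested call records the outer scope as its parent:

import scala.collection.mutable

object ScopeNestingDemo {
  // Stand-in for the SparkContext local properties used by the real withScope
  private val props = mutable.Map[String, String]()
  private var nextId = 0

  // Mimics RDDOperationScope.withScope with allowNesting = true and ignoreParent = false
  def withScope[T](name: String)(body: => T): T = {
    val oldScope = props.get("spark.rdd.scope")
    val parentPart = oldScope.map(p => s""","parent":$p""").getOrElse("")
    props("spark.rdd.scope") = s"""{"id":"$nextId","name":"$name"$parentPart}"""
    nextId += 1
    println(props("spark.rdd.scope")) // what the log.info above would print
    try body finally {
      // Restore whatever was there before, just like the finally block above
      oldScope match {
        case Some(s) => props("spark.rdd.scope") = s
        case None    => props.remove("spark.rdd.scope")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Sequential calls: the scope is restored in between, so no parent appears
    withScope("flatMap") { }   // {"id":"0","name":"flatMap"}
    withScope("map") { }       // {"id":"1","name":"map"}

    // Nested call: the inner scope keeps the outer one as its parent
    withScope("sortBy") {      // {"id":"2","name":"sortBy"}
      withScope("mapPartitions") { }
      // {"id":"3","name":"mapPartitions","parent":{"id":"2","name":"sortBy"}}
    }
  }
}

This is exactly the contract the real finally block enforces: no matter what a body does, the caller's scope is back in place once the body returns, so a parent only ever appears while a withScope call is nested inside another one.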