Spark Source Code Reading - Spark Launch Flow, Part 2: What spark-submit Does on the Code Side

Introduction

In the previous post, Spark Source Code Reading - Spark Launch Flow, we saw how the spark-submit command is handled on the shell side.

For example:

/opt/soft/spark/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 1 \
--executor-memory 800M \
--driver-memory 700M \
--executor-cores 2 \
--class org.apache.spark.examples.JavaWordCount \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt

The command above submits Spark's bundled WordCount example. After the shell-side processing, the command that ultimately gets executed is:

/opt/jdk/bin/java \
-cp /opt/soft/spark/conf/:/opt/soft/spark/jars/*:/opt/soft/hadoop-3.3.1/etc/hadoop/ \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=700M \
--class org.apache.spark.examples.JavaWordCount --num-executors 1 \
--executor-memory 800M \
--executor-cores 2 \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt
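
Most of this generated command is boilerplate: the long list of --add-opens flags opens JDK module internals that Spark's reflection-heavy code needs on JDK 9+/17, and -XX:+IgnoreUnrecognizedVMOptions keeps the same command usable on JDK 8, which does not recognize those flags.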

As you can see, what ends up running is the org.apache.spark.deploy.SparkSubmit class, with the original arguments passed straight through to it. This post takes a quick look at what SparkSubmit actually does.

The spark-submit execution flow

Overview


The figure above shows the process that runs after spark-submit; it mainly involves the following classes.

  • SparkSubmit object - the Java main entry point
  • SparkSubmit class - the main gateway for launching a Spark application.
    It handles the classpath setup for Spark and its dependencies, and adds a layer of abstraction so the same code can run on different cluster managers and deploy modes.
  • SparkApplication - the entry point of a Spark application; since I am running in yarn cluster mode, the concrete implementation is YarnClusterApplication
  • Client - the client used to talk to YARN, e.g. to create or kill an application
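
In one line, the call chain walked through below is roughly: SparkSubmit.main -> doSubmit -> submit -> runMain -> YarnClusterApplication.start -> Client.run -> Client.submitApplication -> yarnClient.submitApplication(appContext).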

Detailed walkthrough

As shown in the figure above:

1. The java command starts a JVM whose entry class is org.apache.spark.deploy.SparkSubmit

2. Execution enters main, which is fairly simple: it news up a SparkSubmit and calls its doSubmit method

val submit = new SparkSubmit() {
  self =>
  override protected def parseArguments(args: Array[String]): SparkSubmitArguments = ......
  override protected def logInfo(msg: => String): Unit = printMessage(msg)
  override protected def logWarning(msg: => String): Unit = ......
  override protected def logError(msg: => String): Unit = ......
  override def doSubmit(args: Array[String]): Unit = ......
  ......
}
submit.doSubmit(args)
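
Note that main wraps SparkSubmit in an anonymous subclass mainly to redirect parseArguments and the log methods to printMessage, so that output produced while parsing arguments is printed directly to the console rather than going through the normal logging setup.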

3. Inside the SparkSubmit class, the arguments are passed along the chain doSubmit -> submit -> runMain

Finally, in runMain, an instance of YarnClusterApplication is created via reflection and its start method is called.

// doSubmit -> submit: doSubmit calls the submit method via the SparkSubmitAction.SUBMIT case
def doSubmit(args: Array[String]): Unit = {
  ......
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
// submit -> doRunMain -> runMain: submit calls the local function doRunMain, which checks
// whether a proxy user is needed and then calls runMain
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      ......
    } else {
      runMain(args, uninitLog)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
// Finally, in runMain, YarnClusterApplication is created via reflection and its start method
// is called. See the notes on SparkApplication further below.
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ......
  // In our case the first branch is taken: the instance is created via reflection
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ......
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  } finally {
    ......
  }
}
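
The reason the reflection branch yields a YarnClusterApplication here is that prepareSubmitEnvironment sets childMainClass to org.apache.spark.deploy.yarn.YarnClusterApplication when the master is yarn and the deploy mode is cluster; since that class implements SparkApplication, the isAssignableFrom check passes and the first branch is taken.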

4. YarnClusterApplication.start news up a Client and then calls its run method

The Client class sets up the various connections to YARN. At this point the parameters of the program to run, such as org.apache.spark.examples.JavaWordCount, are handed to the Client class via new ClientArguments(args); they are used later when the YARN container is built (a simplified sketch of that parsing follows the code excerpt below).

override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)
  conf.remove(ARCHIVES)

  new Client(new ClientArguments(args), conf, null).run()
}
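
For illustration only, here is a simplified sketch of the kind of parsing ClientArguments performs on those arguments. ClientArgumentsSketch is a made-up name, and the real org.apache.spark.deploy.yarn.ClientArguments handles more options (--primary-py-file, --primary-r-file, etc.); this is just to show how --class, --jar and --arg end up as fields on the Client side:

// Hypothetical, trimmed-down take on the YARN ClientArguments parsing:
// it turns the childArgs produced by SparkSubmit into fields the Client uses
// later when building the ApplicationMaster container command.
class ClientArgumentsSketch(args: Array[String]) {
  var userJar: String = null        // from --jar
  var userClass: String = null      // from --class
  var userArgs: List[String] = Nil  // from repeated --arg entries

  private def parse(a: List[String]): Unit = a match {
    case "--jar" :: value :: tail   => userJar = value;              parse(tail)
    case "--class" :: value :: tail => userClass = value;            parse(tail)
    case "--arg" :: value :: tail   => userArgs = userArgs :+ value; parse(tail)
    case _ :: tail                  => parse(tail) // options not covered by this sketch
    case Nil                        =>             // done
  }
  parse(args.toList)
}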

5. Client.run calls the submitApplication method

In submitApplication, after the necessary interaction with YARN, the application is finally submitted via yarnClient.submitApplication(appContext).

// run calls submitApplication
def run(): Unit = {
  submitApplication()
  ......
}

// submitApplication submits the YARN application
def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)

  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    if (log.isDebugEnabled) {
      logDebug("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
    }

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    this.appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    // scalastyle:off FileSystemGet
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
    // scalastyle:on FileSystemGet

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext()
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}

6. At this point the local JVM has finished its submission work (by default it only keeps polling the application's state afterwards, in the part of run() elided above); from here on, YARN takes over and launches the driver process

Additional notes

SparkApplication

/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}
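
To make the contract concrete, here is a minimal hypothetical implementation. HelloApp is made up for illustration; note that the trait is private[spark], so real implementations all live inside Spark itself, which is why this sketch declares a Spark package:

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Hypothetical implementation: runMain would create it via getConstructor().newInstance()
// (hence the required no-argument constructor) and then call start(childArgs, sparkConf).
class HelloApp extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    println(s"Starting with master=${conf.get("spark.master", "unset")}, args=${args.mkString(" ")}")
  }
}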

SparkApplication is a trait with a single method, start, which launches the application. Its different implementations cover launching against different resource managers. There are five implementations:

  • TestSparkApplication - used in tests
  • ClientApp - the legacy RPC submission gateway for standalone cluster mode (the o.a.s.deploy.Client wrapper mentioned in the submit() comment above)
  • JavaMainApplication - wraps an ordinary main class that does not implement SparkApplication and simply invokes its main method (the else branch in runMain above); this is how client mode runs the user class directly
  • RestSubmissionClientApp - submits through the REST gateway
  • YarnClusterApplication - submits against YARN resources, i.e. the driver runs inside a YARN container

What does the submitted YARN application look like? What command does it execute?

The command executed by the application submitted to YARN lives in appContext.proto.amContainerSpec.command.

Where:

  • appContext is an ApplicationSubmissionContext, whose concrete implementation is ApplicationSubmissionContextPBImpl
  • proto is an ApplicationSubmissionContextProto
  • amContainerSpec is a ContainerLaunchContextProto
  • command is a LazyStringList

The final command looks roughly like this:
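
Reconstructed from the createContainerLaunchContext code shown further below (so an illustrative sketch rather than captured output; the JVM options, conf file names and log paths depend on the environment):

{{JAVA_HOME}}/bin/java -server \
  ...JVM options... \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class org.apache.spark.examples.JavaWordCount \
  --jar /opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
  --arg file:/opt/soft/spark/examples/src/main/resources/kv1.txt \
  --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties \
  --dist-cache-conf {{PWD}}/__spark_conf__/__spark_dist_cache__.properties \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr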

As you can see, what ultimately gets launched with java is the org.apache.spark.deploy.yarn.ApplicationMaster class, and the WordCount parameters such as --class org.apache.spark.examples.JavaWordCount --jar .... are passed along with it, waiting to be processed further.

So, when is appContext.proto.amContainerSpec.command built?

It is passed along the basic flow YarnClusterApplication -> Client -> ContainerLaunchContext -> ApplicationSubmissionContext.

Go back to step 4 above ("YarnClusterApplication.start news up a Client and then calls its run method"): in that step, YarnClusterApplication passes args to the Client object.

Then go back to step 5 above ("Client.run calls the submitApplication method"): inside submitApplication, createContainerLaunchContext is called to build the ContainerLaunchContext, and that is when the parameters are passed on.

def submitApplication(): Unit = {
  ResourceRequestHelper.validateResources(sparkConf)
  ......
  try {
    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext()
    val appContext = createApplicationSubmissionContext(newApp, containerContext)
    ......
  }
  ......
}

Now let's look at the implementation of createContainerLaunchContext:

private def createContainerLaunchContext(): ContainerLaunchContext = {
  ......
  // Extract the --class argument into the userClass value
  val userClass =
    if (isClusterMode) {
      Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
    } else {
      Nil
    }
  val userJar =
    if (args.userJar != null) {
      Seq("--jar", args.userJar)
    } else {
      Nil
    }
  ......
  // Combine userClass with the other options into the full AM argument list
  val amArgs =
    Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
    Seq("--properties-file",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
    Seq("--dist-cache-conf",
      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

  // Assemble the final command from amArgs
  // Command for the ApplicationMaster
  val commands = prefixEnv ++
    Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++ amArgs ++
    Seq(
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

  // TODO: it would be nicer to just make sure there are no null commands here
  val printableCommands = commands.map(s => if (s == null) "null" else s).toList
  // Hand the command list to amContainer and return
  amContainer.setCommands(printableCommands.asJava)
  ......
}
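
For completeness: amClass here is org.apache.spark.deploy.yarn.ApplicationMaster in cluster mode (and org.apache.spark.deploy.yarn.ExecutorLauncher in client mode), which is why the command we saw earlier launches ApplicationMaster.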

Finally, via

val appContext = createApplicationSubmissionContext(newApp, containerContext)

the container spec is handed to the ApplicationSubmissionContext,

and then via

yarnClient.submitApplication(appContext)

the app is submitted. This way, the application knows what it should run once it starts.
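
For reference, here is a condensed sketch of what createApplicationSubmissionContext wires together. It is abbreviated and slightly simplified; the setters are from the YARN ApplicationSubmissionContext API, not a verbatim copy of the Spark method:

// Condensed sketch: the submission context carries the AM container spec
// (including the command assembled above) in the request sent to the ResourceManager.
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
appContext.setQueue(sparkConf.get("spark.yarn.queue", "default"))
appContext.setAMContainerSpec(containerContext)   // containerContext holds amContainer.setCommands(...)
appContext.setApplicationType("SPARK")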

How does the application get hold of the code?

At spark-submit time, the JVM is local and the jar sits on the local disk, so it can simply be executed. After submission, though, the application runs remotely on YARN, so how does it get the code?

Answer: the jar path needs to be globally accessible, and HDFS is a good fit for that. During submission the Client uploads the application jar and other dependencies to the app staging directory on HDFS (the stagingDirPath seen in submitApplication above), and YARN localizes them into the containers.
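
As a rough illustration (the exact layout depends on spark.yarn.stagingDir, the submitting user and the Spark version), the staging directory for this example would contain something like:

hdfs:///user/<user>/.sparkStaging/application_<id>/
  __spark_libs__<...>.zip                    (the Spark jars, unless spark.yarn.jars/archive points elsewhere)
  spark-examples_2.12-3.4.0-SNAPSHOT.jar     (the application jar)
  __spark_conf__.zip                         (the localized Spark/Hadoop configuration)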

Summary

The SparkSubmit class does exactly what its name says: it submits the Spark job. Through it, the Spark program written earlier (WordCount here) ends up submitted to the target runtime (YARN, a standalone Spark cluster, Kubernetes, and so on), and from there the actual execution flow begins.