Source Code Analysis of the Spark Submit Job Submission Process


Out of curiosity about what actually happens after a job is submitted with spark-submit, I dug into the Spark source code; what follows are my reading notes.

The spark-submit launch script:

  • In shell, -z tests whether a string is empty; $@ expands to all arguments passed to the script; $? is the exit status of the last command.
  • Use @ or * to get all elements of an array, e.g. ${array_name[*]}, ${array_name[@]}.
  • The submission chain is SparkSubmit –> yarn/Client –> ApplicationMaster.
    • Client: submits the job to the Master.
    • Master: receives jobs submitted by the Client, manages the Workers, and tells Workers to launch the Driver and Executors.
    • Worker: manages the resources of its own node, reports heartbeats to the Master periodically, and carries out the Master's commands, such as launching the Driver and Executors.
  • The shell read builtin accepts eight options: -a, -d, -e, -n, -p, -r, -t, and -s.
    -a : read the words of the input into an array.
    -d : delimiter. Input is normally split on IFS, but with -d read keeps reading until the given character appears (spark-class below uses read -d '' to consume the launcher's NUL-delimited output).
    -n : limit how many characters are accepted as valid input.
    -p : print a prompt. Instead of echo -n "..." followed by read, you can write read -p '... my prompt?' value in a single statement.
    -r : raw mode. Without it, a trailing backslash \ means the input continues on the next line; with -r, backslashes are taken literally.
    -s : silent mode. The input (for example a password or an arrow key) is not echoed to the terminal; without -s, pressing an arrow key shows ^[[A at the input position.
    -t : timeout in seconds. If no input arrives in time, the script continues; note the variable is not set to null on timeout, it keeps its previous value.
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
  • spark-class (sets up the environment variables for running the Spark job and launches the actual entry point):
    . "${SPARK_HOME}"/bin/load-spark-env.sh
    build_command() {
    "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
    printf "%d\0" $?
    }
    # parse the NUL-delimited command emitted by the launcher
    CMD=()
    while IFS= read -d '' -r ARG; do
    CMD+=("$ARG")
    done < <(build_command "$@")
    COUNT=${#CMD[@]}
    LAST=$((COUNT - 1))
    LAUNCHER_EXIT_CODE=${CMD[$LAST]}
    if [ $LAUNCHER_EXIT_CODE != 0 ]; then
    exit $LAUNCHER_EXIT_CODE
    fi
    CMD=("${CMD[@]:0:$LAST}")
    exec "${CMD[@]}"
    exec "${CMD[@]}",即执行java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.deploy.SparkSubmit "$@",最终执行代码示例:
    /software/servers/jdk1.7.0_67/bin/java -cp /software/servers/druid/mart_risk/hadoop/lib/native/:/software/servers/druid/mart_risk/hadoop/share/hadoop/common/lib/hadoop-lzo-0.4.20.jar:/software/conf/druid/mart_risk/bdp_jmart_risk.bdp_jmart_risk_hkh/hive_conf/:/home/mart_risk/data_dir/sjmei/plugins/spark_2.0/conf/:/home/mart_risk/data_dir/sjmei/plugins/spark_2.0/jars/*:/software/conf/druid/mart_risk/bdp_jmart_risk.bdp_jmart_risk_hkh/hadoop_conf/ -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --conf spark.driver.memory=10g --properties-file ./conf/spark-defaults.conf --class com.jd.risk.dm.spark.ml.alphago.GBTMLlib --name spark gbt algo for devices --num-executors 10 --executor-memory 10g --executor-cores 2 --jars ./examples/jars/scopt_2.11-3.3.0.jar --queue bdp_jmart_risk.bdp_jmart_risk_hkh ./libs/jrdm-dm-2.0-SNAPSHOT.jar hdfs://ns2/user/mart_risk/dev.db/risk_jrdm_msj_devices_training_black training
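As an aside, the same spark-submit –> spark-class –> launcher.Main chain can also be driven programmatically through Spark's spark-launcher module. A minimal sketch (the Spark home, jar path, class name and arguments here are placeholders, not taken from the command above):

import org.apache.spark.launcher.SparkLauncher

object SubmitViaLauncher {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setSparkHome("/path/to/spark")            // hypothetical SPARK_HOME
      .setAppResource("/path/to/your-app.jar")   // hypothetical user jar
      .setMainClass("com.example.YourApp")       // hypothetical main class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
      .addAppArgs("arg1", "arg2")
      .launch()                                  // forks bin/spark-submit under the hood
    process.waitFor()
  }
}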

The deploy.SparkSubmit code:

def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
...
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
/**
* Submit the application using the provided parameters.
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
...
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
...
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
/**
* Prepare the environment for submitting an application.
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
var childMainClass = ""
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
-1
}
// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
}
// Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
// the master and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (clusterManager == YARN) {
(args.master, args.deployMode) match {
case ("yarn-cluster", null) =>
deployMode = CLUSTER
args.master = "yarn"
case ("yarn-cluster", "client") =>
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
case ("yarn-client", "cluster") =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
case (_, mode) =>
args.master = "yarn"
}
// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
printErrorAndExit(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}
// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val exclusions: Seq[String] =
if (!StringUtils.isBlank(args.packagesExclusions)) {
args.packagesExclusions.split(",")
} else {
Nil
}
val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
Option(args.repositories), Option(args.ivyRepoPath), exclusions = exclusions)
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
if (args.isPython) {
args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
}
}
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local.
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
if (args.isPython && !isYarnCluster && !isMesosCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
}
}
// Require all R files to be local
if (args.isR && !isYarnCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local R files are supported: $args.primaryResource")
}
}
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
...
}
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}
// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
...
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
sysProp = "spark.driver.extraLibraryPath"),
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
)
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
}
}
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.
if (clusterManager == YARN) {
if (args.isPython) {
sysProps.put("spark.yarn.isPython", "true")
}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}
// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
if (isMesosCluster) {
...
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sysProps -= "spark.driver.host"
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
sysProps(config) = Utils.resolveURIs(oldValue)
}
}
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
sysProps("spark.submit.pyFiles") = formattedPyFiles
}
(childArgs, childClasspath, sysProps, childMainClass)
}
/**
* Run the main method of the child class using the provided launch environment.
* Note that this main class will not be the one provided by the user if we
* are running cluster deploy mode or python applications.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
...
}
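To tie the branches above together, here is a deliberately simplified sketch (not Spark's actual code; the Mesos, Python and R cases are omitted) of how childMainClass ends up being chosen, which in turn determines what runMain loads and invokes:

// Simplified summary of prepareSubmitEnvironment's choice of childMainClass.
def childMainClassFor(master: String, deployMode: String, userMainClass: String,
                      useRest: Boolean = true): String =
  (master, deployMode) match {
    case ("yarn", "cluster") => "org.apache.spark.deploy.yarn.Client"
    case (m, "cluster") if m.startsWith("spark") =>
      if (useRest) "org.apache.spark.deploy.rest.RestSubmissionClient"
      else "org.apache.spark.deploy.Client"
    case _ => userMainClass // client mode: run the user's main class directly
  }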

The deploy.yarn.Client code:

def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
"future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
}
// Set an env variable indicating we are running in YARN mode.
// Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
val args = new ClientArguments(argStrings)
new Client(args, sparkConf).run()
}
/**
* Submit an application to the ResourceManager.
* If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
* reporting the application's status until the application has exited for any reason.
* Otherwise, the client process will exit after submission.
* If the application finishes with a failed, killed, or undefined status,
* throw an appropriate SparkException.
*/
def run(): Unit = {
this.appId = submitApplication()
if (!launcherBackend.isConnected() && fireAndForget) {
val report = getApplicationReport(appId) // ApplicationReport describes the application (its user, queue, name, and so on)
val state = report.getYarnApplicationState // the application's current YARN state
logInfo(s"Application report for $appId (state: $state)")
logInfo(formatReportDetails(report))
if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
throw new SparkException(s"Application $appId finished with status: $state")
}
} else { // two statuses are involved: YarnApplicationState (the application's state as seen by YARN) and FinalApplicationStatus (the final outcome of the job itself)
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
if (yarnApplicationState == YarnApplicationState.FAILED ||
finalApplicationStatus == FinalApplicationStatus.FAILED) {
throw new SparkException(s"Application $appId finished with failed status")
}
if (yarnApplicationState == YarnApplicationState.KILLED ||
finalApplicationStatus == FinalApplicationStatus.KILLED) {
throw new SparkException(s"Application $appId is killed")
}
if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
throw new SparkException(s"The final status of application $appId is undefined")
}
}
}
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
* The stable Yarn API provides a convenience method (YarnClient#createApplication) for
* creating applications and setting up the application submission context. This was not
* available in the alpha API.
*/
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
launcherBackend.connect()
// Setup the credentials before doing anything else,
// so we have don't have issues at any point.
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
logInfo("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
// Get a new application from our RM
val newApp = yarnClient.createApplication() // ask the RM for a new application; the AM container (jar path, userClass, etc.) is assembled below in createContainerLaunchContext
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
appId
} catch {
...
}
}
/**
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
val pySparkArchives =
if (sparkConf.get(IS_PYTHON_APP)) {
findPySparkArchives()
} else {
Nil
}
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
val javaOpts = ListBuffer[String]()
// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None
// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
javaOpts += "-XX:+UseConcMarkSweepGC"
javaOpts += "-XX:MaxTenuringThreshold=31"
javaOpts += "-XX:SurvivorRatio=8"
javaOpts += "-XX:+CMSIncrementalMode"
javaOpts += "-XX:+CMSIncrementalPacing"
javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
// Include driver-specific java options if we are launching a driver
if (isClusterMode) {
val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS"))
driverOpts.foreach { opts =>
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) {
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
}
if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
}
} else {
// Validate and include yarn am specific java options in yarn-client mode.
sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
if (opts.contains("-Dspark")) {
val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')."
throw new SparkException(msg)
}
if (opts.contains("-Xmx")) {
val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " +
s"(was '$opts'). Use spark.yarn.am.memory instead."
throw new SparkException(msg)
}
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
}
}
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts)
val userClass =
if (isClusterMode) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar =
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
val primaryPyFile =
if (isClusterMode && args.primaryPyFile != null) {
Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
} else {
Nil
}
val primaryRFile =
if (args.primaryRFile != null) {
Seq("--primary-r-file", args.primaryRFile)
} else {
Nil
}
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
userArgs ++ Seq(
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
) ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
...
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
setupSecurityToken(amContainer)
amContainer
}
/**
* Set up the context for submitting our ApplicationMaster.
* This uses the YarnClientApplication not available in the Yarn alpha API.
*/
def createApplicationSubmissionContext(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
appContext.setQueue(sparkConf.get(QUEUE_NAME))
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
sparkConf.get(APPLICATION_TAGS).foreach { tags =>
try {
// The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
// reflection to set it, printing a warning if a tag was specified but the YARN version
// doesn't support it.
val method = appContext.getClass().getMethod(
"setApplicationTags", classOf[java.util.Set[String]])
method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
} catch {
...
}
}
sparkConf.get(MAX_APP_ATTEMPTS) match {
case Some(v) => appContext.setMaxAppAttempts(v)
case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
"Cluster's default value will be used.")
}
sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
try {
val method = appContext.getClass().getMethod(
"setAttemptFailuresValidityInterval", classOf[Long])
method.invoke(appContext, interval: java.lang.Long)
} catch {
...
}
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
try {
val amRequest = Records.newRecord(classOf[ResourceRequest])
amRequest.setResourceName(ResourceRequest.ANY)
amRequest.setPriority(Priority.newInstance(0))
amRequest.setCapability(capability)
amRequest.setNumContainers(1)
val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
method.invoke(amRequest, expr)
val setResourceRequestMethod =
appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
setResourceRequestMethod.invoke(appContext, amRequest)
} catch {
...
}
case None =>
appContext.setResource(capability)
}
appContext
}
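Putting createContainerLaunchContext together, in cluster mode the command installed into the AM container looks roughly like the sketch below. This is an illustrative reconstruction, not output captured from a real cluster; the memory size, class name, jar path and argument are invented:

// Illustrative only: approximate shape of the AM launch command in yarn-cluster mode.
val exampleAmCommand: Seq[String] = Seq(
  "{{JAVA_HOME}}/bin/java", "-server",
  "-Xmx10240m",                                    // amMemory
  "-Djava.io.tmpdir={{PWD}}/tmp",
  "-Dspark.yarn.app.container.log.dir=<LOG_DIR>",
  "org.apache.spark.deploy.yarn.ApplicationMaster",
  "--class", "com.example.YourApp",                // hypothetical user class
  "--jar", "hdfs:///path/to/your-app.jar",         // hypothetical user jar
  "--arg", "arg1",
  "--properties-file", "{{PWD}}/__spark_conf__/__spark_conf__.properties",
  "1>", "<LOG_DIR>/stdout",
  "2>", "<LOG_DIR>/stderr")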

The yarn.client.api.impl.YarnClientImpl code (followed by the ResourceManager-side ClientRMService and RMAppManager handling of the submission):

public YarnClientApplication createApplication() throws YarnException, IOException {
ApplicationSubmissionContext context = (ApplicationSubmissionContext)Records.newRecord(ApplicationSubmissionContext.class);
GetNewApplicationResponse newApp = this.getNewApplication();
ApplicationId appId = newApp.getApplicationId();
context.setApplicationId(appId);
return new YarnClientApplication(newApp, context);
}
ApplicationId getNewApplicationId() {
ApplicationId applicationId = BuilderUtils.newApplicationId(this.recordFactory, ResourceManager.getClusterTimeStamp(), this.applicationCounter.incrementAndGet());
LOG.info("Allocated new applicationId: " + applicationId.getId());
return applicationId;
}
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if(applicationId == null) {
throw new ApplicationIdNotProvidedException("ApplicationId is not provided in ApplicationSubmissionContext");
} else {
SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
if(this.isSecurityEnabled() && this.timelineServiceEnabled) {
this.addTimelineDelegationToken(appContext.getAMContainerSpec());
}
this.rmClient.submitApplication(request);
int pollCount = 0;
long startTime = System.currentTimeMillis();
EnumSet waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED);
EnumSet failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED);
while(true) {
while(true) {
try {
ApplicationReport ex = this.getApplicationReport(applicationId);
YarnApplicationState state = ex.getYarnApplicationState();
if(!waitingStates.contains(state)) {
if(failToSubmitStates.contains(state)) {
throw new YarnException("Failed to submit " + applicationId + " to YARN : " + ex.getDiagnostics());
}
LOG.info("Submitted application " + applicationId);
return applicationId;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if(this.enforceAsyncAPITimeout() && elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully");
}
++pollCount;
if(pollCount % 10 == 0) {
LOG.info("Application submission is not finished, submitted application " + applicationId + " is still in " + state);
}
try {
Thread.sleep(this.submitPollIntervalMillis);
} catch (InterruptedException var14) {
LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted.");
}
} catch (ApplicationNotFoundException var15) {
LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext");
this.rmClient.submitApplication(request);
}
}
}
}
}
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException {
ApplicationSubmissionContext submissionContext = request.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException var7) {
LOG.warn("Unable to get the current user.", var7);
RMAuditLogger.logFailure(user, "Submit Application Request", var7.getMessage(), "ClientRMService", "Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(var7);
}
if(this.rmContext.getRMApps().get(applicationId) != null) {
LOG.info("This is an earlier submitted application: " + applicationId);
return SubmitApplicationResponse.newInstance();
} else {
if(submissionContext.getQueue() == null) {
submissionContext.setQueue("default");
}
if(submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName("N/A");
}
if(submissionContext.getApplicationType() == null) {
submissionContext.setApplicationType("YARN");
} else if(submissionContext.getApplicationType().length() > 20) {
submissionContext.setApplicationType(submissionContext.getApplicationType().substring(0, 20));
}
try {
this.rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user);
LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user);
RMAuditLogger.logSuccess(user, "Submit Application Request", "ClientRMService", applicationId);
} catch (YarnException var6) {
LOG.info("Exception in submitting application with id " + applicationId.getId(), var6);
RMAuditLogger.logFailure(user, "Submit Application Request", var6.getMessage(), "ClientRMService", "Exception in submitting application", applicationId);
throw var6;
}
SubmitApplicationResponse response = (SubmitApplicationResponse)this.recordFactory.newRecordInstance(SubmitApplicationResponse.class);
return response;
}
}
protected void submitApplication(ApplicationSubmissionContext submissionContext, long submitTime, String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application = this.createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();
if(UserGroupInformation.isSecurityEnabled()) {
try {
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, this.parseCredentials(submissionContext), submissionContext.getCancelTokensWhenComplete(), application.getUser());
} catch (Exception var9) {
LOG.warn("Unable to parse credentials.", var9);
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, var9.getMessage()));
throw RPCUtil.getRemoteException(var9);
}
} else {
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
}
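For reference, the client side of this polling protocol can be reproduced outside Spark with the plain YARN client API. A hedged sketch (assumes hadoop-yarn-client on the classpath; the cluster timestamp and id are placeholders for a real ApplicationId):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object PollAppState {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    val appId = ApplicationId.newInstance(1500000000000L, 42) // placeholder values
    // Mirror YarnClientImpl.submitApplication above: wait until the application
    // leaves the NEW / NEW_SAVING / SUBMITTED states.
    var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    while (state == YarnApplicationState.NEW ||
           state == YarnApplicationState.NEW_SAVING ||
           state == YarnApplicationState.SUBMITTED) {
      Thread.sleep(1000)
      state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    }
    println(s"Application $appId left the submission states, now: $state")
    yarnClient.stop()
  }
}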

The deploy.yarn.ApplicationMaster code:

def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
// Load the properties file with the Spark configuration and set entries as system properties,
// so that user code run inside the AM also has access to them.
// Note: we must do this before SparkHadoopUtil instantiated
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sys.props(k) = v
}
}
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
}
}
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
// Set the master and deploy mode property to match the requested mode.
System.setProperty("spark.master", "yarn")
System.setProperty("spark.submit.deployMode", "cluster")
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}
logInfo("ApplicationAttemptId: " + appAttemptId)
val fs = FileSystem.get(yarnConf)
// This shutdown hook should run *after* the SparkContext is shut down.
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) { () =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
// This behavior is different compared to 1.x version.
// If user application is exited ahead of time by calling System.exit(N), here mark
// this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
// System.exit(0) to terminate the application.
finish(finalStatus,
ApplicationMaster.EXIT_EARLY,
"Shutdown hook called before final status was reported.")
}
if (!unregistered) {
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(fs)
}
}
}
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
}
if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
} catch {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + e)
}
exitCode
}
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
val sc = waitForSparkContextInitialized()
// If there is no SparkContext at this point, just fail the app.
if (sc == null) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} else {
rpcEnv = sc.env.rpcEnv
val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
}
/**
* Start the user class, which contains the spark driver, in a separate Thread.
* If the main routine exits cleanly or exits with System.exit(N) for any N
* we assume it was successful, for all other cases we assume failure.
*
* Returns the user thread that was started.
*/
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
val userClassLoader =
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
// When running pyspark, the app is run using PythonRunner. The second argument is the list
// of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
userArgs = Seq(args.primaryPyFile, "") ++ userArgs
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
// TODO(davies): add R dependencies here
}
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
try {
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running users class")
} catch {
...
}
}
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}
/**
* Create an [[RpcEndpoint]] that communicates with the driver.
*
* In cluster mode, the AM and the driver belong to same process
* so the AMEndpoint need not monitor lifecycle of the driver.
*
* @return A reference to the driver's RPC endpoint.
*/
private def runAMEndpoint(
host: String,
port: String,
isClusterMode: Boolean): RpcEndpointRef = {
val driverEndpoint = rpcEnv.setupEndpointRef(
RpcAddress(host, port.toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
amEndpoint =
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
driverEndpoint
}
private def registerAM(
_rpcEnv: RpcEnv,
driverRef: RpcEndpointRef,
uiAddress: String,
securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
sparkConf.get(HISTORY_SERVER_ADDRESS)
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
val _sparkConf = if (sc != null) sc.getConf else sparkConf
val driverUrl = RpcEndpointAddress(
_sparkConf.get("spark.driver.host"),
_sparkConf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
allocator.allocateResources()
reporterThread = launchReporterThread()
}
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.getInt("spark.yarn.am.port", 0)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
}
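The core trick in startUserApplication above is simply reflection: load the user class, look up its main(Array[String]), and call it on a thread named "Driver". A minimal self-contained sketch of that mechanism (DemoUserApp and its arguments are invented for illustration):

// Stand-in for the user application whose main the AM would invoke.
object DemoUserApp {
  def main(args: Array[String]): Unit =
    println(s"user main() running with args: ${args.mkString(", ")}")
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Resolve main(Array[String]) by name, as the AM does for args.userClass.
    val mainMethod = Class.forName("DemoUserApp").getMethod("main", classOf[Array[String]])
    val userThread = new Thread("Driver") {
      // The String[] is passed as the single reflective argument, matching main's signature.
      override def run(): Unit = mainMethod.invoke(null, Array("--input", "/tmp/data"))
    }
    userThread.start()
    userThread.join()
  }
}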

The YarnRMClient code:

/**
* Registers the application master with the RM.
*
* @param conf The Yarn configuration.
* @param sparkConf The Spark configuration.
* @param uiAddress Address of the SparkUI.
* @param uiHistoryAddress Address of the application on the History Server.
* @param securityMgr The security manager.
* @param localResources Map with information about files distributed via YARN's cache.
*/
def register(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: String,
uiHistoryAddress: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]
): YarnAllocator = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources)
}

The YarnAllocator code:

/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
* what to do with containers when YARN fulfills these requests.
*
* This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
* * Making our resource needs known, which updates local bookkeeping about containers requested.
* * Calling "allocate", which syncs our local container requests with the RM, and returns any
* containers that YARN has granted to us. This also functions as a heartbeat.
* * Processing the containers granted to us to possibly launch executors inside of them.
*
* The public methods of this class are thread-safe. All methods that mutate state are
* synchronized.
*/
/**
* Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
* equal to maxExecutors.
*
* Deal with any containers YARN has granted to us by possibly launching executors in them.
*
* This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
updateResourceRequests()
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
.format(
allocatedContainers.size,
numExecutorsRunning,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
}
}
/**
* Handle containers granted by the RM by launching executors on them.
*
* Due to the way the YARN allocation protocol works, certain healthy race conditions can result
* in YARN granting containers that we no longer need. In this case, we release them.
*
* Visible for testing.
*/
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
// Assign remaining that are neither node-local nor rack-local
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
if (!remainingAfterOffRackMatches.isEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
@Override
public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
List<ResourceRequest> askList = null;
List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
try {
synchronized (this) {
askList = new ArrayList<ResourceRequest>(ask.size());
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression()));
}
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove);
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
askList, releaseList, blacklistRequest);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
}
try {
allocateResponse = rmClient.allocate(allocateRequest);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
.values()) {
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
for (ResourceRequestInfo request : capabalities.values()) {
addResourceRequestToAsk(request.remoteRequest);
}
}
}
}
// re register with RM
registerApplicationMaster();
allocateResponse = allocate(progressIndicator);
return allocateResponse;
}
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
if (allocateResponse.getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAMRMToken());
}
if (!pendingRelease.isEmpty()
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
removePendingReleaseRequests(allocateResponse
.getCompletedContainersStatuses());
}
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
if(allocateResponse == null) {
// we hit an exception in allocate()
// preserve ask and release for next call to allocate()
synchronized (this) {
release.addAll(releaseList);
// requests could have been added or deleted during call to allocate
// If requests were added/removed then there is nothing to do since
// the ResourceRequest object in ask would have the actual new value.
// If ask does not have this ResourceRequest then it was unchanged and
// so we can add the value back safely.
// This assumes that there will no concurrent calls to allocate() and
// so we dont have to worry about ask being changed in the
// synchronized block at the beginning of this method.
for(ResourceRequest oldAsk : askList) {
if(!ask.contains(oldAsk)) {
ask.add(oldAsk);
}
}
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
}
}
return allocateResponse;
}
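// On the ResourceManager side, that RPC is handled by ApplicationMasterService.allocate(AllocateRequest)
// (shown here as decompiled source, hence the generated variable names):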
public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException {
AMRMTokenIdentifier amrmTokenIdentifier = this.authorizeRequest();
ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId();
ApplicationId applicationId = appAttemptId.getApplicationId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
ApplicationMasterService.AllocateResponseLock lock = (ApplicationMasterService.AllocateResponseLock)this.responseMap.get(appAttemptId);
if(lock == null) {
String message = "Application attempt " + appAttemptId + " doesn't exist in ApplicationMasterService cache.";
LOG.error(message);
throw new ApplicationAttemptNotFoundException(message);
} else {
synchronized(lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
String filteredProgress1;
if(!this.hasApplicationMasterRegistered(appAttemptId)) {
filteredProgress1 = "AM is not registered for known application attempt: " + appAttemptId + " or RM had restarted after AM registered . AM should re-register.";
LOG.info(filteredProgress1);
RMAuditLogger.logFailure(((RMApp)this.rmContext.getRMApps().get(appAttemptId.getApplicationId())).getUser(), "App Master Heartbeats", "", "ApplicationMasterService", filteredProgress1, applicationId, appAttemptId);
throw new ApplicationMasterNotRegisteredException(filteredProgress1);
} else if(request.getResponseId() + 1 == lastResponse.getResponseId()) {
return lastResponse;
} else if(request.getResponseId() + 1 < lastResponse.getResponseId()) {
filteredProgress1 = "Invalid responseId in AllocateRequest from application attempt: " + appAttemptId + ", expect responseId to be " + (lastResponse.getResponseId() + 1);
throw new InvalidApplicationMasterRequestException(filteredProgress1);
} else {
float filteredProgress = request.getProgress();
if (!Float.isNaN(filteredProgress) && filteredProgress != Float.NEGATIVE_INFINITY && filteredProgress >= 0.0F) {
if (filteredProgress > 1.0F || filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1.0F);
}
} else {
request.setProgress(0.0F);
}
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
List ask = request.getAskList();
List release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
List blacklistAdditions = blacklistRequest != null ? blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List blacklistRemovals = blacklistRequest != null ? blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
RMApp app = (RMApp)this.rmContext.getRMApps().get(applicationId);
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
Iterator allocation = ask.iterator();
while(allocation.hasNext()) {
ResourceRequest appAttempt = (ResourceRequest)allocation.next();
if(null == appAttempt.getNodeLabelExpression() && "*".equals(appAttempt.getResourceName())) {
appAttempt.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
try {
RMServerUtils.normalizeAndValidateRequests(ask, this.rScheduler.getMaximumResourceCapability(), app.getQueue(), this.rScheduler, this.rmContext);
} catch (InvalidResourceRequestException var31) {
LOG.warn("Invalid resource ask by application " + appAttemptId, var31);
throw var31;
}
try {
RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException var30) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, var30);
throw var30;
}
if(!app.getApplicationSubmissionContext().getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException var29) {
LOG.warn("Invalid container release by application " + appAttemptId, var29);
throw var29;
}
}
Allocation allocation1 = this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals);
if(!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler.blacklistAdditions: " + blacklistAdditions + ", " + "blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt1 = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse = (AllocateResponse)this.recordFactory.newRecordInstance(AllocateResponse.class);
if(!allocation1.getContainers().isEmpty()) {
allocateResponse.setNMTokens(allocation1.getNMTokens());
}
ArrayList updatedNodes = new ArrayList();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
ArrayList nextMasterKey = new ArrayList();
Iterator appAttemptImpl = updatedNodes.iterator();
while(appAttemptImpl.hasNext()) {
RMNode amrmToken = (RMNode)appAttemptImpl.next();
SchedulerNodeReport schedulerNodeReport = this.rScheduler.getNodeReport(amrmToken.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
int numContainers = 0;
if(schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeId nodeId = amrmToken.getNodeID();
NodeReport report = BuilderUtils.newNodeReport(nodeId, amrmToken.getState(), amrmToken.getHttpAddress(), amrmToken.getRackName(), used, amrmToken.getTotalCapability(), numContainers, amrmToken.getHealthReport(), amrmToken.getLastHealthReportTime(), amrmToken.getNodeLabels());
nextMasterKey.add(report);
}
allocateResponse.setUpdatedNodes(nextMasterKey);
}
allocateResponse.setAllocatedContainers(allocation1.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt1.pullJustFinishedContainers());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation1.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
allocateResponse.setPreemptionMessage(this.generatePreemptionMessage(allocation1));
MasterKeyData nextMasterKey1 = this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();
if(nextMasterKey1 != null && nextMasterKey1.getMasterKey().getKeyId() != amrmTokenIdentifier.getKeyId()) {
RMAppAttemptImpl appAttemptImpl1 = (RMAppAttemptImpl)appAttempt1;
Token amrmToken1 = appAttempt1.getAMRMToken();
if(nextMasterKey1.getMasterKey().getKeyId() != appAttemptImpl1.getAMRMTokenKeyId()) {
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back to application: " + applicationId);
amrmToken1 = this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(appAttemptId);
appAttemptImpl1.setAMRMToken(amrmToken1);
}
allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token.newInstance(amrmToken1.getIdentifier(), amrmToken1.getKind().toString(), amrmToken1.getPassword(), amrmToken1.getService().toString()));
}
lock.setAllocateResponse(allocateResponse);
return allocateResponse;
}
}
}
}

Summary

Yarn-Cluster mode

Client-side operations:

  1. SparkSubmit initializes a yarnClient from the yarnConf and starts it;
  2. Creates the client-side Application and obtains its Application ID, then checks whether the cluster has enough resources for the executors and the ApplicationMaster; if not, an IllegalArgumentException is thrown;
  3. Sets up resources and environment variables: this includes the Application's staging directory, preparing local resources (jar files, log4j.properties), setting the Application's environment variables, creating the Container launch context, and so on;
  4. Sets up the application submission context, including the application name, the queue, the Container requested for the AM, and marking the job type as spark;
  5. Requests memory for the AM and finally submits the Application to the ResourceManager via submitApplication. Once the job has been handed to YARN the client has nothing left to do and its process exits, because the whole job runs on the YARN cluster and the results are written to HDFS or to the logs. (A sketch of this flow against the raw YARN client API follows this list.)
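
The five client-side steps above map almost one-to-one onto the plain YARN client API. A minimal sketch, assuming illustrative memory sizes, queue name and AM command line (this is not Spark's actual yarn/Client.scala):

import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object MiniYarnSubmit {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()          // step 1: init and start yarnClient
    yarnClient.init(conf)
    yarnClient.start()

    val app = yarnClient.createApplication()                 // step 2: new application + ID
    val appId = app.getNewApplicationResponse.getApplicationId
    val maxMem = app.getNewApplicationResponse.getMaximumResourceCapability.getMemory
    require(maxMem >= 1024, "cluster cannot satisfy the AM memory request")

    // step 3: local resources / environment would go here; only the AM command is sketched
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(java.util.Collections.singletonList(
      "$JAVA_HOME/bin/java -Xmx1g my.fake.ApplicationMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"))

    val appContext = app.getApplicationSubmissionContext     // step 4: submission context
    appContext.setApplicationName("spark-like-demo")
    appContext.setQueue("default")
    appContext.setApplicationType("SPARK")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(1024, 1))    // step 5: AM memory request

    yarnClient.submitApplication(appContext)                  // hand off to the ResourceManager
    println(s"Submitted $appId; in cluster mode the client can now exit.")
  }
}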

YARN-side operations:

  1. Run the ApplicationMaster's run method;
  2. Set up the relevant environment variables;
  3. Create and start the amClient;
  4. Set the AmIpFilter for the Spark UI before the UI starts;
  5. In startUserClass a dedicated thread (named "Driver") is started to run the user-submitted application, i.e. the Driver is launched; the Driver will initialize the SparkContext;
  6. Wait for the SparkContext to finish initializing, for at most spark.yarn.applicationMaster.waitTries attempts (10 by default); if the configured number of attempts is exceeded the program exits, otherwise the SparkContext is used to initialize the yarnAllocator;
    6.1. How does the AM know the SparkContext has finished initializing?
    Starting the application in step 5 initializes the SparkContext; during that initialization a YarnClusterScheduler is created, and when the SparkContext finishes initializing it calls YarnClusterScheduler's postStartHook method, which notifies the ApplicationMaster that the SparkContext is ready.
    6.2. Why wait for the SparkContext to finish initializing?
    Because CoarseGrainedExecutorBackend must register with CoarseGrainedSchedulerBackend after it starts.
  7. Once the SparkContext has been initialized, register the ApplicationMaster with the ResourceManager through the amClient (see the AMRMClient sketch after this list);
  8. Allocate and launch executors. Before launching them, numExecutors Containers must first be obtained through the yarnAllocator, and the executors are then launched inside those Containers. If executor launches fail maxNumExecutorFailures times, the Application fails: its status is marked FAILED and the SparkContext is shut down. Executors are actually launched via ExecutorRunnable, which internally starts CoarseGrainedExecutorBackend; once started, CoarseGrainedExecutorBackend registers with the SchedulerBackend.
    (How does the ResourceManager decide how many containers to allocate? It follows the parameters given on the submit command line; by default two executors are started.)
  9. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler via Akka until the job completes.
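
Steps 7-8 boil down to what the AMRMClient API does underneath YarnAllocator: register, ask for containers, then heartbeat via allocate() and launch something in whatever containers come back. A minimal sketch, assuming made-up executor counts, container sizes and a trivial "launch" step (this is not Spark's YarnAllocator/ExecutorRunnable code):

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

object MiniApplicationMaster {
  def main(args: Array[String]): Unit = {
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(new YarnConfiguration())
    amClient.start()

    // Step 7: register this AM with the ResourceManager (host, RPC port, tracking URL).
    amClient.registerApplicationMaster("localhost", 0, "")

    // Step 8: ask for 10 containers of 10g / 2 cores each, on any host.
    val capability = Resource.newInstance(10 * 1024, 2)
    (1 to 10).foreach { _ =>
      amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))
    }

    var launched = 0
    while (launched < 10) {
      // Each allocate() call doubles as the AM heartbeat and returns newly granted containers.
      val response = amClient.allocate(0.1f)
      response.getAllocatedContainers.asScala.foreach { container =>
        // Spark hands each container to an ExecutorRunnable, which starts
        // CoarseGrainedExecutorBackend inside it; here we only count them.
        launched += 1
      }
      Thread.sleep(1000)  // a real AM would also handle completed containers and failures
    }

    amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
    amClient.stop()
  }
}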

Yarn-Client mode:

Client-side operations:

  1. SparkSubmit's launch function calls the job's main function directly via reflection (in cluster mode it would call Client's main function instead); see the reflection sketch after this list.
  2. The application's main function always creates a SparkContext and initializes it;
  3. During SparkContext initialization the following happen in order: the relevant configuration is set; MapOutputTracker, BlockManagerMaster and BlockManager are registered; and the taskScheduler and dagScheduler are created, the last two being the most important. When the taskScheduler is created, the Scheduler and SchedulerBackend are chosen according to the master we passed in. Since we chose yarn-client mode, the program picks YarnClientClusterScheduler and YarnClientSchedulerBackend and initializes the YarnClientClusterScheduler with the YarnClientSchedulerBackend instance; both instances are obtained via reflection. YarnClientSchedulerBackend is a subclass of CoarseGrainedSchedulerBackend, while YarnClientClusterScheduler is a subclass of TaskSchedulerImpl that only overrides TaskSchedulerImpl's getRackForHost method.
  4. After the taskScheduler is initialized, the dagScheduler is created, and then taskScheduler.start() starts the taskScheduler, which in turn calls the SchedulerBackend's start method. During SchedulerBackend startup some parameters are initialized and wrapped into a ClientArguments object, which is passed to the Client class; client.submitApplication() then returns the Application ID.
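
Step 1's "call the job's main function via reflection" is conceptually just the following. A minimal sketch, with a hypothetical user class name and arguments:

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val userClassName = "com.example.MySparkApp"                 // hypothetical user class
    val userArgs      = Array("hdfs://ns/input", "hdfs://ns/output")

    val mainClass  = Thread.currentThread().getContextClassLoader.loadClass(userClassName)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(java.lang.reflect.Modifier.isStatic(mainMethod.getModifiers), "main must be static")

    // Passing userArgs as a single varargs element delivers it to main(String[]) unchanged.
    mainMethod.invoke(null, userArgs)
  }
}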

YARN-side operations:

  1. Run the ApplicationMaster's run method (runExecutorLauncher);
  2. There is no need to wait for the SparkContext to finish initializing (the YarnClientClusterScheduler has already started), so the Application is registered with the sparkYarnAM right away;
  3. Allocate executors; the allocation logic is similar to yarn-cluster mode and is not repeated here;
  4. Tasks then run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler via Akka until the job completes;
  5. While the job is running, YarnClientSchedulerBackend fetches the job's status through the client every second and prints the corresponding progress information; once the Application's state is one of FINISHED, FAILED or KILLED, it stops waiting (see the polling sketch after this list);
  6. Finally a thread confirms the Application's state one more time; when the state is FINISHED, FAILED or KILLED the job is considered complete and the SparkContext is stopped. That ends the whole process.
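
Steps 5-6 amount to polling the ResourceManager for the application report. A minimal sketch against the YarnClient API, assuming a one-second poll interval as described above (this is not the actual YarnClientSchedulerBackend monitor thread):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

object AppMonitor {
  // Poll the ResourceManager once per second until the application reaches a terminal state.
  def monitorApplication(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
    val terminal = Set(YarnApplicationState.FINISHED,
                       YarnApplicationState.FAILED,
                       YarnApplicationState.KILLED)
    var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    while (!terminal.contains(state)) {
      Thread.sleep(1000)
      val report = yarnClient.getApplicationReport(appId)
      state = report.getYarnApplicationState
      println(s"Application $appId is in state $state, progress ${report.getProgress}")
    }
    state
  }
}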

YARN ApplicationMaster management

  1. The client submits the program to the RM (including the AM program, the AM launch command and the user program);
  2. The RM asks the resource scheduler for resources; once the resources the AM needs have been granted, the AMLauncher contacts the corresponding NodeManager to launch the AM;
  3. At the same time the AM is added to the AMLivenessMonitor's watch list, which starts monitoring the AM;
  4. After starting, the AM registers with the ApplicationMasterService, reporting its port, IP, tracking URL and so on; it then heartbeats periodically by calling allocate (the ApplicationMasterService.allocate method shown above), and the ApplicationMasterService refreshes the AM's heartbeat timestamp in the AMLivenessMonitor;
  5. When the user program finishes, the AM reports completion to the ApplicationMasterService, which tells the AMLivenessMonitor to remove the AM from the watch list and releases its resources.

References:

  1. Spark on Yarn 任务提交流程源码分析
  2. Yarn的ApplicationMaster管理
  3. Hadoop Yarn详解