Hi,
I think an example will help illustrate the model better.
/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md"
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a"))
                       .filter(line => line.contains("c")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
The example's DAG corresponds to your graph. Let's see how it works:
1. val sc = new SparkContext(...) // This line creates the SparkContext (i.e. the driver).
2. val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count() // This is a job with 2 transformations and 1 action.
3. val numBs = logData.filter(line => line.contains("b")).count() // This is another job with 1 transformation and 1 action.
Remember that Spark evaluates RDD transformations lazily: nothing actually runs until an action is invoked.
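For example, a small sketch reusing logData from the code above:

// Nothing has run yet: filter() only records the transformation in the
// RDD lineage and returns immediately.
val filtered = logData.filter(line => line.contains("a"))

// Only now does Spark submit a job and actually read and filter the data.
val n = filtered.count()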
The job numAs is triggered by invoking the count() method. Its lineage chain has three steps: FilteredRDD(1) <- FilteredRDD(2) <- RDD.count()
(1) RDD.count() submits the job to the DAGScheduler, with the last RDD as the final stage.
(2) The DAGScheduler analyses the dependency chain: FilteredRDD(2) asks for its parent FilteredRDD(1) to be computed first, so FilteredRDD(1) is computed first.
(3) The DAGScheduler then wraps the work for FilteredRDD(1) into a TaskSet and submits the TaskSet to the TaskSchedulerImpl.
(4) The TaskSchedulerImpl schedules the TaskSet using the "FIFO" or "FAIR" strategy.
(5) The tasks in the TaskSet are distributed to different executors.
(6) After all the tasks of this TaskSet have finished, the stage is marked finished. (Cached RDDs are stored by the block store and can be shared across jobs within one SparkContext. In the example only logData is cached, so if you have another job numCs, val numCs = logData.filter(line => line.contains("a")).filter(line => line.contains("d")).count(), its first filter(line => line.contains("a")) rereads the cached logData rather than the file; to reuse the filtered data itself you have to cache() it too. See the sketch after this walkthrough.)
(7) Then FilteredRDD(2) is computed, and then RDD.count().
(8) Finally you have the result for numAs.
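If you want to inspect the lineage chain yourself, or make the shared filter step explicitly reusable, here is a small sketch (the variable names are mine; toDebugString and cache() are standard RDD methods):

// Print the lineage that the DAGScheduler will walk for the numAs job.
val aAndC = logData.filter(line => line.contains("a"))
                   .filter(line => line.contains("c"))
println(aAndC.toDebugString)

// In the example only logData is cached; the filtered RDDs are recomputed
// for every action. To let numCs reuse the contains("a") step directly,
// cache that intermediate RDD yourself:
val withA = logData.filter(line => line.contains("a")).cache()
val numAs = withA.filter(line => line.contains("c")).count() // computes and caches withA
val numCs = withA.filter(line => line.contains("d")).count() // reuses the cached withA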
I think you now understand the submit/schedule/run process. Let's look at the questions:
1. Each DAG is associated with one action. You can write multiple actions in a Spark application, but if you want these actions to run simultaneously, you have to submit them from different threads (see the sketch after this list).
2. Pay attention to the "FIFO" vs. "FAIR" scheduler strategy: under FIFO, if the first action is too large, the second action may be starved.
3. I think the question is how to persist RDD data to local disk. You can use saveAsTextFile(path) or saveAsSequenceFile(path) for that.
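A minimal sketch of points 1-3 (the output path is an example; scala.concurrent.Future is just one way to run actions on separate threads):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Point 2: to use FAIR scheduling, set spark.scheduler.mode to "FAIR"
// (e.g. via SparkConf) before creating the SparkContext, so a large
// job cannot starve a small one.

// Point 1: each action submitted from its own thread becomes its own
// job, and the two jobs can then run concurrently.
val fA = Future { logData.filter(line => line.contains("a")).count() }
val fB = Future { logData.filter(line => line.contains("b")).count() }
val numAs = Await.result(fA, Duration.Inf)
val numBs = Await.result(fB, Duration.Inf)

// Point 3: persist RDD data to disk (the path is an example).
logData.filter(line => line.contains("a")).saveAsTextFile("/tmp/lines-with-a")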
Hope this will help you.
Best regards,
Patrick Liu
Date: Thu, 28 Aug 2014 23:34:29 -0700
From: [email protected]
To: [email protected]
Subject: Re: RE: The concurrent model of spark job/stage/task
hi, dear
Thanks for the response. Some comments below. And yes, I am using Spark on YARN.
1. The Spark release docs say multiple jobs can be submitted in one application if the jobs (actions) are submitted from different threads. I wrote some Java thread code in the driver, one action per thread, and the stages ran concurrently, as observed on the stages UI. My understanding is that the DAGScheduler generates a different graph for each action; I'm not sure whether that's correct. Originally I was hoping the SparkContext could generate different jobs for unrelated actions, but I never got that to work.
2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?
3. I want to retrieve the original data out of the RDD and run other computation on it, e.g. get temperature values and work on them.
[hidden email]
From: [hidden email]
Date: 2014-08-29 14:01
To: [hidden email]
Subject: RE: The concurrent model of spark job/stage/task
Hi,
Please see the answers following each question. If there's any mistake, please
let me know. Thanks!
I am not sure which mode you are running in, so I will assume you are using the spark-submit script to submit Spark applications to a cluster (Spark standalone or YARN).
1. How do I start 2 or more jobs in one Spark driver, in Java code? I wrote 2 actions in the code, but the jobs are still staged at index 0, 1, 2, 3...; it looks like they run sequentially.
A Spark application is a job; you initialize the application by creating a SparkContext, and the SparkContext initializes the driver program for you. So if you want to run multiple jobs simultaneously, you have to split the jobs into different applications and submit each of them.
The driver program is like an ApplicationMaster in YARN: it translates the Spark application into a DAG and schedules each stage onto workers, where each stage consists of multiple tasks. The driver program handles the life cycle of a Spark application.
2. Are the stages run concurrently? They are always numbered in order 0, 1, 2, 3..., as I observed on the Spark stages UI.
No, stages run sequentially: it's a DAG, and each stage depends on its parent.
3. Can I retrieve the data out of an RDD, e.g. populate a POJO myself and compute on it?
I'm not sure what you mean. You can only retrieve an RDD associated with your own SparkContext, and once a Spark application finishes, the SparkContext is released, along with the RDDs associated with it.
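If the question is how to get raw values back into your own objects on the driver, a sketch may help (TempReading and the input path are made up for illustration; collect() and take() are standard RDD methods):

// A made-up case class standing in for your POJO.
case class TempReading(station: String, celsius: Double)

// collect() ships the RDD's elements back to the driver as a local Array,
// after which you can compute on them with plain Scala code. Use take(n)
// instead if the data is too large to fit in driver memory.
val readings: Array[TempReading] = sc.textFile("readings.csv")
  .map(_.split(","))
  .map(parts => TempReading(parts(0), parts(1).toDouble))
  .collect()

val maxTemp = readings.map(_.celsius).max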
Best regards,
Patrick Liu
Date: Thu, 28 Aug 2014 18:35:44 -0700
From: [hidden email]
To: [hidden email]
Subject: The concurrent model of spark job/stage/task
hi, guys
I am trying to understand how Spark handles concurrency. I read the following at https://spark.apache.org/docs/1.0.2/job-scheduling.html:
quote" Inside a given Spark application (SparkContext instance), multiple
parallel jobs can run simultaneously if they were submitted from separate
threads. By “job”, in this section, we mean a Spark action (e.g. save, collect)
and any tasks that need to run to evaluate that action. Spark’s scheduler is
fully thread-safe and supports this use case to enable applications that serve
multiple requests (e.g. queries for multiple users)."
I searched everywhere but could not find answers to:
1. How do I start 2 or more jobs in one Spark driver, in Java code? I wrote 2 actions in the code, but the jobs are still staged at index 0, 1, 2, 3...; it looks like they run sequentially.
2. Are the stages run concurrently? They are always numbered in order 0, 1, 2, 3..., as I observed on the Spark stages UI.
3. Can I retrieve the data out of an RDD, e.g. populate a POJO myself and compute on it?
Thanks in advance, guys.
[hidden email]