My Spark Streaming application processes the data received in each interval.
In the Spark Stages UI, every stage points to a single line of code,
windowDStream.foreachRDD, only (not to the actions inside the DStream).
- The following is the information from the Spark Stages UI page:

Stage Id | Description                   | Submitted        | Duration | Tasks: Succeeded/Total | Input    | Output  | Shuffle Read | Shuffle Write
2        | foreachRDD at Parser.scala:58 | 06-04-2015 16:21 | 19 min   | 3125/3125 (43 failed)  | 154.4 MB | 23.9 MB |              |
1        | foreachRDD at Parser.scala:58 | 06-04-2015 16:19 | 2.3 min  | 3125/3125              | 149.7 MB |         |              |
0        | foreachRDD at Parser.scala:58 | 06-04-2015 16:16 | 3.0 min  | 3125/3125              | 149.7 MB |         |              |
- The following is the code snippet at Parser.scala:58:

import java.util.Calendar
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
// CustomInputFormat, MyXML and the case class Bd are defined elsewhere

val windowDStream = ssc.fileStream[LongWritable, Text, CustomInputFormat](
  args(0), (x: Path) => true, false)

windowDStream.foreachRDD { IncomingFiles =>
  println("Interval data processing " + Calendar.getInstance().getTime())
  if (IncomingFiles.count() == 0) {
    println("No files received in this interval")
  } else {
    println(IncomingFiles.count() + " files received in this interval")
    // Convert each XML text to RDD[Elem]
    val inputRDD = IncomingFiles.map { eachXML =>
      MyXML.loadString(
        eachXML._2.toString().trim().replaceFirst("^([\\W]+)<", "<"))
    }
    // Create a SchemaRDD for querying the data
    // (Bd is a case class: case class Bd(oeuo: String, mi: List[String]))
    val MySchemaRDD = inputRDD.map { x =>
      Bd((x \\ "Oied" \\ "oeuo").text,
        List("placeholder1", "placeholder2", "placeholder3"))
    }
    // Save the records for debugging (saveAsTextFile writes a directory)
    MySchemaRDD.saveAsTextFile("/home/spark/output/result.txt")
    // Spark SQL processing starts from here
    MySchemaRDD.registerTempTable("MySchemaTable")
    // TODO: processing with Spark SQL
    MySchemaRDD.printSchema()
    println("end of processing")
  }
}
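One thing I noticed while posting this: count() is an action, so each
interval runs three separate jobs over the same input (count, count again,
then saveAsTextFile), re-reading the files each time, which may also
explain why all three 3125-task stages above point at the same line. A
minimal sketch of the variant I am considering, counting once and
persisting (StorageLevel.MEMORY_AND_DISK is my assumption, not what
produced the numbers above):

import org.apache.spark.storage.StorageLevel

windowDStream.foreachRDD { IncomingFiles =>
  // keep the interval's files around so later actions do not re-read them
  IncomingFiles.persist(StorageLevel.MEMORY_AND_DISK)
  val fileCount = IncomingFiles.count() // single count job per interval
  if (fileCount == 0) {
    println("No files received in this interval")
  } else {
    println(fileCount + " files received in this interval")
    // ... same parsing, saveAsTextFile and Spark SQL steps as above ...
  }
  IncomingFiles.unpersist()
}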
Spark UI Details for Stage 2
http://pastebin.com/c2QYeSJj
I have tested this with 150 MB of input data, with all Spark memory
options at their defaults (executor memory 512.0 MB).
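If raising the executor memory is the expected fix, I assume it would be
set roughly like this (a sketch only; the app name and the 2g value are my
own examples, not my actual configuration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// spark.executor.memory is the standard setting; 512m is the default here
val conf = new SparkConf()
  .setAppName("StreamingXmlParser")   // example name only
  .set("spark.executor.memory", "2g") // example value only
val ssc = new StreamingContext(conf, Seconds(60)) // example batch interval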
- Is it possible to see stage information for the individual operations
inside windowDStream, i.e., which action within the DStream processing
each stage corresponds to? (A sketch of the kind of labeling I have in
mind follows these questions.)
- During Stage 2 the executor restarted many times due to
OutOfMemoryError. Is this expected behavior? (Please find the Stage 2
details above.)
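For the first question, this is the kind of labeling I am after: marking
each action inside foreachRDD so that the UI shows which action a stage
belongs to. A minimal sketch using SparkContext.setJobGroup (the group ids
and descriptions are mine, and I have not verified how this renders in
this Spark version's UI):

windowDStream.foreachRDD { IncomingFiles =>
  val sc = IncomingFiles.sparkContext
  sc.setJobGroup("interval-count", "Count files received in this interval")
  val fileCount = IncomingFiles.count()
  // ... parsing into MySchemaRDD as in the snippet above ...
  sc.setJobGroup("interval-save", "Save parsed records for debugging")
  MySchemaRDD.saveAsTextFile("/home/spark/output/result.txt")
}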
Regards
Vijay
On 3 April 2015 at 13:21, Tathagata Das <[email protected]> wrote:
> What he meant is to look it up in the Spark UI, specifically in the
> Stages tab, to see what is taking so long. And yes, a code snippet helps
> us debug.
>
> TD
>
> On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das <[email protected]>
> wrote:
>
>> You need to open the page of the Stage that is taking time and see how
>> long it is spending on GC etc. It would also be good to post that
>> Stage's and its previous transformations' code snippets so we can
>> understand it better.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri <[email protected]>
>> wrote:
>>
>>>
>>> When I run the Spark (streaming) application in local mode, I can see
>>> the execution progress as below:
>>>
>>> [Stage 0:=============================>                  (1817 + 1) / 3125]
>>> ....
>>> [Stage 2:============>                                   (740 + 1) / 3125]
>>>
>>> One of the stages is taking a long time to execute.
>>>
>>> How do I find the transformations/actions associated with a particular
>>> stage?
>>> Is there any way to see the execution DAG of a Spark application?
>>>
>>> Regards
>>> Vijay
>>>
>>
>>
>