Here is the code for my streaming job.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.default.parallelism", "100")
sparkConf.set("spark.shuffle.consolidateFiles", "true")
sparkConf.set("spark.speculation", "true")
sparkConf.set("spark.speculation.interval", "5000")
sparkConf.set("spark.speculation.quantile", "0.9")
sparkConf.set("spark.speculation.multiplier", "3")
sparkConf.set("spark.mesos.coarse", "true")
sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
sparkConf.set("spark.shuffle.manager", "SORT")

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint(checkpointDir)

val topics = "trace"
val numThreads = 1
val topicMap = topics.split(",").map((_, numThreads)).toMap

val kafkaPartitions = 20
val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}

val lines = ssc.union(kafkaDStreams)
val words = lines.map(line => doSomething_1(line))
val filteredWords = words.filter(word => word != "test")
val groupedWords = filteredWords.map(word => (word, 1))

val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))

ssc.start()
ssc.awaitTermination()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
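For context, the windowed count above uses the incremental form of reduceByKeyAndWindow (new batches are added with _ + _ and expiring batches are subtracted with _ - _, which is why checkpointing is enabled). If I need to rule that path out, a sketch of the non-incremental equivalent, assuming the same groupedWords stream, would be:

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Non-incremental form (sketch only): recomputes the full 30-second
// window every 10 seconds instead of adding new data and subtracting
// old data from the previous window.
val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~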
I am running this job on an AWS EC2 cluster with 9 slave nodes, each with 32 vCPUs and 60 GB of memory.
When I start this job, the processing time is usually around 5 - 6 seconds for the 10-second batches, and the scheduling delay is around 0 seconds or a few milliseconds. However, after the job has run for 6 - 8 hours, the processing time increases to 15 - 20 seconds, and the scheduling delay grows to 4 - 6 hours.
When I look at the completed stages, I see that the time taken by "getCallSite at DStream.scala:294" keeps increasing over time, from around 2 seconds to more than a few minutes.
Clicking on +details next to this stage description shows the following execution trace:

org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
scala.Option.orElse(Option.scala:257)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
scala.util.Try$.apply(Try.scala:161)
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
When I click on one of these slow stages that executed after 6 - 8 hours, I find the following for the individual tasks inside it:
- All tasks seem to execute with PROCESS_LOCAL locality.
- Quite a few of these tasks seem to spend anywhere between 30 - 80% of their time in GC, even though the total memory usage shown for each slave node under the executor information is only around 200 MB out of the 20 GB available.
Even after a few hours, the map stages (val groupedWords = filteredWords.map(word => (word, 1))) seem to take about the same time as they did at the start of the job, which suggests that this code is fine. Also, the number of waiting batches stays at 0 or 1 even after 8 to 10 hours.
Based on the fact that the map is as fast as it was at the start of the job and that there are no waiting batches, I am assuming that the getCallSite stages correspond to getting data out of Kafka. Is this correct or not?
If my assumption is correct, is there anything I could do to optimize receiving data from Kafka?
If not, which part of my code needs to be optimized to reduce the scheduling delay?
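If it helps frame the second question, the kind of change I had in mind is sketched below. It is only a sketch and reuses the same zkQuorum, group, and topic as in the code above; the idea would be fewer receivers, each running several consumer threads, instead of 20 single-threaded receivers.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Hypothetical alternative receiver setup (sketch only):
// 5 receivers x 4 consumer threads = 20 consumer threads total,
// but only 5 receiver tasks occupying executor cores.
val numReceivers = 5
val threadsPerReceiver = 4
val altTopicMap = Map("trace" -> threadsPerReceiver)
val altKafkaDStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, altTopicMap).map(_._2)
}
val altLines = ssc.union(altKafkaDStreams)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~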
Thanks,
RK
