Here is the code for my streaming job.
~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Spark configuration (checkpointDir, zkQuorum and group are defined elsewhere)
val sparkConf = new SparkConf().setAppName("SparkStreamingJob")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.default.parallelism", "100")
sparkConf.set("spark.shuffle.consolidateFiles", "true")
sparkConf.set("spark.speculation", "true")
sparkConf.set("spark.speculation.interval", "5000")
sparkConf.set("spark.speculation.quantile", "0.9")
sparkConf.set("spark.speculation.multiplier", "3")
sparkConf.set("spark.mesos.coarse", "true")
sparkConf.set("spark.executor.extraJavaOptions",
  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseConcMarkSweepGC")
sparkConf.set("spark.shuffle.manager", "SORT")

// 10-second batches; checkpointing is required by the invertible reduceByKeyAndWindow below
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint(checkpointDir)

// One receiver-based Kafka stream per Kafka partition, unioned into a single DStream
val topics = "trace"
val numThreads = 1
val topicMap = topics.split(",").map((_, numThreads)).toMap
val kafkaPartitions = 20
val kafkaDStreams = (1 to kafkaPartitions).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val lines = ssc.union(kafkaDStreams)

// Per-record transformations
val words = lines.map(line => doSomething_1(line))
val filteredWords = words.filter(word => word != "test")
val groupedWords = filteredWords.map(word => (word, 1))

// 30-second window sliding every 10 seconds, keeping only frequent words
val windowedWordCounts = groupedWords.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
val windowedWordsFiltered = windowedWordCounts.filter { case (word, count) => count > 50 }
val finalResult = windowedWordsFiltered.foreachRDD(words => doSomething_2(words))

ssc.start()
ssc.awaitTermination()
~~~
I am running this job on a 9-slave AWS EC2 cluster; each slave node has 32 vCPUs and 60 GB of memory.
When I start this job, the processing time is usually around 5 - 6 seconds for the 10-second batches, and the scheduling delay is around 0 seconds or a few milliseconds. However, after the job has run for 6 - 8 hours, the processing time increases to 15 - 20 seconds and the scheduling delay grows to 4 - 6 hours.
When I look at the completed stages, I see that the time taken by getCallSite at DStream.scala:294 keeps increasing over time, from around 2 seconds to more than a few minutes.
Clicking on +details next to this stage description shows the following execution trace:
~~~
org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1088)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:294)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
scala.Option.orElse(Option.scala:257)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:285)
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:221)
scala.util.Try$.apply(Try.scala:161)
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:221)
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
~~~
When I click on one of these slow stages that executed after 6 - 8 hours, I find the following for the individual tasks inside it:
- All tasks seem to execute with PROCESS_LOCAL locality.
- Quite a few of these tasks spend anywhere between 30 - 80% of their time in GC, even though the total memory usage shown for each slave node on the executors page is only around 200MB out of the 20GB available.
Even after a few hours, the map stages (val groupedWords = filteredWords.map(word => (word, 1))) take about the same time as at the start of the job, which suggests that this part of the code is fine. Also, the number of waiting batches stays at 0 or 1 even after 8 to 10 hours.
Based on the fact that the map stages are as fast as at the start of the job and that there are no waiting batches, I am assuming that the getCallSite stages correspond to getting data out of Kafka. Is this correct? If so, is there anything I can do to optimize receiving data from Kafka (for example, something like the direct-stream sketch below)? If not, which part of my code needs to be optimized to reduce the scheduling delay?
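To make the question more concrete: one thing I was considering is switching from the 20 receiver-based createStream calls above to the direct Kafka API (KafkaUtils.createDirectStream, which I believe needs Spark 1.3+ and the matching spark-streaming-kafka artifact). This is just a sketch of what I have in mind; kafkaBrokers is a placeholder for my broker list:
~~~
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: receiver-less (direct) Kafka input, one RDD partition per Kafka partition,
// instead of 20 separate receiver streams unioned together.
// kafkaBrokers is a placeholder for my broker list, e.g. "host1:9092,host2:9092".
val kafkaParams = Map("metadata.broker.list" -> kafkaBrokers)
val directLines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("trace"))
  .map(_._2)
~~~
Would that (or simply repartitioning the unioned stream before the map) be the right direction, or is the bottleneck somewhere else entirely?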
Thanks,
RK