Hi, I have a simple application which fails with the following exception, but only when the application is restarted (i.e. the checkpointDir already has entries from a previous execution):
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@2264e43c has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    at com.brightcove.analytics.tacoma.RawLogProcessor$.start(RawLogProcessor.scala:115)
    at com.brightcove.analytics.tacoma.Main$delayedInit$body.apply(Main.scala:15)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.brightcove.analytics.tacoma.Main$.main(Main.scala:5)
    at com.brightcove.analytics.tacoma.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The relevant source is:

class RawLogProcessor(ssc: StreamingContext, topic: String, kafkaParams: Map[String, String]) {

  // create kafka stream
  val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Set(topic))
  //KafkaUtils.createStream[String, Object, StringDecoder, KafkaAvroDecoder](ssc, kafkaParams, Map("qa-rawlogs" -> 10), StorageLevel.MEMORY_AND_DISK_2)

  val eventStream = rawlogDStream
    .map({
      case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key, rlog.getEvent, norm)
    })

  val videoViewStream = eventStream
    .filter(_._2 == "video_view")
    .filter(_._3.isDefined)
    .map((z) => (z._1, z._3.get))
    .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
    .cache()

  // repartition by (deviceType, DeviceOS)
  val deviceTypeVideoViews = videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
    .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
    .print()
}

object RawLogProcessor extends Logging {

  /**
   * If str is surrounded by quotes it returns the content between the quotes
   */
  def unquote(str: String) = {
    if (str != null && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
      str.substring(1, str.length - 1)
    else
      str
  }

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var sparkConfig: Config = _
  var ssc: StreamingContext = _
  var processor: Option[RawLogProcessor] = None

  val createContext: () => StreamingContext = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = new SparkConf()
    sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog], classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConfig.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach {
        case (k, v) =>
          val value = unquote(v.render())
          logInfo(s"spark.$k = $value")
          sparkConf.set(s"spark.$k", value)
      }
    // calculate sparkContext and streamingContext
    new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
  }

  def createProcessor(sparkConf: Config, kafkaConf: Config): RawLogProcessor = {
    sparkConfig = sparkConf
    ssc = StreamingContext.getOrCreate(checkpointPath = checkpointDir, creatingFunc = createContext, createOnError = true)
    ssc.checkpoint(checkpointDir)
    // kafkaProperties
    val kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap
    logInfo(s"Initializing kafkaParams = $kafkaParams")
    // create processor
    new RawLogProcessor(ssc, kafkaConf.getString("rawlog.topic"), kafkaParams)
  }

  def apply(sparkConfig: Config, kafkaConf: Config) = {
    if (processor.isEmpty) {
      processor = Some(createProcessor(sparkConfig, kafkaConf))
    }
    processor.get
  }

  def start() = {
    ssc.start()
    ssc.awaitTermination()
  }
}

Extended logs: https://gist.githubusercontent.com/ankurcha/f35df63f0d8a99da0be4/raw/ec96b932540ac87577e4ce8385d26699c1a7d05e/spark-console.log

Could someone tell me what causes this problem? I tried looking at the stack trace, but I am not familiar enough with the codebase to make solid assertions. Any ideas as to what may be happening here?

---
Ankur Chauhan
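
P.S. In case it clarifies what I expected to happen: my understanding from the Spark Streaming checkpointing documentation is that on restart StreamingContext.getOrCreate should rebuild the whole DStream graph from the checkpoint via the factory function. Below is a minimal sketch of that pattern as I understand it; it is simplified and partly hypothetical (plain StringDecoder instead of KafkaAvroDecoder, made-up app/topic/broker names, all DStream wiring inside the factory), not my actual code:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedSketch {
  val checkpointDir = "/tmp/checkpointDir_sketch"

  // All stream setup happens inside the factory so that a restart can
  // recreate the same graph from the checkpoint data.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-sketch")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("some-topic"))

    stream
      .map { case (_, value) => (value, 1) }
      .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
      .print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this calls createContext(); on restart it recovers
    // the context (and its DStream graph) from checkpointDir.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

In my code above, the KafkaUtils/map/reduceByKeyAndWindow wiring happens in the RawLogProcessor constructor, i.e. after getOrCreate has returned rather than inside createContext, so I am wondering whether that difference matters on recovery.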