Could you also provide the code where you set up the Kafka DStream? I don't see it in the snippet.

On Fri, Jun 26, 2015 at 2:45 PM, Ashish Nigam <[email protected]> wrote:

> Here's code -
>
> def createStreamingContext(checkpointDirectory: String) : StreamingContext = {
>   val conf = new SparkConf().setAppName("KafkaConsumer")
>   conf.set("spark.eventLog.enabled", "false")
>   logger.info("Going to init spark context")
>   conf.getOption("spark.master") match {
>     case Some(master) => println(master)
>     case None => conf.setMaster("local[*]")
>   }
>   val sc = new SparkContext(conf)
>   // Create a StreamingContext with a 5 second batch size
>   val ssc = new StreamingContext(sc, Seconds(5))
>   ssc.checkpoint(checkpointDirectory)
>   ssc
> }
>
> And here's how it is being called -
>
> val ssc = StreamingContext.getOrCreate(checkpointDir, () => {
>   createStreamingContext(checkpointDir)
> })
>
> On Fri, Jun 26, 2015 at 2:05 PM, Cody Koeninger <[email protected]> wrote:
>
>> Make sure you're following the docs regarding setting up a streaming
>> checkpoint.
>>
>> Post your code if you can't get it figured out.
>>
>> On Fri, Jun 26, 2015 at 3:45 PM, Ashish Nigam <[email protected]> wrote:
>>
>>> I bring up spark streaming job that uses Kafka as input source.
>>> No data to process and then shut it down. And bring it back again.
>>> This time job does not start because it complains that DStream is not
>>> initialized.
>>>
>>> 15/06/26 01:10:44 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
>>> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
>>>     at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>     at scala.Option.orElse(Option.scala:257)
>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
>>>     at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
>>>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
>>>     ..........
>>>
>>> I am using spark 1.3.1 and spark-streaming-kafka 1.3.1 versions.
>>>
>>> Any idea how to resolve this issue?
>>>
>>> Thanks
>>> Ashish
