I brought up a Spark Streaming job that uses Kafka as its input source.
There was no data to process, so I shut the job down and then brought it
back up again. This time the job does not start because it complains that
the DStream has not been initialized.
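
In case it matters: my understanding is that when restarting from a checkpoint, all DStreams have to be created inside the factory function passed to StreamingContext.getOrCreate, roughly like the sketch below (simplified; the broker, topic, and checkpoint path are placeholders, not my real values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object KafkaJob {
  val checkpointDir = "hdfs:///tmp/checkpoint"  // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kafka-streaming")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // All DStreams must be set up inside this factory function; on a
    // restart, getOrCreate skips this function and restores the whole
    // DStream graph from the checkpoint data instead.
    val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "broker:9092"),  // placeholder broker
      Set("mytopic"))                                // placeholder topic
    stream.map(_._2).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

Is that the expected pattern, or am I missing a step in the restart path?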

15/06/26 01:10:44 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
        ..........

I am using Spark 1.3.1 and spark-streaming-kafka 1.3.1.

Any idea how to resolve this issue?

Thanks
Ashish
