Never mind - one of my peers corrected the driver program for me: all DStream operations need to be within the scope of the getOrCreate API.
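For anyone who hits the same trace, here is a minimal sketch of what the corrected driver looks like: the same program as in the quoted message below, but with the Kafka stream and the foreachRDD moved inside the factory function passed to getOrCreate. The broker list and topic are placeholders, as in the original post.

import kafka.serializer.DefaultDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ScalaSML {
  def main(args: Array[String]) {
    val checkpointPath =
      "hdfs://SparkMasterVIP.AdsOISCP-Sandbox-Ch1d.CH1D.ap.gbl/checkpoint/ScalaSML/HK2"

    // Everything that defines the DStream graph lives inside this factory.
    def createContext(): StreamingContext = {
      val sparkConf = new SparkConf().setAppName("ScalaSML")
      val context = new StreamingContext(sparkConf, Seconds(60))
      context.checkpoint(checkpointPath)

      val kafkaParams = Map(
        "metadata.broker.list" -> "...",  // placeholder, as in the original
        "auto.offset.reset" -> "largest")
      val topics = Set("topic")

      // The stream and all operations on it are defined here, inside the
      // factory, so they become part of the checkpointed DStream graph.
      val ds = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
        DefaultDecoder, DefaultDecoder](context, kafkaParams, topics)
      ds.foreachRDD((rdd, time) =>
        println("Time: " + time + " Count: " + rdd.count()))

      context
    }

    // On a fresh start the factory runs and the graph above is checkpointed;
    // on restart the context is recovered from the checkpoint and the factory
    // is skipped, so any stream created outside it is never initialized -
    // which is what produced the "has not been initialized" exception.
    val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}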
On Wed, Dec 9, 2015 at 3:32 PM, Renyi Xiong <renyixio...@gmail.com> wrote:
> The following scala program throws the same exception. I know people are
> running streaming jobs against kafka, so I must be missing something. Any
> idea why?
>
> package org.apache.spark.streaming.api.csharp
>
> import java.util.HashMap
>
> import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
>
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.SparkConf
>
> object ScalaSML {
>   def main(args: Array[String]) {
>
>     val checkpointPath =
>       "hdfs://SparkMasterVIP.AdsOISCP-Sandbox-Ch1d.CH1D.ap.gbl/checkpoint/ScalaSML/HK2"
>     val sparkConf = new SparkConf().setAppName("ScalaSML")
>     val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
>       val context = new StreamingContext(sparkConf, Seconds(60))
>       context.checkpoint(checkpointPath)
>       context
>     })
>
>     val kafkaParams = Map("metadata.broker.list" -> "...",
>       "auto.offset.reset" -> "largest")
>
>     val topics = Set("topic")
>
>     val ds = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>       DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)
>     ds.foreachRDD((rdd, time) => println("Time: " + time + " Count: " +
>       rdd.count()))
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> 15/12/09 15:22:43 ERROR StreamingContext: Error starting the context,
> marking it as stopped
> org.apache.spark.SparkException:
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@15ce2c0 has not
> been initialized
>         at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>         at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:83)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at scala.Option.orElse(Option.scala:257)
>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>         at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>         at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>         at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:593)
>         at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:591)
>         at org.apache.spark.streaming.api.csharp.ScalaSML$.main(ScalaSML.scala:48)
>         at org.apache.spark.streaming.api.csharp.ScalaSML.main(ScalaSML.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> On Wed, Dec 9, 2015 at 12:45 PM, Renyi Xiong <renyixio...@gmail.com> wrote:
>
>> hi,
>>
>> I met the following exception when the driver program tried to recover
>> from a checkpoint. It looks like the logic relies on zeroTime being set,
>> which doesn't seem to happen here. Am I missing anything, or is it a bug
>> in 1.4.1?
>>
>> org.apache.spark.SparkException:
>> org.apache.spark.streaming.api.csharp.CSharpTransformed2DStream@161f0d27
>> has not been initialized
>>         at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>         at scala.Option.orElse(Option.scala:257)
>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>         at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>>         at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>>         at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
>>         at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:145)
>>         at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:90)
>>         at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:25)
>>         at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>         at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>>         at java.lang.Thread.run(Thread.java:724)