Resending; not sure if I had sent this to user@spark.apache.org earlier.
Thanks,
Arijit

________________________________
From: Arijit <arij...@live.com>
Sent: Friday, October 7, 2016 6:06 PM
To: user@spark.apache.org
Subject: Issue with Spark Streaming with checkpointing in Spark 2.0

In a Spark Streaming sample I am trying to implicitly convert an RDD to a Dataset and save it to permanent storage. Below is a snippet of the code I am running. The job runs fine the first time, when it is started with an empty checkpoint directory. However, if I kill the job and restart it with the same checkpoint directory, I get the following error and the job fails:

16/10/07 23:42:50 ERROR JobScheduler: Error running job streaming job 1475883550000 ms.0
java.lang.NullPointerException
        at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
        at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
        at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/10/07 23:42:50 INFO SparkContext: Starting job: print at EventhubsToAzureBlobAsJSON.scala:93

Does anyone have sample recoverable Spark Streaming code that uses the SparkSession constructs of Spark 2.0?

object EventhubsToAzureBlobAsJSON {

  def createStreamingContext(inputOptions: ArgumentMap): StreamingContext = {

    .....
    val sparkSession: SparkSession =
      SparkSession.builder.config(sparkConfiguration).getOrCreate

    import sparkSession.implicits._

    val streamingContext = new StreamingContext(
      sparkSession.sparkContext,
      Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

    streamingContext.checkpoint(
      inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String])

    val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)

    val eventHubsWindowedStream = eventHubsStream
      .window(Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

    /**
     * This fails on restart
     */
    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd => rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
        .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
          .asInstanceOf[String]))

    /**
     * This runs fine on restart
     */
    /*
    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd => rdd.saveAsTextFile(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
        .asInstanceOf[String], classOf[GzipCodec]))
    */

    .....
  }

  def main(inputArguments: Array[String]): Unit = {

    val inputOptions = EventhubsArgumentParser.parseArguments(Map(), inputArguments.toList)

    EventhubsArgumentParser.verifyEventhubsToAzureBlobAsJSONArguments(inputOptions)

    // Create or recreate the streaming context
    val streamingContext = StreamingContext
      .getOrCreate(inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String],
        () => createStreamingContext(inputOptions))

    streamingContext.start()

    if (inputOptions.contains(Symbol(EventhubsArgumentKeys.TimeoutInMinutes))) {
      streamingContext.awaitTerminationOrTimeout(
        inputOptions(Symbol(EventhubsArgumentKeys.TimeoutInMinutes)).asInstanceOf[Long] * 60 * 1000)
    } else {
      streamingContext.awaitTermination()
    }
  }
}

Thanks,
Arijit
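
P.S. One pattern I am considering, based on the "DataFrame and SQL Operations" section of the Spark Streaming programming guide, is to obtain the SparkSession lazily inside foreachRDD from the RDD's SparkContext instead of capturing the session built in createStreamingContext, so that it can be re-created after recovery from the checkpoint. A rough, untested sketch (EventContent, inputOptions, and the output folder key are the same names as in the snippet above):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD { rdd =>
        // Get or lazily re-create the SparkSession from the recovered SparkContext,
        // rather than relying on the session captured when the checkpoint was first written.
        val session = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import session.implicits._

        rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
          .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)).asInstanceOf[String])
      }

I have not verified whether this avoids the NullPointerException on restart, so any confirmation or a working sample would still be appreciated.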