When we enable checkpointing and use jsonRDD, we get the following error. Is this a bug?
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:103)
    at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:132)
    at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:194)
    at SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:69)
    at SparkStreamingToParquet$$anonfun$createContext$1.apply(SparkStreamingToParquet.scala:63)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

=============================

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.types.{StructType, StructField, StringType}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object SparkStreamingToParquet extends Logging {

  /**
   * @param args hdfsFilesLocation, impalaTableLocation, tableName, checkpointDir
   */
  def main(args: Array[String]) {
    if (args.length < 4) {
      logInfo("Please provide valid parameters: <hdfsFilesLocation: hdfs://ip:8020/user/hdfs/--/> <IMPALAtableloc hdfs://ip:8020/user/hive/--/> <tablename> <checkpointDir>")
      logInfo("Make sure you give the full folder path with '/' at the end, i.e. /user/hdfs/abc/")
      System.exit(1)
    }
    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)

    // Recover the StreamingContext from the checkpoint directory if one exists,
    // otherwise build a fresh one.
    val jssc: StreamingContext = StreamingContext.getOrCreate(CHECKPOINT_DIR, () => {
      createContext(args)
    })
    jssc.start()
    jssc.awaitTermination()
  }

  def createContext(args: Array[String]): StreamingContext = {
    val HDFS_FILE_LOC = args(0)
    val IMPALA_TABLE_LOC = args(1)
    val TEMP_TABLE_NAME = args(2)
    val CHECKPOINT_DIR = args(3)

    val sparkConf: SparkConf = new SparkConf().setAppName("Json to Parquet").set("spark.cores.max", "3")
    val jssc: StreamingContext = new StreamingContext(sparkConf, new Duration(30000))
    val hivecontext: HiveContext = new HiveContext(jssc.sparkContext)

    // Create the Parquet-backed table and register it as a temp table.
    hivecontext.createParquetFile[Person](IMPALA_TABLE_LOC, true,
      org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)

    val schemaString = "name age"
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert each micro-batch of JSON lines to a SchemaRDD and insert it into the table.
    val textFileStream = jssc.textFileStream(HDFS_FILE_LOC)
    textFileStream.foreachRDD(rdd => {
      if (rdd != null && rdd.count() > 0) {
        val schRdd = hivecontext.jsonRDD(rdd, schema)
        logInfo("inserting into table: " + TEMP_TABLE_NAME)
        schRdd.insertInto(TEMP_TABLE_NAME)
      }
    })
    jssc.checkpoint(CHECKPOINT_DIR)
    jssc
  }
}

====

case class Person(name: String, age: String) extends Serializable
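Is the problem that the HiveContext captured in the foreachRDD closure does not survive checkpoint recovery? Would building it lazily inside foreachRDD from rdd.sparkContext help, roughly like the sketch below? (Illustrative only; HiveContextSingleton is our own name, not a Spark API.)

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object HiveContextSingleton {
  @transient private var instance: HiveContext = _
  // Build the HiveContext on first use from the (possibly recovered) SparkContext,
  // instead of capturing one in the checkpointed DStream closure.
  def getInstance(sc: SparkContext): HiveContext = synchronized {
    if (instance == null) {
      instance = new HiveContext(sc)
    }
    instance
  }
}

// Inside createContext(...), the foreachRDD body would then look like:
textFileStream.foreachRDD(rdd => {
  if (rdd != null && rdd.count() > 0) {
    val hc = HiveContextSingleton.getInstance(rdd.sparkContext)
    val schRdd = hc.jsonRDD(rdd, schema)
    schRdd.insertInto(TEMP_TABLE_NAME)
  }
})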
Regards,
Madhu Jahagirdar