Hello,

Code:

ZkState zkState = new ZkState(kafkaConfig);
DynamicBrokersReader kafkaBrokerReader = new DynamicBrokersReader(kafkaConfig, zkState);
int partitionCount = kafkaBrokerReader.getNumPartitions();
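// partitionCount is read from ZooKeeper by the kafka-spark-consumer's
// DynamicBrokersReader; it is passed to the receiver below so it knows
// how many Kafka partitions to consume from.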
SparkConf _sparkConf = new SparkConf().setAppName("KafkaReceiver");
final JavaStreamingContext ssc = new JavaStreamingContext(_sparkConf, new Duration(1000));
final JavaReceiverInputDStream<String> inputStream =
        ssc.receiverStream(new KafkaReceiver(_props, partitionCount));
final JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(ssc.sparkContext());

inputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd != null) {
            JavaSchemaRDD schemaObject = sqlContext.jsonRDD(rdd);
            schemaObject.saveAsParquetFile("tweet" + time.toString() + ".parquet");
            System.out.println("File Saved Successfully");
        } else {
            System.out.println("rdd is empty");
        }
        return null;
    }
});

Error:

java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:40)
        at org.apache.spark.sql.api.java.JavaSQLContext.jsonRDD(JavaSQLContext.scala:123)
        at consumer.kafka.client.Consumer$1.call(Consumer.java:115)
        at consumer.kafka.client.Consumer$1.call(Consumer.java:109)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

Please suggest a solution. Thanks in advance.
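In case it helps, this is the change I am planning to try next (not tested yet). My guess from the stack trace is that jsonRDD fails while inferring a schema from a batch that has no records, so the sketch below simply skips empty micro-batches. It is the same foreachRDD block as above with only the check changed, and it assumes that skipping empty batches is acceptable for my use case:

inputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        // jsonRDD infers the schema with a reduce over the records, so an
        // empty batch leads to "empty collection"; skip such batches.
        if (rdd != null && !rdd.take(1).isEmpty()) {
            JavaSchemaRDD schemaObject = sqlContext.jsonRDD(rdd);
            schemaObject.saveAsParquetFile("tweet" + time.toString() + ".parquet");
            System.out.println("File Saved Successfully");
        } else {
            System.out.println("skipping empty batch at " + time);
        }
        return null;
    }
});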
On Thu, Aug 7, 2014 at 1:06 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> You can use SparkSQL for that very easily. You can take the RDDs you get
> from the Kafka input stream, convert them to RDDs of case classes, and
> save them as Parquet files. More information here:
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
>
> On Wed, Aug 6, 2014 at 5:23 AM, Mahebub Sayyed <mahebub...@gmail.com> wrote:
>
>> Hello,
>>
>> I have referred to the link "https://github.com/dibbhatt/kafka-spark-consumer"
>> and have successfully consumed tuples from Kafka.
>> The tuples are JSON objects and I want to store those objects in HDFS in
>> Parquet format.
>>
>> Please suggest a sample example for that.
>> Thanks in advance.
>>
>> On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya
>> <dibyendu.bhattach...@gmail.com> wrote:
>>
>>> You can try this Kafka Spark Consumer which I recently wrote. This uses
>>> the Low Level Kafka Consumer.
>>>
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>> Dibyendu
>>>
>>> On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s <rafeeq.ec...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am new to Apache Spark and am trying to develop a Spark Streaming
>>>> program to *stream data from Kafka topics and output it as Parquet files
>>>> on HDFS*.
>>>>
>>>> Please share a *sample reference* program to stream data from Kafka
>>>> topics and output it as Parquet files on HDFS.
>>>>
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>>
>>>> Rafeeq S
>>>> *("What you do is what matters, not what you think or say or plan.")*