hello,

Code:
ZkState zkState = new ZkState(kafkaConfig);
DynamicBrokersReader kafkaBrokerReader = new
DynamicBrokersReader(kafkaConfig, zkState);
int partionCount = kafkaBrokerReader.getNumPartitions();

SparkConf _sparkConf = new SparkConf().setAppName("KafkaReceiver");

final JavaStreamingContext ssc = new JavaStreamingContext(_sparkConf, new
Duration(1000));

final JavaReceiverInputDStream<String> inputStream = ssc.receiverStream(new
KafkaReceiver(_props, partionCount));
final JavaSQLContext sqlContext = new
org.apache.spark.sql.api.java.JavaSQLContext(ssc.sparkContext());
inputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
@Override
      public Void call(JavaRDD<String> rdd, Time time) throws Exception {

 if(rdd != null) {
JavaSchemaRDD schemaObject = sqlContext.jsonRDD(rdd);
schemaObject.saveAsParquetFile("tweet" + time.toString() + ".parquet");
System.out.println("File Saved Successfully");
}else {
System.out.println("rdd is empty");
}

Error:
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:806)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:40)
at
org.apache.spark.sql.api.java.JavaSQLContext.jsonRDD(JavaSQLContext.scala:123)
at consumer.kafka.client.Consumer$1.call(Consumer.java:115)
at consumer.kafka.client.Consumer$1.call(Consumer.java:109)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Please suggest me solution.
Thanks in Advance.



On Thu, Aug 7, 2014 at 1:06 AM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> You can use SparkSQL for that very easily. You can convert the rdds you
> get from kafka input stream, convert them to a RDDs of case classes and
> save as parquet files.
> More information here.
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
>
>
> On Wed, Aug 6, 2014 at 5:23 AM, Mahebub Sayyed <mahebub...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have referred link "https://github.com/dibbhatt/kafka-spark-consumer";
>> and I have successfully consumed tuples from kafka.
>> Tuples are JSON objects and I want to store that objects in HDFS as
>> parque format.
>>
>> Please suggest me any sample example for that.
>> Thanks in advance.
>>
>>
>>
>>
>>
>> On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> You can try this Kafka Spark Consumer which I recently wrote. This uses
>>> the Low Level Kafka Consumer
>>>
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>> Dibyendu
>>>
>>>
>>>
>>>
>>> On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s <rafeeq.ec...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am new to Apache Spark and Trying to Develop spark streaming program
>>>> to  *stream data from kafka topics and output as parquet file on HDFS*.
>>>>
>>>> Please share the *sample reference* program to stream data from kafka
>>>> topics and output as parquet file on HDFS.
>>>>
>>>> Thanks in Advance.
>>>>
>>>> Regards,
>>>>
>>>> Rafeeq S
>>>> *(“What you do is what matters, not what you think or say or plan.” )*
>>>>
>>>>
>>>
>>
>>
>> --
>>
>>
>

Reply via email to