No, I don't have any special settings. Even if I keep only the line that
reads the stream in my code, it still throws an NPE.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Do you have any special settings? From your code, I don't think it would
> hit an NPE at that place.
>
> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> Spark version: Spark 1.4.1
>>
>> My code snippet:
>>
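>> import java.util.Arrays;
>> import java.util.HashMap;
>> import java.util.HashSet;
>> import kafka.serializer.StringDecoder;
>> import org.apache.spark.api.java.JavaPairRDD;
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.Function;
>> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
>> import org.apache.spark.streaming.kafka.KafkaUtils;
>>
>> // Kafka brokers and the topics to subscribe to
>> String brokers = "ip:port,ip:port";
>> String topics = "x,y,z";
>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> // Direct (receiver-less) stream of (key, value) pairs; jssc is the JavaStreamingContext
>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>         jssc,
>>         String.class,
>>         String.class,
>>         StringDecoder.class,
>>         StringDecoder.class,
>>         kafkaParams,
>>         topicsSet
>> );
>>
>> // Write the message values of every batch to HDFS as text
>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>     @Override
>>     public Void call(JavaPairRDD<String, String> tuple) {
>>         JavaRDD<String> rdd = tuple.values();
>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>         return null;
>>     }
>> });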
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> What Spark version are you using? Also, a small code snippet showing how
>>> you use Spark Streaming would be very helpful.
>>>
>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> I am able to read and print a few lines. After that, I get this
>>>> exception. Any idea what causes it?
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to read from a Kafka stream and write it to a text file. I'm
>>>>> using Java with Spark. I don't know why I'm getting the following
>>>>> exception. Also, the exception message is very abstract. Can anyone
>>>>> please help me?
>>>>>
>>>>> Log Trace :
>>>>>
>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>> java.lang.NullPointerException
>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>> java.lang.NullPointerException
>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>
>>>>>
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>
>>>
>>
>
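
A note on reading the trace quoted above: the NullPointerException is raised
inside the function passed to maxBy in
DStreamGraph.getMaxInputStreamRememberDuration, which is reached from
JobGenerator.clearMetadata. One plausible reading, which is an assumption and
not something confirmed in this thread, is that one of the values being
compared was null at that point. The plain-Java sketch below reproduces only
that failure shape. MaxByNullSketch, Duration and maxByMillis are hypothetical
names made up for illustration, and no Spark code is involved.

```java
import java.util.Arrays;
import java.util.List;

class MaxByNullSketch {

    // Hypothetical stand-in for a duration value; this is not Spark's class.
    static final class Duration {
        private final long millis;

        Duration(long millis) {
            this.millis = millis;
        }

        long milliseconds() {
            return millis;
        }
    }

    // Returns the element with the largest milliseconds() value, in the
    // spirit of a maxBy call. Elements are dereferenced during comparison,
    // so a null entry among two or more elements raises NullPointerException.
    static Duration maxByMillis(List<Duration> durations) {
        Duration best = durations.get(0);
        for (Duration d : durations.subList(1, durations.size())) {
            if (d.milliseconds() > best.milliseconds()) {
                best = d;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // All entries set: the maximum is found normally.
        List<Duration> ok = Arrays.asList(new Duration(1000), new Duration(2000));
        System.out.println(maxByMillis(ok).milliseconds());

        // One entry is null: the comparison itself fails.
        List<Duration> withNull = Arrays.asList(new Duration(1000), null);
        try {
            maxByMillis(withNull);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while comparing durations");
        }
    }
}
```

If this reading applies, the thing to look for in the application is why one
of the compared values was never set, rather than the saveAsTextFile call
itself.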
