spark version - spark 1.4.1
my code snippet:
String brokers = "ip:port,ip:port";
String topics = "x,y,z";
HashSet<String> TopicsSet = new
HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
TopicsSet
);
messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void> () {
public Void call(JavaPairRDD<String , String> tuple) {
JavaRDD<String>rdd = tuple.values();
rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
return null;
}
});
*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>
On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <[email protected]> wrote:
> What Spark version are you using, also a small code snippet of how you use
> Spark Streaming would be greatly helpful.
>
> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <[email protected]>
> wrote:
>
>> I can able to read and print few lines. Afterthat i'm getting this
>> exception. Any idea for this ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read from kafka stream and printing it textfile. I'm using
>>> java over spark. I dont know why i'm getting the following exception.
>>> Also exception message is very abstract. can anyone please help me ?
>>>
>>> Log Trace :
>>>
>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>> exception: java.lang.NullPointerException
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
>>>
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>
>