I used to hit a NPE when i don't add all the dependency jars to my context
while running it in standalone mode. Can you try adding all these
dependencies to your context?

    
sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.1.jar")
   
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.1.1.jar")
   
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
   
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")


Thanks
Best Regards

On Fri, May 22, 2015 at 5:20 PM, Guillermo Ortiz <konstt2...@gmail.com>
wrote:

> Hi,
>
> I'm trying to connect to two topics of Kafka with Spark with DirectStream
> but I get an error. I don't know if there're any limitation to do it,
> because when I just access to one topics everything if right.
>
> *    val ssc = new StreamingContext(sparkConf, Seconds(5))    *
> *    val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "quickstart.cloudera:9092")*
> *    val setTopic1 = Set("topic1")*
> *    val setTopic2 = Set("topic2")*
>
> *    val stream1 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)*
> *    val stream2 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)*
>
>
> The error that I get is:
> * 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314*
> *15/05/22 13:12:40 ERROR OneForOneStrategy: *
> *java.lang.NullPointerException*
> * at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
> * at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
> * at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)*
> * at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)*
> * at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)*
> * at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)*
>
>
> Are there any limitation to do it?
>

Reply via email to