I used to hit a NPE when i don't add all the dependency jars to my context while running it in standalone mode. Can you try adding all these dependencies to your context?
sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.1.jar") sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.1.1.jar") sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar") sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar") Thanks Best Regards On Fri, May 22, 2015 at 5:20 PM, Guillermo Ortiz <konstt2...@gmail.com> wrote: > Hi, > > I'm trying to connect to two topics of Kafka with Spark with DirectStream > but I get an error. I don't know if there're any limitation to do it, > because when I just access to one topics everything if right. > > * val ssc = new StreamingContext(sparkConf, Seconds(5)) * > * val kafkaParams = Map[String, String]("metadata.broker.list" -> > "quickstart.cloudera:9092")* > * val setTopic1 = Set("topic1")* > * val setTopic2 = Set("topic2")* > > * val stream1 = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)* > * val stream2 = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)* > > > The error that I get is: > * 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314* > *15/05/22 13:12:40 ERROR OneForOneStrategy: * > *java.lang.NullPointerException* > * at > org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)* > * at > org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)* > * at > scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)* > * at > scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)* > * at > scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)* > * at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)* > > > Are there any limitation to do it? >