I just verified that the following code works on Spark 1.3.0:
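import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// topic1 and topic2 are each a Set[String] of topic names, e.g. Set("topic1")
val stream1 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
val stream2 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
// Register an output operation on each stream
stream1.print()
stream2.print()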
So something else is probably going on in your case. See if simply
printing the two streams works for you, then compare what's different in
your actual job.
On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz <[email protected]>
wrote:
> Hi,
>
> I'm trying to connect to two Kafka topics from Spark with DirectStream,
> but I get an error. I don't know if there is any limitation on doing this,
> because when I access just one topic everything works.
>
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>   "quickstart.cloudera:9092")
> val setTopic1 = Set("topic1")
> val setTopic2 = Set("topic2")
>
> val stream1 = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
> val stream2 = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)
>
>
> The error that I get is:
> 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
> 15/05/22 13:12:40 ERROR OneForOneStrategy:
> java.lang.NullPointerException
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>   at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>   at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>   at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>   at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>
>
> Is there any limitation on doing this?
>