Hi,
I have a use case where I need to join multiple Kafka topics in parallel.
If there are 2n topics, they form n pairs with a one-to-one mapping, and
each pair needs to be joined.

val arr = ...
for (condition) {
  // Key each topic's messages by the join key extracted from the message value.
  val dStream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics1).map(a => (getKey1(a._2), a._2))
  val dStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics2).map(a => (getKey2(a._2), a._2))
  arr(counter) = (dStream1, dStream2)
  counter += 1
}

// Define the join for each pair of streams, using 4 partitions.
arr.par.foreach {
  case (dStream1, dStream2) =>
    try {
      val joined = dStream1.join(dStream2, 4)
      joined.saveAsTextFiles("joinedData")
    } catch {
      case t: Exception => t.printStackTrace()
    }
}

ssc.start()
ssc.awaitTermination()

With this approach, the streams are still joined sequentially rather than in
parallel. Is there a way around this? I am new to Spark and would appreciate
any suggestions.
Thanks,
-Abhay