Hi,

I have use case wherein I have to join multiple kafka topics in parallel.
So if there are 2n topics there is a one to one mapping of topics which
needs to be joined.


    val arr= ...

    for(condition) {

        val dStream1 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics1).map(a=>(getKey1(a.
_2),a._2))

        val dStream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topics2).map(a=>(getKey2(a.
_2),a._2))

       arr(counter) = (dStream1, dStream2);

       counter+=1;

    }



    arr.par.foreach {

            case(dStream1, dStream2) => try {

                val joined = dStream1.join(dStream2,4);

                joined.saveAsTextFiles("joinedData”)

            }

            catch {

                case t:Exception =>t.printStackTrace();

            }

        }



    ssc.start()

    ssc.awaitTermination()


Doing so the streams are getting joined by sequentially. Is there a way out
of this? I am new to spark, would appreciate any suggestions around this.


Thanks,

-Abhay

Reply via email to