You can enable this flag to run multiple jobs concurrently. It might not be
production ready, but you can give it a try. Set it on your SparkConf (sc
here) before creating the StreamingContext:
sc.set("spark.streaming.concurrentJobs", "2")
Refer to TD's answer here
<http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming#answers-header>
for more information.
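
For context, here is a minimal sketch of where that setting would go. The
object name, app name, and 10-second batch interval are placeholders I made
up, not anything from your job; the master is assumed to come from
spark-submit. As far as I know the property is undocumented and defaults to 1.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsSketch {
  // Builds a StreamingContext that allows two streaming jobs to run at once.
  // App name and batch interval are illustrative placeholders.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("ConcurrentJobsSketch")
      // Must be set before the StreamingContext is created.
      .set("spark.streaming.concurrentJobs", "2")

    new StreamingContext(conf, Seconds(10))
  }
}
```

You would then define your DStreams, joins, and output operations on the
returned context and call start() and awaitTermination() as you already do.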
Thanks
Best Regards
On Wed, Apr 22, 2015 at 8:57 AM, Abhay Bansal <[email protected]>
wrote:
> Hi,
>
> I have a use case in which I have to join multiple Kafka topics in parallel.
> If there are 2n topics, there is a one-to-one mapping between the topics
> that need to be joined.
>
>
> val arr= ...
>
> for(condition) {
>   val dStream1 = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaParams, topics1
>   ).map(a => (getKey1(a._2), a._2))
>
>   val dStream2 = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaParams, topics2
>   ).map(a => (getKey2(a._2), a._2))
>
>   arr(counter) = (dStream1, dStream2)
>   counter += 1
> }
>
> arr.par.foreach {
>   case (dStream1, dStream2) => try {
>     val joined = dStream1.join(dStream2, 4)
>     joined.saveAsTextFiles("joinedData")
>   } catch {
>     case t: Exception => t.printStackTrace()
>   }
> }
>
> ssc.start()
> ssc.awaitTermination()
>
>
> With this approach the streams are joined sequentially. Is there a way
> around this? I am new to Spark and would appreciate any suggestions on
> this.
>
>
> Thanks,
>
> -Abhay
>