Hi Burak,

Thanks so much! I had no clue that existed. Now I have changed it to this:
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query1 = outputDS1.writeStream()
        .trigger(Trigger.processingTime(1000))
        .foreach(new KafkaSink("hello1"))
        .start();
StreamingQuery query2 = outputDS2.writeStream()
        .trigger(Trigger.processingTime(1000))
        .foreach(new KafkaSink("hello2"))
        .start();

// Start both queries first, then block exactly once. Calling query1.awaitTermination()
// here would block until query1 stops, so this line would never be reached and a
// failure in query2 would go unnoticed.
sparkSession.streams().awaitAnyTermination();

On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hey Kant,
>
> That won't work either. Your second query may fail, and as long as your
> first query is running, you will not know. Put this as the last line
> instead:
>
> spark.streams.awaitAnyTermination()
>
> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Looks like my problem was the order of awaitTermination() for some reason.
>>
>> *Doesn't work*
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start().awaitTermination()
>>
>> *Works*
>>
>> StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start();
>>
>> query1.awaitTermination()
>>
>> StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start();
>>
>> query2.awaitTermination()
>>
>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Looks like my problem was the order of awaitTermination() for some
>>> reason.
>>>
>>> Doesn't work
>>>
>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following pseudocode (I could paste the real code; however,
>>>> it is pretty long and involves database calls inside a dataset.map
>>>> operation and so on), so I am just trying to simplify my question.
>>>> I would like to know if there is something wrong with the following
>>>> pseudocode:
>>>>
>>>> DataSet<String> inputDS = readFromKaka(topicName)
>>>>
>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>
>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>>>
>>>> DataSet<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>
>>>> *So what's happening with the above code is that I can see data coming
>>>> out of the hello1 topic but not the hello2 topic.* I thought there was
>>>> something wrong with "outputDS2", so I switched the order, and now the
>>>> code looks like this:
>>>>
>>>> DataSet<String> inputDS = readFromKaka(topicName)
>>>>
>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>
>>>> DataSet<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>>>
>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>
>>>> *Now I can see data coming out of the hello2 Kafka topic but not the
>>>> hello1 topic.* *In short, I can only see data from outputDS1 or
>>>> outputDS2, but not both.* At this point I am not sure what is going on.
>>>>
>>>> Thanks!
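The thread doesn't spell out why chaining .start().awaitTermination() loses one of the two queries. The sketch below is a stdlib-only Java analogy, not Spark code: FakeQuery, FakeStreams and the two scenario methods are hypothetical names invented for illustration, each "query" is just a daemon thread, and the timeouts exist only so the demo terminates. It assumes the documented Spark semantics that awaitTermination() blocks the calling thread until that query stops, while streams().awaitAnyTermination() returns as soon as any query on the session stops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stdlib-only analogy, NOT Spark code. FakeQuery and FakeStreams are hypothetical
// stand-ins for StreamingQuery and sparkSession.streams().
public class AwaitOrderDemo {

    /** A "query" that keeps running on a daemon thread until stop() is called. */
    static final class FakeQuery {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile boolean stopped = false;

        void stop() { stopped = true; }  // simulates the query finishing or failing

        /** Blocks until this query ends or ms elapses; true if it ended. */
        boolean awaitTermination(long ms) {
            try {
                return done.await(ms, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Starts queries and lets the caller wait for whichever one ends first. */
    static final class FakeStreams {
        private final CountDownLatch anyDone = new CountDownLatch(1);
        final AtomicInteger started = new AtomicInteger();

        FakeQuery start() {
            FakeQuery q = new FakeQuery();
            Thread t = new Thread(() -> {
                try {
                    while (!q.stopped) Thread.sleep(5);  // "process micro-batches"
                } catch (InterruptedException ignored) {
                    // treat interruption as termination
                } finally {
                    q.done.countDown();
                    anyDone.countDown();
                }
            });
            t.setDaemon(true);
            t.start();
            started.incrementAndGet();
            return q;
        }

        /** Blocks until any started query ends or ms elapses; true if one ended. */
        boolean awaitAnyTermination(long ms) {
            try {
                return anyDone.await(ms, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Original pattern: the caller blocks on query 1 before any second start(). */
    static int queriesStartedWhileBlockedOnFirst(long ms) {
        FakeStreams streams = new FakeStreams();
        FakeQuery q1 = streams.start();
        q1.awaitTermination(ms);           // without a timeout (as in Spark) this never returns
        int seen = streams.started.get();  // a start() placed after the await has not run
        q1.stop();
        return seen;
    }

    /** Both queries started, then query 2 ends. Does the waiting call notice? */
    static boolean secondFailureNoticed(boolean useAwaitAny) {
        FakeStreams streams = new FakeStreams();
        FakeQuery q1 = streams.start();
        FakeQuery q2 = streams.start();
        q2.stop();                                   // simulate query 2 failing
        boolean noticed = useAwaitAny
                ? streams.awaitAnyTermination(2000)  // returns once q2 has ended
                : q1.awaitTermination(200);          // still waiting on the healthy q1
        q1.stop();
        return noticed;
    }

    public static void main(String[] args) {
        System.out.println("started while blocked on q1: " + queriesStartedWhileBlockedOnFirst(100));
        System.out.println("q2 end seen by q1.awaitTermination: " + secondFailureNoticed(false));
        System.out.println("q2 end seen by awaitAnyTermination: " + secondFailureNoticed(true));
    }
}
```

Running main should show that only one query has been started while the caller is parked in q1.awaitTermination(), and that only the awaitAnyTermination variant reacts to query 2 ending, which mirrors Burak's point about not noticing a failed second query.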