Hi Burak,

Are you saying I should get rid of both

query1.awaitTermination();
query2.awaitTermination();

and just have the line below?

sparkSession.streams().awaitAnyTermination();

Thanks!

On Wed, Sep 20, 2017 at 12:51 AM, kant kodali <kanth...@gmail.com> wrote:

> If I never get past query1.awaitTermination(), then I cannot put
> sparkSession.streams().awaitAnyTermination(); as the last line of code,
> right? Like below:
>
> query1.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Please remove
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>>
>> Once
>>
>> query1.awaitTermination();
>>
>> is called, you don't even get to query2.awaitTermination().
>>
>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Burak,
>>>
>>> Thanks much! I had no clue that existed. Now I have changed it to this:
>>>
>>> StreamingQuery query1 = outputDS1.writeStream()
>>>         .trigger(Trigger.processingTime(1000))
>>>         .foreach(new KafkaSink("hello1"))
>>>         .start();
>>> StreamingQuery query2 = outputDS2.writeStream()
>>>         .trigger(Trigger.processingTime(1000))
>>>         .foreach(new KafkaSink("hello2"))
>>>         .start();
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>> sparkSession.streams().awaitAnyTermination();
>>>
>>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hey Kant,
>>>>
>>>> That won't work either. Your second query may fail, and as long as your
>>>> first query is running, you will not know. Put this as the last line
>>>> instead:
>>>>
>>>> spark.streams.awaitAnyTermination()
>>>>
>>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>> reason.
>>>>>
>>>>> *Doesn't work*
>>>>>
>>>>> outputDS1.writeStream()
>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>         .foreach(new KafkaSink("hello1"))
>>>>>         .start()
>>>>>         .awaitTermination();
>>>>>
>>>>> outputDS2.writeStream()
>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>         .foreach(new KafkaSink("hello2"))
>>>>>         .start()
>>>>>         .awaitTermination();
>>>>>
>>>>> *Works*
>>>>>
>>>>> StreamingQuery query1 = outputDS1.writeStream()
>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>         .foreach(new KafkaSink("hello1"))
>>>>>         .start();
>>>>>
>>>>> query1.awaitTermination();
>>>>>
>>>>> StreamingQuery query2 = outputDS2.writeStream()
>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>         .foreach(new KafkaSink("hello2"))
>>>>>         .start();
>>>>>
>>>>> query2.awaitTermination();
>>>>>
>>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>> reason.
>>>>>>
>>>>>> Doesn't work
>>>>>>
>>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have the following pseudocode (I could paste the real code, but it
>>>>>>> is pretty long and involves database calls inside a dataset.map
>>>>>>> operation and so on), so I am just trying to simplify my question. I
>>>>>>> would like to know if there is something wrong with the following
>>>>>>> pseudocode.
>>>>>>>
>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>
>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>>>>
>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>>>>>>
>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>>>
>>>>>>> outputDS1.writeStream()
>>>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>>>         .foreach(new KafkaSink("hello1"))
>>>>>>>         .start()
>>>>>>>         .awaitTermination();
>>>>>>>
>>>>>>> outputDS2.writeStream()
>>>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>>>         .foreach(new KafkaSink("hello2"))
>>>>>>>         .start()
>>>>>>>         .awaitTermination();
>>>>>>>
>>>>>>> *So what's happening with the above code is that I can see data coming
>>>>>>> out of the hello1 topic but not out of the hello2 topic.* I thought
>>>>>>> there was something wrong with "outputDS2", so I switched the order,
>>>>>>> and now the code looks like this:
>>>>>>>
>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>
>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>>>>
>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>>>>>>
>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>>>
>>>>>>> outputDS1.writeStream()
>>>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>>>         .foreach(new KafkaSink("hello1"))
>>>>>>>         .start()
>>>>>>>         .awaitTermination();
>>>>>>>
>>>>>>> outputDS2.writeStream()
>>>>>>>         .trigger(Trigger.processingTime(1000))
>>>>>>>         .foreach(new KafkaSink("hello2"))
>>>>>>>         .start()
>>>>>>>         .awaitTermination();
>>>>>>>
>>>>>>> *Now I can see data coming out of the hello2 Kafka topic but not out of
>>>>>>> the hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>>>> outputDS2, but not both.* At this point I am not sure what is going
>>>>>>> on.
>>>>>>>
>>>>>>> Thanks!
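A note for later readers of this thread. Burak's explanation is that `awaitTermination()` blocks the calling thread until that query stops. Once the first call is made, the statements after it never run. As he points out, a failure in the second query also goes unnoticed while the first one keeps running. The fix he suggests is to start every query first and make `sparkSession.streams().awaitAnyTermination()` the last line.

The sketch below is a plain-Java model of that difference, not Spark code. It assumes `java.util.concurrent` executors as a stand-in for streaming queries. The names `runsForever`, `failsImmediately`, `awaitFirstThenSecond` and `awaitAny` are hypothetical and made up for illustration. `Future.get` with a timeout plays the role of `query1.awaitTermination()`; the timeout exists only so the demo ends. `ExecutorCompletionService.take()` plays the role of `awaitAnyTermination()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java model, NOT Spark: each Callable stands in for a streaming query.
class AwaitAnyDemo {

    // Hypothetical stand-in for query1: keeps running until it is interrupted.
    static Callable<String> runsForever() {
        return () -> {
            while (true) {
                Thread.sleep(50);
            }
        };
    }

    // Hypothetical stand-in for query2: fails right away (e.g. a broken sink).
    static Callable<String> failsImmediately() {
        return () -> {
            throw new IllegalStateException("query2 sink error");
        };
    }

    // Models the original code: block on query1 first. A timeout replaces
    // "forever" so the demo terminates; query2's failure is never observed.
    static String awaitFirstThenSecond(long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> q1 = pool.submit(runsForever());
        pool.submit(failsImmediately());
        try {
            q1.get(timeoutMs, TimeUnit.MILLISECONDS); // like query1.awaitTermination()
            return "query1 finished";
        } catch (TimeoutException e) {
            return "still blocked on query1";
        } catch (InterruptedException | ExecutionException e) {
            return "unexpected: " + e;
        } finally {
            pool.shutdownNow(); // interrupts the looping task so the JVM can exit
        }
    }

    // Models the suggested fix: start everything, then block on whichever
    // task terminates first (analogous to streams().awaitAnyTermination()).
    static String awaitAny() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        cs.submit(runsForever());
        cs.submit(failsImmediately());
        try {
            Future<String> first = cs.take(); // first task to complete, success or failure
            return "terminated: " + first.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitFirstThenSecond(200)); // still blocked on query1
        System.out.println(awaitAny());                // failed: query2 sink error
    }
}
```

Running `main` should print `still blocked on query1` and then `failed: query2 sink error`. Waiting on the first task alone never surfaces the second task's failure, while waiting on whichever task finishes first does. In the Spark job itself, this corresponds to what Burak recommends: drop both `awaitTermination()` calls and keep `sparkSession.streams().awaitAnyTermination()` as the final statement.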