I just tried it with sparkSession.streams().awaitAnyTermination(); as the only await* call in my code, and it works! But what if I don't want all of my queries to fail or stop making progress when one of them fails?
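One possible pattern for this is sketched below without Spark, so that it runs on its own. Each "query" is a plain Callable on its own thread, and a supervising loop waits for whichever one ends next, records a failure, and goes back to waiting for the rest. The names AwaitAnyDemo and superviseAll are made up for illustration. In Spark itself the corresponding loop would presumably wrap sparkSession.streams().awaitAnyTermination() in a try/catch for StreamingQueryException and call sparkSession.streams().resetTerminated() before waiting again; treat that mapping as an assumption to check against your Spark version, not as tested code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Spark-free analogy: every "query" runs on its own thread, and one
// failure is logged without stopping the wait for the others.
final class AwaitAnyDemo {

    /** Waits for every task, one completion at a time, and records each outcome. */
    static List<String> superviseAll(List<Callable<String>> queries)
            throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> done = new ExecutorCompletionService<>(pool);
        for (Callable<String> query : queries) {
            done.submit(query); // start everything before waiting on anything
        }
        List<String> outcomes = new ArrayList<>();
        try {
            for (int i = 0; i < queries.size(); i++) {
                try {
                    // take() blocks until ANY task ends, like an "await any" call
                    outcomes.add("finished: " + done.take().get());
                } catch (ExecutionException e) {
                    // one task failed; record it and keep waiting for the rest
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
        } finally {
            pool.shutdown();
        }
        return outcomes;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> queries = new ArrayList<>();
        queries.add(() -> { throw new IllegalStateException("hello1 sink unavailable"); });
        queries.add(() -> { Thread.sleep(200); return "hello2"; });
        for (String outcome : superviseAll(queries)) {
            System.out.println(outcome);
        }
    }
}
```

The point of the loop is that a failure in one task only produces a log entry; the wait for the remaining tasks continues until all of them have ended.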
On Wed, Sep 20, 2017 at 2:26 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi Burak,
>
> Are you saying get rid of both
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> and just have the line below?
>
> sparkSession.streams().awaitAnyTermination();
>
> Thanks!
>
> On Wed, Sep 20, 2017 at 12:51 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> If I don't get anywhere after query1.awaitTermination();
>> then I cannot put sparkSession.streams().awaitAnyTermination(); as
>> the last line of code, right? Like below:
>>
>> query1.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Please remove
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>>
>>> Once
>>>
>>> query1.awaitTermination();
>>>
>>> is called, you don't even get to query2.awaitTermination().
>>>
>>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi Burak,
>>>>
>>>> Thanks much! I had no clue that existed. Now I have changed it to this:
>>>>
>>>> StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
>>>> StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
>>>>
>>>> query1.awaitTermination();
>>>> query2.awaitTermination();
>>>> sparkSession.streams().awaitAnyTermination();
>>>>
>>>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey Kant,
>>>>>
>>>>> That won't work either. Your second query may fail, and as long as
>>>>> your first query is running, you will not know. Put this as the
>>>>> last line instead:
>>>>>
>>>>> spark.streams.awaitAnyTermination()
>>>>>
>>>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>> reason.
>>>>>>
>>>>>> *Doesn't work*
>>>>>>
>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>>>>>>
>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>>>>>>
>>>>>> *Works*
>>>>>>
>>>>>> StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
>>>>>>
>>>>>> query1.awaitTermination()
>>>>>>
>>>>>> StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
>>>>>>
>>>>>> query2.awaitTermination()
>>>>>>
>>>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>>> reason.
>>>>>>>
>>>>>>> Doesn't work
>>>>>>>
>>>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have the following pseudo code (I could paste the real code,
>>>>>>>> but it is pretty long and involves database calls inside the
>>>>>>>> dataset.map operation and so on), so I am just trying to simplify
>>>>>>>> my question. I would like to know if there is something wrong
>>>>>>>> with the following pseudo code.
>>>>>>>>
>>>>>>>> Dataset<String> inputDS = readFromKafka(topicName)
>>>>>>>>
>>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>>>>>
>>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>>>>>>>
>>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>>>>
>>>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>>>>>>>>
>>>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>>>>>>>>
>>>>>>>> *So what's happening with the above code is that I can see data
>>>>>>>> coming out of the hello1 topic but not from the hello2 topic.* I
>>>>>>>> thought there was something wrong with "outputDS2", so I switched
>>>>>>>> the order, and now the code looks like this:
>>>>>>>>
>>>>>>>> Dataset<String> inputDS = readFromKafka(topicName)
>>>>>>>>
>>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>>>>>>>
>>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>>>>>>>
>>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>>>>>>>
>>>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>>>>>>>>
>>>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>>>>>>>>
>>>>>>>> *Now I can see data coming out of the hello2 Kafka topic but not
>>>>>>>> from the hello1 topic.* *In short, I can only see data from
>>>>>>>> outputDS1 or outputDS2, but not both.* At this point I am not
>>>>>>>> sure what is going on.
>>>>>>>>
>>>>>>>> Thanks!
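For anyone reading the thread later, the blocking behaviour described in the replies can be reproduced with a small Spark-free sketch. FakeQuery is a hypothetical stand-in for a streaming query that never ends on its own, and the timeout only exists so that the demo terminates; a real awaitTermination() with no timeout would block indefinitely at that point. All class and method names here are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class BlockingAwaitDemo {

    /** Hypothetical stand-in for a streaming query that runs until stop() is called. */
    static final class FakeQuery {
        private final CountDownLatch stopped = new CountDownLatch(1);
        private final String name;
        private final List<String> started;

        FakeQuery(String name, List<String> started) {
            this.name = name;
            this.started = started;
        }

        /** Records that this query was started and returns it for chaining. */
        FakeQuery start() {
            started.add(name);
            return this;
        }

        /** Returns true only if the query stopped within the timeout. */
        boolean awaitTermination(long timeoutMs) throws InterruptedException {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        void stop() {
            stopped.countDown();
        }
    }

    /** Chained style: the wait on query 1 sits between the two start() calls. */
    static List<String> chained(long timeoutMs) throws InterruptedException {
        List<String> started = new ArrayList<>();
        boolean firstEnded =
                new FakeQuery("hello1", started).start().awaitTermination(timeoutMs);
        if (firstEnded) {
            // Only reachable after hello1 has terminated, which never happens here.
            new FakeQuery("hello2", started).start().awaitTermination(timeoutMs);
        }
        return started;
    }

    /** Start both queries first, then wait. */
    static List<String> startThenWait(long timeoutMs) throws InterruptedException {
        List<String> started = new ArrayList<>();
        FakeQuery q1 = new FakeQuery("hello1", started).start();
        FakeQuery q2 = new FakeQuery("hello2", started).start();
        q1.awaitTermination(timeoutMs);
        q2.awaitTermination(timeoutMs);
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(chained(100));       // prints [hello1]
        System.out.println(startThenWait(100)); // prints [hello1, hello2]
    }
}
```

In the chained version the second query is never started, which matches the symptom of seeing data on only one of the two topics.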