If execution never gets past query1.awaitTermination(), then I cannot put sparkSession.streams().awaitAnyTermination() after it as the last line of code, right? Like below:

query1.awaitTermination();
sparkSession.streams().awaitAnyTermination();

On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:

> Please remove
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> once
>
> query1.awaitTermination();
>
> is called, you don't even get to query2.awaitTermination().
>
>
> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Burak,
>>
>> Thanks much! had no clue that existed. Now, I changed it to this.
>>
>> StreamingQuery query1 =
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello1")).start();
>> StreamingQuery query2 =
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>> KafkaSink("hello2")).start();
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>>
>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hey Kant,
>>>
>>> That won't work either. Your second query may fail, and as long as your
>>> first query is running, you will not know. Put this as the last line
>>> instead:
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Looks like my problem was the order of awaitTermination() for some
>>>> reason.
>>>>
>>>> *Doesn't work*
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>
>>>> *Works*
>>>>
>>>> StreamingQuery query1 =
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello1")).start();
>>>>
>>>> query1.awaitTermination()
>>>>
>>>> StreamingQuery query2 =
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello2")).start();
>>>>
>>>> query2.awaitTermination()
>>>>
>>>>
>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>> reason.
>>>>>
>>>>> Doesn't work
>>>>>
>>>>>
>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following pseudocode (I could paste the real code, however
>>>>>> it is pretty long and involves database calls inside the dataset.map
>>>>>> operation and so on), so I am just trying to simplify my question. I
>>>>>> would like to know if there is something wrong with the following
>>>>>> pseudocode.
>>>>>>
>>>>>> DataSet<String> inputDS = readFromKaka(topicName)
>>>>>>
>>>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works,
>>>>>> since I can see data getting populated
>>>>>>
>>>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Works
>>>>>> as well
>>>>>>
>>>>>> DataSet<String> outputDS2 = mongoDS.map(readFromDatabase); //
>>>>>> Doesn't work
>>>>>>
>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>
>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>
>>>>>>
>>>>>> *So what's happening with the above code is that I can see data coming
>>>>>> out of the hello1 topic but not from the hello2 topic.* I thought there
>>>>>> was something wrong with "outputDS2", so I switched the order, and now
>>>>>> the code looks like this:
>>>>>>
>>>>>> DataSet<String> inputDS = readFromKaka(topicName)
>>>>>>
>>>>>> DataSet<String> mongoDS = inputDS.map(insertIntoDatabase); // Works,
>>>>>> since I can see data getting populated
>>>>>>
>>>>>> DataSet<String> outputDS2 = mongoDS.map(readFromDatabase); // This
>>>>>> works
>>>>>>
>>>>>> DataSet<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't
>>>>>> work
>>>>>>
>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>
>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>
>>>>>> *Now I can see data coming out from the hello2 Kafka topic but not from
>>>>>> the hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>>> outputDS2 but not both.* At this point I am not sure what is going
>>>>>> on.
>>>>>>
>>>>>> Thanks!
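
For anyone reading this thread later: the ordering problem discussed above can be reproduced without Spark. What follows is only a rough sketch in plain Java, not Spark code. FakeQuery and AwaitOrderDemo are hypothetical names made up for illustration, CompletableFuture.anyOf is used as a stand-in for sparkSession.streams().awaitAnyTermination(), and the 100 ms timeout is an assumption that stands in for a call that would otherwise block forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a streaming query: it "runs" until its future completes.
class FakeQuery {
    final String name;
    final CompletableFuture<Void> done = new CompletableFuture<>();

    FakeQuery(String name) {
        this.name = name;
    }

    // Returns true if the query ended within the timeout, false if it is still running.
    boolean awaitTermination(long timeoutMs) throws Exception {
        try {
            done.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }
}

class AwaitOrderDemo {
    // Pattern from the first mail: await query 1 before query 2 has been started.
    static List<String> chainedAwait() throws Exception {
        List<String> started = new ArrayList<>();
        FakeQuery q1 = new FakeQuery("hello1");
        started.add(q1.name);
        // An await without a timeout would block here for as long as q1 runs.
        if (!q1.awaitTermination(100)) {
            return started; // still waiting on q1, so query 2 was never started
        }
        started.add("hello2");
        return started;
    }

    // Suggested pattern: start every query first, then block once on "any of them ended".
    static String startAllThenAwaitAny() throws Exception {
        FakeQuery q1 = new FakeQuery("hello1");
        FakeQuery q2 = new FakeQuery("hello2");
        // Simulate the second query failing while the first keeps running.
        q2.done.completeExceptionally(new IllegalStateException("hello2 failed"));
        CompletableFuture<Object> any = CompletableFuture.anyOf(q1.done, q2.done);
        try {
            any.get(1, TimeUnit.SECONDS);
            return "a query stopped normally";
        } catch (ExecutionException e) {
            // The failure of either query surfaces here, whichever one it was.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(chainedAwait());
        System.out.println(startAllThenAwaitAny());
    }
}
```

The first method shows why only one topic received data: the second query is never started while the first await is pending. The second method shows why a single "await any" call at the end is preferable to awaiting one query by name: a failure in either query is noticed.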