I just tried sparkSession.streams().awaitAnyTermination(); as the only
await* call, and it works! But what if I don't want all my queries
to fail or stop making progress when one of them fails?
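
On the question above: as far as I know, one query failing does not by
itself stop the others; what ends them is the driver exiting once
awaitAnyTermination() returns or throws. A common pattern is to call
awaitAnyTermination() in a loop, catch the StreamingQueryException, and
call resetTerminated() before waiting again. The sketch below models only
that control flow in plain Java, with CompletableFuture standing in for
StreamingQuery so that it runs without Spark. AwaitAnyLoop, superviseAll
and the FAILED/FINISHED labels are hypothetical names, not Spark API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, Spark-free model: each CompletableFuture stands in for one streaming query.
class AwaitAnyLoop {

    // Waits for every task, one termination at a time; a failure is recorded, not rethrown.
    static Map<String, String> superviseAll(Map<String, CompletableFuture<Void>> tasks) {
        Map<String, CompletableFuture<Void>> active = new LinkedHashMap<>(tasks);
        Map<String, String> outcomes = new LinkedHashMap<>();

        while (!active.isEmpty()) {
            try {
                // Analogue of awaitAnyTermination(): block until at least one task is done.
                CompletableFuture.anyOf(active.values().toArray(new CompletableFuture[0])).join();
            } catch (CompletionException e) {
                // A task failed. Swallow it here so the remaining tasks keep being supervised.
            }
            // Analogue of resetTerminated(): record finished tasks and stop waiting on them.
            active.entrySet().removeIf(entry -> {
                CompletableFuture<Void> task = entry.getValue();
                if (!task.isDone()) {
                    return false;
                }
                outcomes.put(entry.getKey(), task.isCompletedExceptionally() ? "FAILED" : "FINISHED");
                return true;
            });
        }
        return outcomes;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> query1 = new CompletableFuture<>();
        CompletableFuture<Void> query2 = new CompletableFuture<>();
        Map<String, CompletableFuture<Void>> tasks = new LinkedHashMap<>();
        tasks.put("query1", query1);
        tasks.put("query2", query2);

        // Simulate one query failing while the other ends normally.
        query1.completeExceptionally(new IllegalStateException("simulated failure"));
        query2.complete(null);

        System.out.println(superviseAll(tasks));
    }
}
```

In real Spark code the same loop would presumably wrap
sparkSession.streams().awaitAnyTermination() and keep running while
sparkSession.streams().active() still returns queries. Whether to restart
a failed query is a separate decision that this sketch does not cover.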

On Wed, Sep 20, 2017 at 2:26 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi Burak,
>
> Are you saying get rid of both
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> and just have the line below?
>
> sparkSession.streams().awaitAnyTermination();
>
> Thanks!
>
>
> On Wed, Sep 20, 2017 at 12:51 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> If execution never gets past query1.awaitTermination(); then I cannot
>> just add sparkSession.streams().awaitAnyTermination(); as the last line
>> of code, right? Like below:
>>
>> query1.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>>
>> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Please remove
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>>
>>> Once
>>>
>>> query1.awaitTermination();
>>>
>>> is called, you don't even get to query2.awaitTermination().
>>>
>>>
>>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Hi Burak,
>>>>
>>>> Thanks much! I had no clue that existed. Now I have changed it to this:
>>>>
>>>> StreamingQuery query1 = outputDS1.writeStream()
>>>>     .trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello1"))
>>>>     .start();
>>>> StreamingQuery query2 = outputDS2.writeStream()
>>>>     .trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello2"))
>>>>     .start();
>>>>
>>>> query1.awaitTermination();
>>>> query2.awaitTermination();
>>>> sparkSession.streams().awaitAnyTermination();
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey Kant,
>>>>>
>>>>> That won't work either. Your second query may fail, and as long as
>>>>> your first query is running, you will not know. Put this as the last line
>>>>> instead:
>>>>>
>>>>> spark.streams.awaitAnyTermination()
>>>>>
>>>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>> reason.
>>>>>>
>>>>>> *Doesn't work*
>>>>>>
>>>>>> outputDS1.writeStream()
>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>     .foreach(new KafkaSink("hello1"))
>>>>>>     .start()
>>>>>>     .awaitTermination();
>>>>>>
>>>>>> outputDS2.writeStream()
>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>     .foreach(new KafkaSink("hello2"))
>>>>>>     .start()
>>>>>>     .awaitTermination();
>>>>>>
>>>>>> *Works*
>>>>>>
>>>>>> StreamingQuery query1 = outputDS1.writeStream()
>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>     .foreach(new KafkaSink("hello1"))
>>>>>>     .start();
>>>>>>
>>>>>> query1.awaitTermination();
>>>>>>
>>>>>> StreamingQuery query2 = outputDS2.writeStream()
>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>     .foreach(new KafkaSink("hello2"))
>>>>>>     .start();
>>>>>>
>>>>>> query2.awaitTermination();
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>>> reason.
>>>>>>>
>>>>>>> Doesn't work
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have the following pseudocode (I could paste the real code, but
>>>>>>>> it is pretty long and involves database calls inside a dataset.map
>>>>>>>> operation and so on), so I am just trying to simplify my question. I
>>>>>>>> would like to know if there is something wrong with the following
>>>>>>>> pseudocode.
>>>>>>>>
>>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>>
>>>>>>>> // Works, since I can see data getting populated
>>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>>>
>>>>>>>> // Works as well
>>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>>>
>>>>>>>> // Doesn't work
>>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>>>
>>>>>>>> outputDS1.writeStream()
>>>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>>>     .foreach(new KafkaSink("hello1"))
>>>>>>>>     .start()
>>>>>>>>     .awaitTermination();
>>>>>>>>
>>>>>>>> outputDS2.writeStream()
>>>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>>>     .foreach(new KafkaSink("hello2"))
>>>>>>>>     .start()
>>>>>>>>     .awaitTermination();
>>>>>>>>
>>>>>>>>
>>>>>>>> *What happens with the above code is that I can see data coming
>>>>>>>> out of the hello1 topic but not the hello2 topic.* I thought there was
>>>>>>>> something wrong with "outputDS2", so I switched the order, and now the
>>>>>>>> code looks like this:
>>>>>>>>
>>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>>
>>>>>>>> // Works, since I can see data getting populated
>>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>>>
>>>>>>>> // This works
>>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>>>
>>>>>>>> // Doesn't work
>>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>>>
>>>>>>>> outputDS1.writeStream()
>>>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>>>     .foreach(new KafkaSink("hello1"))
>>>>>>>>     .start()
>>>>>>>>     .awaitTermination();
>>>>>>>>
>>>>>>>> outputDS2.writeStream()
>>>>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>>>>     .foreach(new KafkaSink("hello2"))
>>>>>>>>     .start()
>>>>>>>>     .awaitTermination();
>>>>>>>>
>>>>>>>> *Now I can see data coming out of the hello2 Kafka topic but not the
>>>>>>>> hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>>>>> outputDS2, but not both.* At this point I am not sure what is
>>>>>>>> going on.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
