If execution never gets past query1.awaitTermination(), then I cannot put
sparkSession.streams().awaitAnyTermination() as the last line of code,
right? Like below:

query1.awaitTermination();
sparkSession.streams().awaitAnyTermination();
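
For illustration only, here is a plain-JDK analogy (not Spark code) of the
blocking behaviour discussed in the replies quoted below. Each hypothetical
"query" is modelled as a CompletableFuture that completes when the query
terminates; the class and method names are made up for this sketch.
Waiting on the futures one after another never notices that the second one
has failed while the first is still running, whereas waiting on "any of
them" returns as soon as either terminates, which is the role
sparkSession.streams().awaitAnyTermination() is suggested for in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for two streaming queries; not part of any Spark API.
class AwaitDemo {

    // Models a healthy long-running query: the future is never completed.
    static CompletableFuture<String> neverEnding() {
        return new CompletableFuture<>();
    }

    // Models a query that has already terminated with an error.
    static CompletableFuture<String> failing(String name) {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException(name + " failed"));
        return f;
    }

    // Waits on q1 first, then q2. Returns true only if a failure was observed
    // or both futures finished within the timeout.
    static boolean awaitSequentially(CompletableFuture<String> q1,
                                     CompletableFuture<String> q2,
                                     long timeoutMs) {
        try {
            q1.get(timeoutMs, TimeUnit.MILLISECONDS);
            q2.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // gave up while still waiting on an unfinished future
        } catch (Exception e) {
            return true;  // a future terminated with a failure
        }
    }

    // Waits until either future terminates. Returns true if one did so
    // within the timeout.
    static boolean awaitAny(CompletableFuture<String> q1,
                            CompletableFuture<String> q2,
                            long timeoutMs) {
        try {
            CompletableFuture.anyOf(q1, q2).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // neither future terminated in time
        } catch (Exception e) {
            return true;  // one future terminated with a failure
        }
    }

    public static void main(String[] args) {
        // The first query keeps running while the second one has failed.
        System.out.println(awaitSequentially(neverEnding(), failing("query2"), 200));
        System.out.println(awaitAny(neverEnding(), failing("query2"), 200));
    }
}
```

In this sketch the sequential wait times out on the first future without
ever looking at the second, while the "any" wait reports the failure
immediately.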


On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:

> Please remove
>
> query1.awaitTermination();
> query2.awaitTermination();
>
> Once
>
> query1.awaitTermination();
>
> is called, it blocks until query1 terminates, so you never even get to
> query2.awaitTermination().
>
>
> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Burak,
>>
>> Thanks much! I had no clue that existed. I have now changed it to this:
>>
>> StreamingQuery query1 = outputDS1.writeStream()
>>     .trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello1"))
>>     .start();
>> StreamingQuery query2 = outputDS2.writeStream()
>>     .trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello2"))
>>     .start();
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>> sparkSession.streams().awaitAnyTermination();
>>
>>
>>
>>
>>
>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hey Kant,
>>>
>>> That won't work either. Your second query may fail, and as long as your
>>> first query is running, you will not know. Put this as the last line
>>> instead:
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Looks like my problem was the order of awaitTermination() for some
>>>> reason.
>>>>
>>>> *Doesn't work*
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>
>>>> *Works*
>>>>
>>>> StreamingQuery query1 = outputDS1.writeStream()
>>>>     .trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello1"))
>>>>     .start();
>>>>
>>>> query1.awaitTermination()
>>>>
>>>> StreamingQuery query2 = outputDS2.writeStream()
>>>>     .trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello2"))
>>>>     .start();
>>>>
>>>> query2.awaitTermination()
>>>>
>>>>
>>>>
>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>> reason.
>>>>>
>>>>> Doesn't work
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following pseudocode. (I could paste the real code, but it
>>>>>> is pretty long and involves database calls inside dataset.map
>>>>>> operations and so on, so I am just trying to simplify my question.) I
>>>>>> would like to know if there is something wrong with it:
>>>>>>
>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>
>>>>>> // Works, since I can see data getting populated
>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>
>>>>>> // Works as well
>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>
>>>>>> // Doesn't work
>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>
>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>
>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>
>>>>>>
>>>>>> *With the above code, I can see data coming out of the hello1 topic
>>>>>> but not out of the hello2 topic.* I thought there was something wrong
>>>>>> with "outputDS2", so I switched the order, and now the code looks like
>>>>>> this:
>>>>>>
>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>
>>>>>> // Works, since I can see data getting populated
>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>
>>>>>> // This works
>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>
>>>>>> // Doesn't work
>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>
>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>
>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>
>>>>>> *Now I can see data coming out of the hello2 Kafka topic but not out of
>>>>>> the hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>>> outputDS2, but not both.* At this point I am not sure what is going
>>>>>> on.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
