Hi Burak,

Are you saying I should get rid of both

query1.awaitTermination();
query2.awaitTermination();

and just have the line below?

sparkSession.streams().awaitAnyTermination();
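
To make sure I follow the reasoning, here is a minimal plain-Java analogy (not Spark code). It assumes two CompletableFuture objects standing in for query1 and query2; the class AwaitAnyDemo and its method names are made up for illustration. Waiting on the first future alone never notices that the second has failed, while waiting on either one returns as soon as any of them finishes or fails, which is my understanding of what awaitAnyTermination() does for streaming queries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitAnyDemo {

    // Sequential style: block on the first future only, the way
    // query1.awaitTermination() blocks before query2 is ever looked at.
    // Returns true if the wait timed out, i.e. the caller is still stuck
    // on `first` and has learned nothing about any other query.
    static boolean sequentialWaitTimesOut(CompletableFuture<?> first, long timeoutMs) {
        try {
            first.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Wait-for-any style: returns as soon as either future completes,
    // normally or with an exception (the analogue of awaitAnyTermination()).
    static String awaitAny(CompletableFuture<String> q1,
                           CompletableFuture<String> q2,
                           long timeoutMs) {
        try {
            Object result = CompletableFuture.anyOf(q1, q2)
                    .get(timeoutMs, TimeUnit.MILLISECONDS);
            return "finished: " + result;
        } catch (ExecutionException e) {
            // The failure of whichever future failed first is surfaced here.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "still running";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for query1: healthy and long-running, never completes.
        CompletableFuture<String> query1 = new CompletableFuture<>();
        // Stand-in for query2: has already failed.
        CompletableFuture<String> query2 = new CompletableFuture<>();
        query2.completeExceptionally(new IllegalStateException("query2 sink error"));

        System.out.println(sequentialWaitTimesOut(query1, 200));
        System.out.println(awaitAny(query1, query2, 200));
    }
}
```

If that matches what you mean, then in the Spark job I would start both queries and keep only sparkSession.streams().awaitAnyTermination(); as the last line.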

Thanks!


On Wed, Sep 20, 2017 at 12:51 AM, kant kodali <kanth...@gmail.com> wrote:

> If execution never gets past query1.awaitTermination(), then I cannot just
> put sparkSession.streams().awaitAnyTermination(); after it as the last line
> of code, right? Like below:
>
> query1.awaitTermination();
> sparkSession.streams().awaitAnyTermination();
>
>
> On Wed, Sep 20, 2017 at 12:07 AM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Please remove
>>
>> query1.awaitTermination();
>> query2.awaitTermination();
>>
>> Once
>>
>> query1.awaitTermination();
>>
>> is called, it blocks until query1 terminates, so you never even get to
>> query2.awaitTermination().
>>
>>
>> On Tue, Sep 19, 2017 at 11:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Burak,
>>>
>>> Thanks much! I had no clue that existed. I have now changed it to this:
>>>
>>> StreamingQuery query1 = outputDS1.writeStream()
>>>     .trigger(Trigger.processingTime(1000))
>>>     .foreach(new KafkaSink("hello1")).start();
>>> StreamingQuery query2 = outputDS2.writeStream()
>>>     .trigger(Trigger.processingTime(1000))
>>>     .foreach(new KafkaSink("hello2")).start();
>>>
>>> query1.awaitTermination();
>>> query2.awaitTermination();
>>> sparkSession.streams().awaitAnyTermination();
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hey Kant,
>>>>
>>>> That won't work either. Your second query may fail, and as long as your
>>>> first query is running, you will not know. Put this as the last line
>>>> instead:
>>>>
>>>> spark.streams.awaitAnyTermination()
>>>>
>>>> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>> reason.
>>>>>
>>>>> *Doesn't work*
>>>>>
>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>
>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>
>>>>> *Works*
>>>>>
>>>>> StreamingQuery query1 = outputDS1.writeStream()
>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>     .foreach(new KafkaSink("hello1")).start();
>>>>>
>>>>> query1.awaitTermination();
>>>>>
>>>>> StreamingQuery query2 = outputDS2.writeStream()
>>>>>     .trigger(Trigger.processingTime(1000))
>>>>>     .foreach(new KafkaSink("hello2")).start();
>>>>>
>>>>> query2.awaitTermination();
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Looks like my problem was the order of awaitTermination() for some
>>>>>> reason.
>>>>>>
>>>>>> Doesn't work
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have the following pseudocode (I could paste the real code, but it
>>>>>>> is pretty long and involves database calls inside a dataset.map
>>>>>>> operation and so on), so I am just trying to simplify my question. I
>>>>>>> would like to know if there is something wrong with it.
>>>>>>>
>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>
>>>>>>> // Works, since I can see data getting populated
>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>>
>>>>>>> // Works as well
>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>>
>>>>>>> // Doesn't work
>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>>
>>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>>
>>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>>
>>>>>>>
>>>>>>> *What happens with the above code is that I can see data coming out
>>>>>>> of the hello1 topic but not from the hello2 topic.* I thought there was
>>>>>>> something wrong with "outputDS2", so I switched the order, and now the
>>>>>>> code looks like this:
>>>>>>>
>>>>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>>>>
>>>>>>> // Works, since I can see data getting populated
>>>>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>>>>
>>>>>>> // This works
>>>>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>>>>
>>>>>>> // Doesn't work
>>>>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>>>>
>>>>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>>> KafkaSink("hello1")).start().awaitTermination()
>>>>>>>
>>>>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new
>>>>>>> KafkaSink("hello2")).start().awaitTermination()
>>>>>>>
>>>>>>> *Now I can see data coming out of the hello2 Kafka topic but not from
>>>>>>> the hello1 topic.* *In short, I can only see data from outputDS1 or
>>>>>>> outputDS2, but not both.* At this point I am not sure what is going
>>>>>>> on.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
