Hi Burak,

Thanks so much! I had no idea that existed. I've now changed it to this:

StreamingQuery query1 = outputDS1.writeStream()
        .trigger(Trigger.processingTime(1000))
        .foreach(new KafkaSink("hello1"))
        .start();
StreamingQuery query2 = outputDS2.writeStream()
        .trigger(Trigger.processingTime(1000))
        .foreach(new KafkaSink("hello2"))
        .start();

// Wait on both queries at once. Calling query1.awaitTermination() first would
// block here and hide a failure in query2 for as long as query1 keeps running.
sparkSession.streams().awaitAnyTermination();
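Why awaitAnyTermination() matters here: it returns as soon as any active query has terminated, and rethrows that query's exception if it failed, so a failing second query is not hidden behind a first query that keeps running. The sketch below models only that idea with the Java standard library. FakeQueryManager, its start method, and its awaitAnyTermination(timeoutMs) (which returns a status string instead of throwing) are hypothetical stand-ins, not Spark's StreamingQueryManager API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical query manager that only mimics the idea behind
// awaitAnyTermination(). It is NOT Spark's StreamingQueryManager.
class FakeQueryManager {
    // Each query reports "name: outcome" here exactly once when it ends.
    private final BlockingQueue<String> terminations = new LinkedBlockingQueue<>();

    // Starts a hypothetical query body on its own daemon thread.
    void start(String name, Runnable body) {
        Thread t = new Thread(() -> {
            try {
                body.run();
                terminations.add(name + ": stopped");
            } catch (RuntimeException e) {
                terminations.add(name + ": failed (" + e.getMessage() + ")");
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Returns the first termination reported by ANY query, or null on timeout.
    String awaitAnyTermination(long timeoutMs) {
        try {
            return terminations.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class AwaitAnyDemo {
    static String run() {
        FakeQueryManager streams = new FakeQueryManager();
        // hello1 keeps "running" far longer than the demo waits.
        streams.start("hello1", () -> sleepQuietly(60_000));
        // hello2 fails shortly after starting.
        streams.start("hello2", () -> {
            sleepQuietly(50);
            throw new IllegalStateException("sink error");
        });
        // Waiting on any query surfaces hello2's failure while hello1 still runs.
        return streams.awaitAnyTermination(5_000);
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

AwaitAnyDemo.run() returns "hello2: failed (sink error)" even though hello1 has not stopped; a wait on hello1 alone would still be blocked at that point.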





On Tue, Sep 19, 2017 at 11:48 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hey Kant,
>
> That won't work either. Your second query may fail, and as long as your
> first query is running, you will not know. Put this as the last line
> instead:
>
> spark.streams.awaitAnyTermination()
>
> On Tue, Sep 19, 2017 at 10:11 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> It looks like my problem was the order of the awaitTermination() calls.
>>
>> *Doesn't work*
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
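In the chained version, start() returns immediately, but awaitTermination() blocks the calling thread until that query stops, so the second start() is never reached while the first query runs. A minimal stdlib-only sketch of that ordering follows; FakeQuery is a hypothetical class that only models the blocking behaviour and is not Spark's StreamingQuery.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a streaming query (NOT Spark's StreamingQuery):
// it "runs" until stop() is called and only models the blocking behaviour.
class FakeQuery {
    private final CountDownLatch terminated = new CountDownLatch(1);

    // Like start(): records that the query started and returns immediately.
    static FakeQuery start(String sink, List<String> started) {
        started.add(sink);
        return new FakeQuery();
    }

    // Like awaitTermination(): blocks the calling thread until stop().
    void awaitTermination() throws InterruptedException {
        terminated.await();
    }

    void stop() {
        terminated.countDown();
    }
}

class ChainedAwaitDemo {
    // Runs "start().awaitTermination()" twice in a row on a driver thread and
    // returns [sinks started while hello1 is running, sinks started in total].
    static List<List<String>> run(long observeMs) {
        List<String> started = new CopyOnWriteArrayList<>();
        FakeQuery[] first = new FakeQuery[1];
        CountDownLatch firstStarted = new CountDownLatch(1);

        Thread driver = new Thread(() -> {
            try {
                first[0] = FakeQuery.start("hello1", started);
                firstStarted.countDown();
                first[0].awaitTermination();        // blocks while hello1 runs
                FakeQuery.start("hello2", started); // reached only after hello1 stops
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        driver.start();

        try {
            firstStarted.await();
            Thread.sleep(observeMs);                // hello1 is still "running" here
            List<String> whileRunning = List.copyOf(started);
            first[0].stop();                        // simulate hello1 terminating
            driver.join();
            return List.of(whileRunning, List.copyOf(started));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

ChainedAwaitDemo.run(200) returns [[hello1], [hello1, hello2]]: hello2 only starts after hello1 stops, which a healthy streaming query normally never does.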
>>
>> *Works*
>>
>> StreamingQuery query1 = outputDS1.writeStream()
>>     .trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello1")).start();
>>
>> query1.awaitTermination();
>>
>> StreamingQuery query2 = outputDS2.writeStream()
>>     .trigger(Trigger.processingTime(1000))
>>     .foreach(new KafkaSink("hello2")).start();
>>
>> query2.awaitTermination();
>>
>>
>>
>> On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> It looks like my problem was the order of the awaitTermination()
>>> calls.
>>>
>>> Doesn't work
>>>
>>> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following pseudocode (I could paste the real code, but it is
>>>> pretty long and involves database calls inside a Dataset.map operation,
>>>> so I am simplifying it here). Is there something wrong with the
>>>> following pseudocode?
>>>>
>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>
>>>> // Works, since I can see data getting populated
>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>
>>>> // Works as well
>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>
>>>> // Doesn't work
>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>>>
>>>>
>>>> *With the code above, I can see data coming out of the hello1 topic but
>>>> not out of the hello2 topic.* I thought something was wrong with
>>>> "outputDS2", so I switched the order, and the code now looks like this:
>>>>
>>>> Dataset<String> inputDS = readFromKafka(topicName);
>>>>
>>>> // Works, since I can see data getting populated
>>>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase);
>>>>
>>>> // This works
>>>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase);
>>>>
>>>> // Doesn't work
>>>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase);
>>>>
>>>> outputDS1.writeStream().trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>>>
>>>> outputDS2.writeStream().trigger(Trigger.processingTime(1000))
>>>>     .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>>>
>>>> *Now I can see data coming out of the hello2 Kafka topic but not out of
>>>> the hello1 topic.* *In short, I can only see data from outputDS1 or
>>>> outputDS2, but not both.* At this point I am not sure what is going on.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>
>>
>
