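Looks like my problem was the order of the awaitTermination() calls. awaitTermination() blocks the calling thread until that query terminates, so chaining it right after start() meant the second query was never started.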

*Doesn't work*

// awaitTermination() blocks here until query 1 stops, so query 2 is never started
outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
        .foreach(new KafkaSink("hello1")).start().awaitTermination();

outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
        .foreach(new KafkaSink("hello2")).start().awaitTermination();

*Works*

StreamingQuery query1 = outputDS1.writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .foreach(new KafkaSink("hello1"))
        .start();

StreamingQuery query2 = outputDS2.writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .foreach(new KafkaSink("hello2"))
        .start();

// awaitTermination() blocks, so call it only after both queries are started
query1.awaitTermination();
query2.awaitTermination();
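To see why the order matters without a Spark cluster, here is a plain-Java analogy. It is a sketch, not Spark code: `SimQuery` is a hypothetical stand-in for `StreamingQuery` whose `start()` returns immediately and whose `awaitTermination()` blocks by joining a worker thread. Unlike a real Kafka stream, each simulated query stops after five batches so the demo can finish; with a never-ending stream, the chained version would simply never start the second query.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AwaitOrderDemo {

    // Shared, thread-safe event log so the ordering can be inspected afterwards.
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for a StreamingQuery: runs a fixed number of
    // "micro-batches" on a background thread, then terminates.
    static final class SimQuery {
        private final String name;
        private final int batches;
        private Thread worker;

        SimQuery(String name, int batches) {
            this.name = name;
            this.batches = batches;
        }

        // Like start(): returns immediately; the work happens on another thread.
        SimQuery start() {
            LOG.add("start " + name);
            worker = new Thread(() -> {
                for (int i = 0; i < batches; i++) {
                    LOG.add("batch " + name);
                    try {
                        Thread.sleep(50); // stands in for the trigger interval
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                LOG.add("stop " + name);
            });
            worker.start();
            return this;
        }

        // Like awaitTermination(): blocks the caller until the query stops.
        void awaitTermination() throws InterruptedException {
            worker.join();
        }
    }

    // Pattern from the "Doesn't work" block: start().awaitTermination() chained.
    static List<String> chained() throws InterruptedException {
        LOG.clear();
        new SimQuery("hello1", 5).start().awaitTermination();
        new SimQuery("hello2", 5).start().awaitTermination();
        return new ArrayList<>(LOG);
    }

    // Pattern from the fixed block: start every query first, then wait.
    static List<String> startAllThenAwait() throws InterruptedException {
        LOG.clear();
        SimQuery q1 = new SimQuery("hello1", 5).start();
        SimQuery q2 = new SimQuery("hello2", 5).start();
        q1.awaitTermination();
        q2.awaitTermination();
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(chained());
        System.out.println(startAllThenAwait());
    }
}
```

In the chained run, "start hello2" only appears after "stop hello1"; in the start-all run, hello2 starts while hello1 is still producing batches. In Spark itself, another option is to start all queries and then call `spark.streams().awaitAnyTermination()` (assuming `spark` is your SparkSession), which returns as soon as any one of the active queries stops.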



On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> Doesn't work
>
> On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have the following pseudocode (I could paste the real code, but it is
>> pretty long and involves database calls inside a dataset.map operation and
>> so on), so I am trying to simplify my question. I would like to know if
>> there is something wrong with the following pseudocode.
>>
>> Dataset<String> inputDS = readFromKafka(topicName);
>>
>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>
>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>
>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
>>         .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>
>> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
>>         .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>
>>
>> *With the above code, I can see data coming out of the hello1 topic but not
>> out of the hello2 topic.* I thought something was wrong with "outputDS2", so
>> I switched the order, and now the code looks like this:
>>
>> Dataset<String> inputDS = readFromKafka(topicName);
>>
>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>
>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>
>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.ProcessingTime(1000))
>>         .foreach(new KafkaSink("hello1")).start().awaitTermination();
>>
>> outputDS2.writeStream().trigger(Trigger.ProcessingTime(1000))
>>         .foreach(new KafkaSink("hello2")).start().awaitTermination();
>>
>> *Now I can see data coming out of the hello2 Kafka topic but not the hello1
>> topic.* *In short, I can only see data from outputDS1 or outputDS2, but not
>> both.* At this point I am not sure what is going on.
>>
>> Thanks!
>>
>>
>>
>
