Looks like my problem was the order of awaitTermination() for some reason.

*Doesn't work*

// awaitTermination() blocks until the first query stops, so the second start() is never reached
outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination();
outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination();

*Works*

// Start both queries first, then wait on them
StreamingQuery query1 = outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start();
StreamingQuery query2 = outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start();
query1.awaitTermination();
query2.awaitTermination();

On Tue, Sep 19, 2017 at 10:09 PM, kant kodali <kanth...@gmail.com> wrote:

> Looks like my problem was the order of awaitTermination() for some reason.
>
> Doesn't work
>

On Tue, Sep 19, 2017 at 1:54 PM, kant kodali <kanth...@gmail.com> wrote:

>> Hi All,
>>
>> I have the following pseudo code (I could paste the real code, but it
>> is pretty long and involves database calls inside dataset.map operations
>> and so on), so I am just trying to simplify my question. I would like to
>> know if there is something wrong with the following pseudo code.
>>
>> Dataset<String> inputDS = readFromKafka(topicName)
>>
>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>
>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Works as well
>>
>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>>
>> *So what's happening with the above code is that I can see data coming
>> out of the hello1 topic but not from the hello2 topic.* I thought there
>> was something wrong with "outputDS2", so I switched the order, and now the
>> code looks like this:
>>
>> Dataset<String> inputDS = readFromKafka(topicName)
>>
>> Dataset<String> mongoDS = inputDS.map(insertIntoDatabase); // Works, since I can see data getting populated
>>
>> Dataset<String> outputDS2 = mongoDS.map(readFromDatabase); // This works
>>
>> Dataset<String> outputDS1 = mongoDS.map(readFromDatabase); // Doesn't work
>>
>> outputDS1.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello1")).start().awaitTermination()
>>
>> outputDS2.writeStream().trigger(Trigger.processingTime(1000)).foreach(new KafkaSink("hello2")).start().awaitTermination()
>>
>> *Now I can see data coming out of the hello2 Kafka topic but not from the
>> hello1 topic.* *In short, I can only see data from outputDS1 or
>> outputDS2, but not both.* At this point I am not sure what is going on.
>>
>> Thanks!
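
For anyone finding this thread later, here is a rough sketch of why the ordering of awaitTermination() matters. It does not use Spark at all: FakeQuery is a hypothetical stand-in I made up for a streaming query that keeps running until it is stopped, and the method names on AwaitOrderDemo are just for illustration. The only assumption carried over from Spark is that awaitTermination() blocks the calling thread until the query terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitOrderDemo {
    /** Hypothetical stand-in for a streaming query: "runs" until stop() is called. */
    static final class FakeQuery {
        private final CountDownLatch stopped = new CountDownLatch(1);

        static FakeQuery start(AtomicInteger startedCounter) {
            startedCounter.incrementAndGet(); // record that a query was started
            return new FakeQuery();
        }

        void awaitTermination() throws InterruptedException {
            stopped.await(); // blocks the caller until stop() is called
        }

        void stop() {
            stopped.countDown();
        }
    }

    /** Chained start().awaitTermination(): counts how many queries get started. */
    static int queriesStartedWhenChained() {
        AtomicInteger started = new AtomicInteger();
        Thread driver = new Thread(() -> {
            try {
                FakeQuery.start(started).awaitTermination(); // blocks: nobody stops this query
                FakeQuery.start(started).awaitTermination(); // never reached
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        driver.setDaemon(true);
        driver.start();
        try {
            driver.join(300); // give the driver thread time to get stuck
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        int count = started.get();
        driver.interrupt(); // release the blocked driver thread
        return count;
    }

    /** Start both queries first, then wait: counts how many queries get started. */
    static int queriesStartedWhenStartedFirst() {
        AtomicInteger started = new AtomicInteger();
        FakeQuery query1 = FakeQuery.start(started);
        FakeQuery query2 = FakeQuery.start(started);
        query1.stop(); // stop both so the waits below return in this demo
        query2.stop();
        try {
            query1.awaitTermination();
            query2.awaitTermination();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return started.get();
    }

    public static void main(String[] args) {
        System.out.println("chained: " + queriesStartedWhenChained() + " started");
        System.out.println("started first: " + queriesStartedWhenStartedFirst() + " started");
    }
}
```

In the chained version only one query is ever started, which matches seeing data on just one topic. In real Spark code, another option besides calling awaitTermination() on each query after both start() calls is spark.streams().awaitAnyTermination(), which waits on all active queries of the session.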