Hi Sri, Question1: You can use a map to filter the "true", i.e, ds.map(_._2). Note, it's ok to remove the "true" flag for distinct as it does not generate updates. For other query contains updates, such as a non-window group by, we should not filter the flag or the result is not correct.
Question 2: I can't reproduce this problem in my local environment. Maybe there is something wrong with the source data? Best, Hequn On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > windows for question 1 or question 2 or both ? > > Thanks > Sri > > On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com> > wrote: > >> Looks like you need a window >> >> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala < >> kali.tumm...@gmail.com> wrote: >> >>> Hi All, >>> >>> I am trying to write toRetractSream to CSV which is kind of working ok >>> but I get extra values like True and then my output data values. >>> >>> Question1 :- >>> I dont want true in my output data how to achieve this? >>> >>> Scree >>> >>> Question 2:- >>> in the output file (CSV) I am missing data in the last line is the >>> toRetractStram closing before writing to file? >>> >>> Screen Shot attached >>> >>> Code:- >>> >>> val data = kinesis.map(mapFunction) >>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") >>> val query = "SELECT distinct >>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long >>> FROM transactions where cc_num not in ('cc_num')" >>> val table = tEnv.sqlQuery(query) >>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]) >>> >>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", >>> FileSystem.WriteMode.OVERWRITE,"\n","|") >>> >>> >>> >>> -- >>> Thanks & Regards >>> Sri Tummala >>> >>> > > -- > Thanks & Regards > Sri Tummala > >