Question 1:-
I did try the map function but ended up having an issue ( https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i )

I am trying to convert a Tuple[Boolean,Row] to Row using the map function. I am getting this error asking me for InferedR; what is InferedR in Flink?

    val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
      new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
        override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
          t.f1
        }
        /*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
          collector.collect(t.f1)
        } */
      }

    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE,"\n","|")

and when I try to, I get a different type of error.

    Error:(143, 74) type mismatch;
     found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
     required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
        tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)

Question 2:-
I don't have any source data issue. Reproducing this issue for testing is simple:

create a Kinesis stream

run the producer
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

then run the consumer:-
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala

Thanks
Sri

On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Sri,
>
> Question1:
> You can use a map to filter the "true", i.e, ds.map(_._2).
> Note, it's ok to remove the "true" flag for distinct as it does not
> generate updates. For other query contains updates, such as a non-window
> group by, we should not filter the flag or the result is not correct.
>
> Question 2:
> I can't reproduce this problem in my local environment. Maybe there is
> something wrong with the source data?
>
> Best, Hequn
>
> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> windows for question 1 or question 2 or both ?
>>
>> Thanks
>> Sri
>>
>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Looks like you need a window
>>>
>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am trying to write toRetractStream to CSV which is kind of working ok
>>>> but I get extra values like True and then my output data values.
>>>>
>>>> Question1 :-
>>>> I don't want true in my output data how to achieve this?
>>>>
>>>> Screen Shot attached
>>>>
>>>> Question 2:-
>>>> in the output file (CSV) I am missing data in the last line is the
>>>> toRetractStream closing before writing to file?
>>>>
>>>> Screen Shot attached
>>>>
>>>> Code:-
>>>>
>>>> val data = kinesis.map(mapFunction)
>>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>> val query = "SELECT distinct
>>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>>> FROM transactions where cc_num not in ('cc_num')"
>>>> val table = tEnv.sqlQuery(query)
>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>
>>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>> FileSystem.WriteMode.OVERWRITE,"\n","|")
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

--
Thanks & Regards
Sri Tummala
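
Note on the type mismatch in Question 1: the compiler output above says a MapFunction over Tuple2[java.lang.Boolean, Row] is required while one over Tuple2[scala.Boolean, Row] was found, and the function is declared with output type AnyVal although map returns a Row. Below is a minimal sketch of a declaration in which both type parameters line up. JTuple2, Row and MapFunction in the sketch are hypothetical stand-ins that only mimic the shape of the Flink classes so it compiles without Flink on the classpath, and the sample values are made up. Whether this also clears the InferedR message has not been checked against a real Flink job.

```scala
// Hypothetical stand-ins that mimic the shape of
// org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.types.Row and
// org.apache.flink.api.common.functions.MapFunction. They are NOT the real classes.
final class JTuple2[T0, T1](val f0: T0, val f1: T1)
final case class Row(fields: Vector[Any])
trait MapFunction[T, O] { def map(value: T): O }

object RetractFlagExample {
  // The flag is typed as java.lang.Boolean (what the compiler error asks for),
  // not scala.Boolean, and the output type is Row, matching what map returns.
  val dropFlag: MapFunction[JTuple2[java.lang.Boolean, Row], Row] =
    new MapFunction[JTuple2[java.lang.Boolean, Row], Row] {
      override def map(t: JTuple2[java.lang.Boolean, Row]): Row = t.f1
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample record: a retract flag plus a two-field row.
    val record = new JTuple2[java.lang.Boolean, Row](
      java.lang.Boolean.TRUE, Row(Vector("4242", 12.5)))
    // Prints only the row; the Boolean flag is dropped.
    println(dropFlag.map(record))
  }
}
```

With the real Flink classes, the same two type parameters (java.lang.Boolean for the flag, Row for the output) would presumably be used on the MapFunction passed to .map(...). That is an assumption drawn from the error text, not something verified here.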