Amazing, all issues resolved in one go, thanks Cheng. One issue though: I can't write the map(_._2) output to CSV; it looks like writeAsCsv doesn't support a Row stream right now, so it has to be writeAsText.
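For anyone hitting the same thing: as far as I can tell, writeAsCsv only accepts tuple-typed streams, while toRetractStream(...).map(_._2) yields a DataStream[Row]. A minimal sketch of a workaround, assuming the same `table` as in the code below (the output path and the "|" delimiter are placeholders): render each Row to a delimited String and write it as text:

    // Sketch: turn each Row into a delimited line so writeAsText can
    // produce CSV-style output. Path and delimiter are placeholders.
    table
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2)                                              // drop the retract flag
      .map(row => (0 until row.getArity).map(row.getField).mkString("|"))
      .writeAsText("/tmp/csvOut", FileSystem.WriteMode.OVERWRITE)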
Below is the full code in Scala if someone wants it. The Git code is here:
https://github.com/kali786516/FlinkStreamAndSql

package com.aws.examples.kinesis.consumer.transactionExampleScala

import java.util.Properties
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import java.sql.{DriverManager, Time}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.{FileSystem, Path}

object TransactionScalaTest {

  /* extends RetractStreamTableSink[Row]
  override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
  override def getFieldNames: Array[String] = ???
  override def getFieldTypes: Array[TypeInformation[_]] = ???
  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
  override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType
  override def getRecordType: TypeInformation[Row] = ???
  */

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.enableCheckpointing(10000)
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(
      new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))

    // Parse each JSON record into a 10-field tuple of strings
    val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
      new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {
        override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {
          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
          val csvData = data.getCc_num + "," + data.getFirst + "," + data.getLast + "," +
            data.getTrans_num + "," + data.getTrans_time + "," + data.getCategory + "," +
            data.getMerchant + "," + data.getAmt + "," + data.getMerch_lat + "," + data.getMerch_long
          //println(csvData)
          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)
          val creationDate: Time = new Time(System.currentTimeMillis())
          (cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)

    // Register the stream as a table with named fields
    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column, 'last_column, 'trans_num,
      'trans_time, 'category_column, 'merchant_column, 'amt_column, 'merch_lat, 'merch_long)
    //tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

    val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
    val table = tEnv.sqlQuery(query)

    // writeAsCsv does not accept a Row stream, so write the rows as text
    table
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2)
      .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
        FileSystem.WriteMode.OVERWRITE)

    table.printSchema()
    table.toRetractStream(TypeInformation.of(classOf[Row])).print()

    env.execute()

    /* Earlier attempts that did not work:

    table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")

    val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
    test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123",
      FileSystem.WriteMode.OVERWRITE)
    test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
      FileSystem.WriteMode.OVERWRITE, "\n", "|")

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.common.typeinfo.TypeInformation
    implicit val typeInfo = TypeInformation.of(classOf[Row])

    val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))
    ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
      FileSystem.WriteMode.OVERWRITE, "\n", "|")

    tEnv.toRetractStream(table, TypeInformation.of(classOf[Row]))
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
        FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      "\n", "|")

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.TimeCharacteristic

    table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.OVERWRITE, "\n", "|")
    table.toRetractStream(TypeInformation.of(classOf[Row]))
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
    ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))
    tEnv.toRetractStream(table)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")
    tEnv.toRetractStream(table, classOf[T])
    */
  }
}
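As an aside, a sketch of the Scala-bridge variant, assuming the imports and `table` from the program above: tEnv.toRetractStream[Row](table) yields a Scala (Boolean, Row) tuple, so a plain lambda can drop the flag, which also sidesteps the java.lang.Boolean vs scala.Boolean mismatch that comes up with the Java MapFunction approach in Question 1 below:

    // Sketch: the Scala bridge returns DataStream[(Boolean, Row)], so no
    // Java MapFunction (and no boxed-Boolean mismatch) is needed.
    val rows: DataStream[Row] = tEnv.toRetractStream[Row](table).map(_._2)
    rows.writeAsText("/tmp/rowsOut", FileSystem.WriteMode.OVERWRITE)  // placeholder path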
On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Question 1:-
>
> I did try a map function and ended up having an issue (
> https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
> )
>
> I am trying to convert a Tuple[Boolean, Row] to Row using a map function,
> and I am getting this error asking me for InferedR. What is InferedR in
> Flink?
>
> val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
>   new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
>     override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
>       t.f1
>     }
>     /* override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
>       collector.collect(t.f1)
>     } */
>   }
>
> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
> And when I try that, I get a different type of error:
>
> Error:(143, 74) type mismatch;
>  found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>  required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
>
> Question 2:-
> I don't have any source data issue; regenerating this issue for testing
> is simple.
>
> Create a Kinesis stream, then run the producer:
> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
>
> Then run the consumer:
> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala
>
> Thanks
> Sri
>
> On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Sri,
>>
>> Question 1:
>> You can use a map to filter out the "true", i.e., ds.map(_._2).
>> Note, it's OK to remove the "true" flag for distinct as it does not
>> generate updates. For other queries that contain updates, such as a
>> non-window group by, we should not filter the flag or the result is not
>> correct.
>>
>> Question 2:
>> I can't reproduce this problem in my local environment. Maybe there is
>> something wrong with the source data?
>>
>> Best, Hequn
>>
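To illustrate Hequn's caveat with a hypothetical updating query (this aggregate is not part of the actual program): a non-window GROUP BY emits a retraction (false, oldRow) followed by an insert (true, newRow) on every update, so blindly dropping the flag with map(_._2) would also write the stale rows:

    // Hypothetical query, for illustration only: watch the Boolean flags
    // in the printed output as the counts get updated.
    val counts = tEnv.sqlQuery(
      "SELECT category_column, COUNT(*) AS cnt FROM transactions GROUP BY category_column")
    counts
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .print()   // (true, row) = insert, (false, row) = retraction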
>> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> windows for question 1 or question 2 or both ?
>>>
>>> Thanks
>>> Sri
>>>
>>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>>> wrote:
>>>
>>>> Looks like you need a window
>>>>
>>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>>> kali.tumm...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am trying to write toRetractStream to CSV, which is kind of working
>>>>> OK, but I get extra values like "true" before my output data values.
>>>>>
>>>>> Question 1:-
>>>>> I don't want "true" in my output data; how do I achieve this?
>>>>>
>>>>> Screenshot attached.
>>>>>
>>>>> Question 2:-
>>>>> In the output file (CSV) I am missing data in the last line; is the
>>>>> toRetractStream closing before writing to the file?
>>>>>
>>>>> Screenshot attached.
>>>>>
>>>>> Code:-
>>>>>
>>>>> val data = kinesis.map(mapFunction)
>>>>> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>>>> val table = tEnv.sqlQuery(query)
>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>

--
Thanks & Regards
Sri Tummala
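One more note on the original Question 2 (missing data in the last line): the simple writeAsText/writeAsCsv sinks buffer output and give no delivery guarantees in a streaming job, so trailing records may not be flushed until the job finishes cleanly. A sketch of a checkpoint-aware alternative using StreamingFileSink, assuming the Flink 1.8-era API and the same Row-to-String rendering as in the sketch at the top (the path is a placeholder):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    // StreamingFileSink finalizes part files on checkpoints, so enable
    // checkpointing first. The path is a placeholder.
    env.enableCheckpointing(10000)

    val csvSink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("/tmp/csvOut"), new SimpleStringEncoder[String]("UTF-8"))
      .build()

    table
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2)
      .map(row => (0 until row.getArity).map(row.getField).mkString("|"))
      .addSink(csvSink)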