Hi Lee,

I did try both options.

Option 1: it writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
    FileSystem.WriteMode.OVERWRITE, "~", "|")

Output:

2> (true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15 22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2: I tried several things; this workaround kind of works, but I still need to strip out the brackets, the true flag, etc.

import java.io.PrintStream

val fileOut = new PrintStream("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut2/out.txt")
System.setOut(fileOut)
tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print()
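To get rid of the brackets and the true flag, the next thing I plan to try is along these lines (a rough sketch only, not yet tested, reusing the same tEnv, table, output path, and "~" delimiter as above): keep only the accumulate messages, as JingsongLee suggests below, drop the Boolean, and build each output line from the Row fields myself:

import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.types.Row

// toRetractStream emits pairs (flag, row): flag == true is an accumulate
// message, flag == false is a retract message.
val retractStream = tEnv.toRetractStream(table, classOf[Row])

val csvLines = retractStream
  // Keep the accumulate messages only (per JingsongLee's note, a plain
  // distinct should emit accumulate messages anyway).
  .filter(new FilterFunction[JTuple2[java.lang.Boolean, Row]] {
    override def filter(value: JTuple2[java.lang.Boolean, Row]): Boolean = value.f0
  })
  // Drop the Boolean flag and join the Row fields with the "~" delimiter,
  // so neither the brackets nor the true flag reach the file.
  .map(new MapFunction[JTuple2[java.lang.Boolean, Row], String] {
    override def map(value: JTuple2[java.lang.Boolean, Row]): String = {
      val row = value.f1
      (0 until row.getArity).map(i => String.valueOf(row.getField(i))).mkString("~")
    }
  })

csvLines.writeAsText(
  "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
  FileSystem.WriteMode.OVERWRITE)

From what I have read, writeAsCsv/writeAsText are simple test sinks that buffer and flush on shutdown, which might also explain why the file only shows content after I kill the job.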
On Mon, Jul 15, 2019 at 10:03 PM JingsongLee <lzljs3620...@aliyun.com> wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct,
> the messages should always be accumulate messages.)
>
> Best,
> JingsongLee
>
> ------------------------------------------------------------------
> From: Caizhi Weng <tsreape...@gmail.com>
> Send Time: Tuesday, July 16, 2019, 09:52
> To: sri hari kali charan Tummala <kali.tumm...@gmail.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, the planner considers that it might
> update or retract records (although from my perspective it won't...).
> Because the CSV table sink is an append-only sink (it's hard to update
> what has already been written in the middle of a file), the exception
> you mentioned occurs.
>
> However, you can use the `toAppendStream` method to change the
> retractable stream to an append-only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an
> append-only stream, and you can then add a CSV sink to this stream.
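If I understand this suggestion correctly, against my code below it would look roughly like the following (just a sketch, not verified; table1 is the table.distinct() result from my job, and I am not sure the planner won't still treat it as retractable):

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.types.Row

// Convert the distinct query result to an append-only stream first,
// then attach the file sink to the resulting DataStream instead of
// registering an append-only table sink.
val appendStream = tEnv.toAppendStream(table1, classOf[Row])

// Row.toString joins the fields with commas, so writing the rows as text
// should give plain CSV-like lines without the (true, ...) wrapper.
appendStream.writeAsText(
  "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput",
  FileSystem.WriteMode.OVERWRITE)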
> sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote on Tue, Jul 16, 2019 at 3:32 AM:
>
> Hi All,
>
> I am trying to read data from a Kinesis stream, apply a SQL
> transformation (distinct), and then write to a CSV sink, which is failing
> due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.).
> The full code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> Can anyone help me move forward on this issue?
>
> Full Code:
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2",
>   new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>   new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>
>     override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>       val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>       val csvData = data.getCc_num + "," +
>         data.getFirst + "," +
>         data.getLast + "," +
>         data.getTrans_num + "," +
>         data.getTrans_time + "," +
>         data.getCategory + "," +
>         data.getMerchant + "," +
>         data.getAmt + "," +
>         data.getMerch_lat + "," +
>         data.getMerch_long
>
>       //println(csvData)
>
>       val p: Array[String] = csvData.split(",")
>       var cc_num: String = p(0)
>       var first: String = p(1)
>       var last: String = p(2)
>       var trans_num: String = p(3)
>       var trans_time: String = p(4)
>       var category: String = p(5)
>       var merchant: String = p(6)
>       var amt: String = p(7)
>       var merch_lat: String = p(8)
>       var merch_long: String = p(9)
>
>       val creationDate: Time = new Time(System.currentTimeMillis())
>       return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>     }
>   }
>
> val data = kinesis.map(mapFunction)
>
> //data.print()
>
> tEnv.registerDataStream("transactions", data,
>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
> val query =
>   "SELECT DISTINCT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long " +
>   "FROM transactions WHERE cc_num NOT IN ('cc_num')"
> val table = tEnv.sqlQuery(query)
>
> val table1 = table.distinct()
>
> tEnv.registerTable("fromAnotherTable", table1)
>
> table.printSchema()
>
> val csvSink: TableSink[Row] = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput", "~")
> val fieldNames: Array[String] = Array("cc_num", "first_column", "last_column", "trans_num", "trans_time", "category_column", "merchant_column", "amt_column", "merch_lat", "merch_long")
> val fieldTypes: Array[TypeInformation[_]] = Array(
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING
> )
>
> tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)
>
> tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM fromAnotherTable")
>
> --
> Thanks & Regards
> Sri Tummala

--
Thanks & Regards
Sri Tummala