Hi All, I am trying to read data from a Kinesis stream, apply a SQL transformation (DISTINCT), and then write the result to a CSV sink, which is failing with this exception (org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.). The full code is here ( https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112 ).
Can anyone help me move forward on this issue? Full Code:- // set up the streaming execution environment val env = StreamExecutionEnvironment.createLocalEnvironment //env.enableCheckpointing(10) val tEnv = TableEnvironment.getTableEnvironment(env) // Get AWS credentials val credentialsProvider = new DefaultAWSCredentialsProviderChain val credentials = credentialsProvider.getCredentials // Configure Flink Kinesis consumer val consumerConfig = new Properties consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId) consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey) consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON") // Create Kinesis stream val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig)) val mapFunction: MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]] = new MapFunction[String, Tuple10[String, String, String,String,String,String,String,String,String,String]]() { override def map(s: String): Tuple10[String, String, String,String,String,String,String,String,String,String] = { val data = new Gson().fromJson(s, classOf[TransactionJsonClass]) val csvData = data.getCc_num+","+ data.getFirst+","+ data.getLast+","+ data.getTrans_num+","+ data.getTrans_time+","+ data.getCategory+","+ data.getMerchant+","+ data.getAmt+","+ data.getMerch_lat+","+ data.getMerch_long //println(csvData) val p:Array[String] = csvData.split(",") var cc_num:String = p(0) var first:String = p(1) var last:String = p(2) var trans_num:String = p(3) var trans_time:String = p(4) var category:String = p(5) var merchant:String = p(6) var amt:String = p(7) var merch_lat:String = p(8) var merch_long:String = p(9) val creationDate: Time = new Time(System.currentTimeMillis()) return new Tuple10(cc_num, first, 
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long) } } val data = kinesis.map(mapFunction) //data.print() tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')" val table = tEnv.sqlQuery(query) val table1 = table.distinct() tEnv.registerTable("fromAnotherTable",table1) table.printSchema() val csvSink:TableSink[Row] = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~"); val fieldNames:Array[String] = Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long") val fieldTypes:Array[TypeInformation[_]] = Array( org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING, org.apache.flink.api.common.typeinfo.Types.STRING ) tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink) tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long from fromAnotherTable") -- Thanks & Regards Sri Tummala