Hi Weng,

I am hitting another issue now (Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.). Here is the full code: https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128 and the pom: https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml
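This exception comes from the Scala `TableConversions.toAppendStream` (see the stack trace below): it is raised when the table's environment is not the Scala `StreamTableEnvironment`, which typically happens when the Java and Scala APIs are mixed (Java `StreamExecutionEnvironment`, Java `DataStream`, or the String-based `registerDataStream` overload). A minimal sketch of a setup that keeps everything on the Scala API, assuming Flink 1.8-style `TableEnvironment.getTableEnvironment`; the object name, element type, and sample data are illustrative:

    import org.apache.flink.streaming.api.scala._   // Scala DataStream API + implicit TypeInformation
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._       // Scala Table API: expressions, toAppendStream/toRetractStream
    import org.apache.flink.types.Row

    object ScalaApiSketch {
      def main(args: Array[String]): Unit = {
        // Both env and tEnv must come from the *Scala* packages; passing the Scala
        // StreamExecutionEnvironment yields the Scala StreamTableEnvironment.
        val env  = StreamExecutionEnvironment.createLocalEnvironment()
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Illustrative stand-in for the Kinesis-sourced transaction stream.
        val data: DataStream[(String, String)] =
          env.fromElements(("cc1", "10.0"), ("cc2", "7.5"))

        // Registering a *Scala* DataStream is what later allows toAppendStream[Row].
        tEnv.registerDataStream("transactions", data, 'cc_num, 'amt_column)

        val result = tEnv.sqlQuery("SELECT cc_num, amt_column FROM transactions")
        result.toAppendStream[Row].print()

        env.execute("scala-api-sketch")
      }
    }

With the environment, table environment, and registered stream all coming from the Scala packages, `toAppendStream[Row]` no longer hits the Java/Scala mismatch.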
Full stack trace:

Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
	at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
	at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
	at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions", data,
  "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
table.printSchema()

import org.apache.flink.streaming.api.scala._
val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row]
test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3")

On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, the planner considers that it might
> update or retract records (although from my perspective it won't...).
> Because the CSV table sink is an append-only sink (it's hard to update
> what has already been written in the middle of a file), the exception you
> mentioned occurs.
>
> However, you can use the `toAppendStream` method to change the retractable
> stream into an append-only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an
> append-only stream, and you can then add a CSV sink to this stream.
>
> On Tue, Jul 16, 2019 at 3:32 AM, sri hari kali charan Tummala
> <kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to read data from a Kinesis stream, apply a SQL
>> transformation (DISTINCT), and then write to a CSV sink, which is failing
>> with this issue (org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.). The
>> full code is here:
>> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
>>
>> Can anyone help me move forward on this issue?
>>
>> Full Code:-
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>>
>> // Create Kinesis stream
>> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2",
>>   new SimpleStringSchema(), consumerConfig))
>>
>> // Parse each JSON record into a Tuple10 of strings
>> val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>>   new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>>
>>     override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>>       val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>       val csvData = data.getCc_num + "," +
>>         data.getFirst + "," +
>>         data.getLast + "," +
>>         data.getTrans_num + "," +
>>         data.getTrans_time + "," +
>>         data.getCategory + "," +
>>         data.getMerchant + "," +
>>         data.getAmt + "," +
>>         data.getMerch_lat + "," +
>>         data.getMerch_long
>>
>>       //println(csvData)
>>
>>       val p: Array[String] = csvData.split(",")
>>       val cc_num: String = p(0)
>>       val first: String = p(1)
>>       val last: String = p(2)
>>       val trans_num: String = p(3)
>>       val trans_time: String = p(4)
>>       val category: String = p(5)
>>       val merchant: String = p(6)
>>       val amt: String = p(7)
>>       val merch_lat: String = p(8)
>>       val merch_long: String = p(9)
>>
>>       val creationDate: Time = new Time(System.currentTimeMillis())
>>       new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>>     }
>>   }
>>
>> val data = kinesis.map(mapFunction)
>>
>> //data.print()
>>
>> tEnv.registerDataStream("transactions", data,
>>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>
>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>> val table = tEnv.sqlQuery(query)
>>
>> val table1 = table.distinct()
>>
>> tEnv.registerTable("fromAnotherTable", table1)
>>
>> table.printSchema()
>>
>> val csvSink: TableSink[Row] = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput", "~")
>> val fieldNames: Array[String] =
>>   Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
>> val fieldTypes: Array[TypeInformation[_]] = Array(
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING
>> )
>>
>> tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)
>>
>> tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long from fromAnotherTable")
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>

--
Thanks & Regards
Sri Tummala
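A self-contained variant of Caizhi's suggestion above. Note that it swaps `toAppendStream` for `toRetractStream`: `toAppendStream` only succeeds when the planner can prove the query result never updates, and a DISTINCT result is treated as updating, so converting to a retract stream and keeping only the accumulate messages yields a stream an append-only CSV sink can consume. This is a sketch under assumed Flink 1.8 APIs; the sample data, element types, and output path are illustrative:

    import org.apache.flink.streaming.api.scala._   // Scala DataStream API + implicit TypeInformation
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._       // Scala Table API conversions (toRetractStream etc.)

    object RetractToCsvSketch {
      def main(args: Array[String]): Unit = {
        val env  = StreamExecutionEnvironment.createLocalEnvironment()
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Illustrative stand-in for the Kinesis-sourced transaction stream.
        val data: DataStream[(String, String)] =
          env.fromElements(("cc1", "10.0"), ("cc1", "10.0"), ("cc2", "7.5"))
        tEnv.registerDataStream("transactions", data, 'cc_num, 'amt_column)

        val distinctRows = tEnv.sqlQuery(
          "SELECT DISTINCT cc_num, amt_column FROM transactions")

        // DISTINCT may retract earlier results, so convert to a retract stream:
        // each element is (accumulate flag, row). Keeping only the accumulate
        // messages (flag = true) leaves an append-only stream.
        val appendOnly: DataStream[(String, String)] =
          distinctRows.toRetractStream[(String, String)]
            .filter(_._1)   // true = accumulate, false = retract
            .map(_._2)

        appendOnly.writeAsCsv("/tmp/csvOut")  // illustrative output path
        env.execute("retract-to-csv-sketch")
      }
    }

The same filtered retract stream can also be registered back as a table and written through the `CsvTableSink` shown in the full code, since after filtering it contains only insert changes.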