Hi Kali,

Currently Flink treats all aggregate functions as retractable. Because
`distinct` is an aggregate function, the planner assumes that it might
update or retract records (although from my perspective it won't...).
Because the CSV table sink is an append-only sink (it's hard to update what
has already been written in the middle of a file), the exception you
mentioned occurs.

However, you can use the `toAppendStream` method to convert the retractable
stream into an append-only stream. For example,
`tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an
append-only stream, to which you can then add the CSV sink.

sri hari kali charan Tummala <kali.tumm...@gmail.com> 于2019年7月16日周二
上午3:32写道:

> Hi All,
>
> I am trying to read data from a Kinesis stream, apply a SQL
> transformation (distinct), and then write to a CSV sink, which is
> failing due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.). The full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> Can anyone help me move forward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
>     override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>       val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>       val csvData = data.getCc_num+","+
>         data.getFirst+","+
>         data.getLast+","+
>         data.getTrans_num+","+
>         data.getTrans_time+","+
>         data.getCategory+","+
>         data.getMerchant+","+
>         data.getAmt+","+
>         data.getMerch_lat+","+
>         data.getMerch_long
>
>       //println(csvData)
>
>       val p:Array[String] = csvData.split(",")
>       var cc_num:String = p(0)
>       var first:String = p(1)
>       var last:String = p(2)
>       var trans_num:String = p(3)
>       var trans_time:String = p(4)
>       var category:String = p(5)
>       var merchant:String = p(6)
>       var amt:String = p(7)
>       var merch_lat:String = p(8)
>       var merch_long:String = p(9)
>
>       val creationDate: Time = new Time(System.currentTimeMillis())
>       return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
>     }
>   }
>
> val data = kinesis.map(mapFunction)
>
> //data.print()
>
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
> val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
>
> val table1 = table.distinct()
>
> tEnv.registerTable("fromAnotherTable",table1)
>
> table.printSchema()
>
> val csvSink:TableSink[Row]  = new 
> CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
> val fieldNames:Array[String]              = 
> Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
> val fieldTypes:Array[TypeInformation[_]]  = Array(
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING
> )
>
> tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, 
> csvSink)
>
> tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  from fromAnotherTable")
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

Reply via email to