Hi Weng,

I'm hitting another issue now: Exception in thread "main"
org.apache.flink.table.api.TableException: Only tables that originate from
Scala DataStreams can be converted to Scala DataStreams. The full code is at
https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128
and the pom is at https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml.

Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
    at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions", data,
  "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query =
  """SELECT DISTINCT cc_num, first_column, last_column, trans_num, trans_time,
    |  category_column, merchant_column, amt_column, merch_lat, merch_long
    |FROM transactions WHERE cc_num NOT IN ('cc_num')""".stripMargin
val table = tEnv.sqlQuery(query)

table.printSchema()

import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row]

test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3")
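
From reading the Flink docs, my understanding is that toAppendStream[Row]
only works when the stream environment, the table environment, and the
registered DataStream all come from the Scala API; if the Java
StreamExecutionEnvironment is used, the table originates from a Java
DataStream and this exception is thrown. Is the following sketch (untested)
the right direction?

import org.apache.flink.streaming.api.scala._ // Scala DataStream API
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._ // implicit Table -> DataStream conversions
import org.apache.flink.types.Row

// Both environments must be the Scala variants; otherwise toAppendStream
// fails with "Only tables that originate from Scala DataStreams ...".
val env = StreamExecutionEnvironment.createLocalEnvironment()
val tEnv = TableEnvironment.getTableEnvironment(env)
// register "transactions" from the (now Scala) DataStream as above, then:
// val rows = tEnv.sqlQuery(query).toAppendStream[Row]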


On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, the planner considers that it might
> update or retract records (although from my perspective it won't...).
> Because the CSV table sink is an append-only sink (it's hard to update what
> has already been written in the middle of a file), the exception you
> mentioned occurs.
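>
> For instance, converting such a query with `toRetractStream` makes the
> retraction flag explicit (a quick sketch, assuming the Scala Table API
> imports are in scope):
>
> // each element is (flag, row): true = insert, false = retract
> val retract: DataStream[(Boolean, Row)] =
>   tEnv.sqlQuery(query).toRetractStream[Row]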
>
> However, you can use the `toAppendStream` method to change the retractable
> stream into an append-only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` gives you an
> append-only stream, to which you can then add the CSV sink.
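>
> Roughly, the whole pattern would look like this (a sketch; the output path
> is just a placeholder):
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
>
> // append-only stream; an append-only sink such as CSV can now be attached
> val appendStream = tEnv.sqlQuery(query).distinct().toAppendStream[Row]
> appendStream.writeAsCsv("/path/to/csvOut") // placeholder path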
>
> sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote on Tue, Jul 16,
> 2019 at 3:32 AM:
>
>> Hi All,
>>
>> I am trying to read data from a Kinesis stream, apply a SQL
>> transformation (distinct), and then write to a CSV sink, which is
>> failing with this issue (org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.). The
>> full code is here (
>> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
>> ).
>>
>> Can anyone help me move forward on this issue?
>>
>> Full Code:-
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure the Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>>
>> // Create the Kinesis source stream
>> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
>>
>> val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>>   new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>>
>>     // parse the incoming JSON record and flatten it into a Tuple10 of strings
>>     override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>>       val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>       val csvData = data.getCc_num + "," +
>>         data.getFirst + "," +
>>         data.getLast + "," +
>>         data.getTrans_num + "," +
>>         data.getTrans_time + "," +
>>         data.getCategory + "," +
>>         data.getMerchant + "," +
>>         data.getAmt + "," +
>>         data.getMerch_lat + "," +
>>         data.getMerch_long
>>
>>       //println(csvData)
>>
>>       val p: Array[String] = csvData.split(",")
>>       val cc_num: String = p(0)
>>       val first: String = p(1)
>>       val last: String = p(2)
>>       val trans_num: String = p(3)
>>       val trans_time: String = p(4)
>>       val category: String = p(5)
>>       val merchant: String = p(6)
>>       val amt: String = p(7)
>>       val merch_lat: String = p(8)
>>       val merch_long: String = p(9)
>>
>>       val creationDate: Time = new Time(System.currentTimeMillis())
>>       new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>>     }
>>   }
>>
>> val data = kinesis.map(mapFunction)
>>
>> //data.print()
>>
>> tEnv.registerDataStream("transactions", data,
>>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>
>> val query =
>>   """SELECT DISTINCT cc_num, first_column, last_column, trans_num, trans_time,
>>     |  category_column, merchant_column, amt_column, merch_lat, merch_long
>>     |FROM transactions WHERE cc_num NOT IN ('cc_num')""".stripMargin
>> val table = tEnv.sqlQuery(query)
>>
>> val table1 = table.distinct()
>>
>> tEnv.registerTable("fromAnotherTable", table1)
>>
>> table.printSchema()
>>
>> val csvSink: TableSink[Row] = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput", "~")
>> val fieldNames: Array[String] =
>>   Array("cc_num", "first_column", "last_column", "trans_num", "trans_time", "category_column", "merchant_column", "amt_column", "merch_lat", "merch_long")
>> val fieldTypes: Array[TypeInformation[_]] = Array(
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING,
>>   org.apache.flink.api.common.typeinfo.Types.STRING
>> )
>>
>> tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)
>>
>> tEnv.sqlUpdate(
>>   """INSERT INTO s3csvTargetTransaction
>>     |SELECT cc_num, first_column, last_column, trans_num, trans_time,
>>     |  category_column, merchant_column, amt_column, merch_lat, merch_long
>>     |FROM fromAnotherTable""".stripMargin)
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala
