>
>
> Hi All,
>
> I am trying to apply DISTINCT to the results of a SQL query and write them
> to a CSV file, but the job fails with the error below.
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
>     at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)
>
> Code example:
>
> val data = kinesis.map(mapFunction)
> tEnv.registerDataStream("transactions", data,
>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT DISTINCT " +
>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long " +
>   "FROM transactions WHERE cc_num NOT IN ('cc_num')"
> val table = tEnv.sqlQuery(query)
> // Brings the implicit TypeInformation needed by toRetractStream[Row] into scope.
> import org.apache.flink.streaming.api.scala._
> // The toRetractStream call below is where the TableException is thrown.
> table.distinct().toRetractStream[Row]
>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
>     FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
>
>
>
Thanks & Regards
Sri Tummala
