> Hi All,
>
> I am trying to run a SELECT DISTINCT SQL query and write the results to
> CSV, but it fails with the error below.
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
>     at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)
>
> Code example:
>
> val data = kinesis.map(mapFunction)
> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
> import org.apache.flink.streaming.api.scala._
> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
>     FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
>
> Thanks & Regards
> Sri Tummala
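One possible cause, offered as a guess since the snippet does not show how tEnv is created: this exception is typically raised when the table belongs to a Java StreamTableEnvironment, while toRetractStream comes from the Scala implicit conversions. The string field list passed to registerDataStream may hint at the Java API as well. A minimal sketch, assuming Flink 1.8 or 1.9 and that switching both environments to their Scala variants is acceptable, is shown below. The object name DistinctToRetractStream and the two-column in-memory source are hypothetical stand-ins for the Kinesis stream.

```scala
// Scala variants of both environments. The Java ones live under
// org.apache.flink.streaming.api.environment and org.apache.flink.table.api.java.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical example object, not the original KinesisConsumer.
object DistinctToRetractStream {
  def main(args: Array[String]): Unit = {
    // Scala StreamExecutionEnvironment (from the streaming.api.scala import).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Scala StreamTableEnvironment (from the table.api.scala import).
    val tEnv = StreamTableEnvironment.create(env)

    // Assumption: a small in-memory stream standing in for kinesis.map(mapFunction).
    val data: DataStream[(String, String)] =
      env.fromElements(("1", "t1"), ("1", "t1"), ("2", "t2"))

    // The Scala API takes field names as expressions rather than one string.
    tEnv.registerDataStream("transactions", data, 'cc_num, 'trans_num)

    val table = tEnv.sqlQuery("SELECT DISTINCT cc_num, trans_num FROM transactions")

    // The table now originates from a Scala environment, so the conversion applies.
    val result: DataStream[(Boolean, Row)] = table.toRetractStream[Row]
    result.print()

    env.execute("distinct-to-retract-stream")
  }
}
```

Each element of the retract stream is a (Boolean, Row) pair, where the flag marks an insert (true) or a retraction (false). The sketch prints the stream instead of calling writeAsCsv, so the CSV sink from the original code would still need to be checked separately.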
Fwd: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
sri hari kali charan Tummala Tue, 16 Jul 2019 08:01:54 -0700