Hi Sri,

For Scala jobs, you should import the corresponding Scala Environment and DataStream, e.g.:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment

See the example here [1].
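To make this concrete, below is a minimal sketch of the conversion path using the Scala classes. It assumes Flink 1.8.x; the object name, the fromElements stand-in source, and the two-field schema are placeholders for your Kinesis consumer and ten-field schema, so treat it as an untested outline rather than a drop-in replacement:

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ScalaTableSketch {
  def main(args: Array[String]): Unit = {
    // Scala environment from org.apache.flink.streaming.api.scala,
    // not org.apache.flink.streaming.api.environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Because env is the Scala environment, this returns the Scala
    // StreamTableEnvironment, whose tables can be converted back
    // into Scala DataStreams
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical stand-in for your mapped Kinesis stream
    val data: DataStream[(String, String)] =
      env.fromElements(("1234", "10.50"), ("5678", "20.75"))

    tEnv.registerDataStream("transactions", data, 'cc_num, 'amt_column)

    val table = tEnv.sqlQuery("SELECT DISTINCT cc_num, amt_column FROM transactions")

    // toRetractStream succeeds here because the table originates
    // from a Scala DataStream
    table.toRetractStream[Row]
      .writeAsCsv("/tmp/csvOut", FileSystem.WriteMode.NO_OVERWRITE, "~", "|")

    env.execute()
  }
}

With this setup the extra "import org.apache.flink.streaming.api.scala._" inside main should no longer be needed, and the org.apache.flink.streaming.api.environment / org.apache.flink.streaming.api.datastream imports can be removed. Your POM already contains flink-scala_2.11 and flink-streaming-scala_2.11, so no dependency changes should be needed.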
Best,
Hequn

[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala

On Tue, Jul 16, 2019 at 11:03 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

> Is this a bug in Flink Scala?
>
> Full code and Maven POM:
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.lang
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.sinks.TableSink
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.api.java.tuple
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.sinks.AppendStreamTableSink
> import org.apache.flink.table.sinks.RetractStreamTableSink
> import org.apache.flink.api.java.DataSet
>
> object KinesisConsumer extends RetractStreamTableSink[Row] {
>
>   override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType()
>
>   override def getRecordType: TypeInformation[Row] = ???
>
>   def main(args: Array[String]): Unit = {
>
>     // set up the streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     //env.enableCheckpointing(10)
>
>     val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
>
>     // Get AWS credentials
>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>     val credentials = credentialsProvider.getCredentials
>
>     // Configure Flink Kinesis consumer
>     val consumerConfig = new Properties
>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
>     // Create Kinesis stream
>     val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
>
>     val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>       new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>
>         override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>
>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>           val csvData = data.getCc_num + "," +
>             data.getFirst + "," +
>             data.getLast + "," +
>             data.getTrans_num + "," +
>             data.getTrans_time + "," +
>             data.getCategory + "," +
>             data.getMerchant + "," +
>             data.getAmt + "," +
>             data.getMerch_lat + "," +
>             data.getMerch_long
>
>           //println(csvData)
>
>           val p: Array[String] = csvData.split(",")
>           var cc_num: String = p(0)
>           var first: String = p(1)
>           var last: String = p(2)
>           var trans_num: String = p(3)
>           var trans_time: String = p(4)
>           var category: String = p(5)
>           var merchant: String = p(6)
>           var amt: String = p(7)
>           var merch_lat: String = p(8)
>           var merch_long: String = p(9)
>
>           val creationDate: Time = new Time(System.currentTimeMillis())
>           return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>         }
>       }
>
>     val data = kinesis.map(mapFunction)
>     tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>     val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>     val table = tEnv.sqlQuery(query)
>     import org.apache.flink.streaming.api.scala._
>     tEnv.sqlQuery(query).distinct().toRetractStream[Row]
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
>
>     env.execute()
>   }
> }
>
> POM:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>FlinkStreamAndSql</groupId>
>     <artifactId>FlinkStreamAndSql</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <build>
>         <sourceDirectory>src/main/scala</sourceDirectory>
>         <testSourceDirectory>src/test/scala</testSourceDirectory>
>         <plugins>
>             <plugin>
>                 <!-- see http://davidb.github.com/scala-maven-plugin -->
>                 <groupId>net.alchim31.maven</groupId>
>                 <artifactId>scala-maven-plugin</artifactId>
>                 <version>3.1.3</version>
>                 <executions>
>                     <execution>
>                         <goals>
>                             <goal>compile</goal>
>                             <goal>testCompile</goal>
>                         </goals>
>                         <configuration>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-surefire-plugin</artifactId>
>                 <version>2.13</version>
>                 <configuration>
>                     <useFile>false</useFile>
>                     <disableXmlReport>true</disableXmlReport>
>                     <!-- If you have classpath issue like NoDefClassError,... -->
>                     <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
>                     <includes>
>                         <include>**/*Test.*</include>
>                         <include>**/*Suite.*</include>
>                     </includes>
>                 </configuration>
>             </plugin>
>
>             <!-- "package" command plugin -->
>             <plugin>
>                 <artifactId>maven-assembly-plugin</artifactId>
>                 <version>2.4.1</version>
>                 <configuration>
>                     <descriptorRefs>
>                         <descriptorRef>jar-with-dependencies</descriptorRef>
>                     </descriptorRefs>
>                 </configuration>
>                 <executions>
>                     <execution>
>                         <id>make-assembly</id>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>single</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>     </build>
>     <dependencies>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-core</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.derby</groupId>
>             <artifactId>derby</artifactId>
>             <version>10.13.1.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-jdbc_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis_2.11</artifactId>
>             <version>1.8.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>amazon-kinesis-client</artifactId>
>             <version>1.8.8</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk-kinesis</artifactId>
>             <version>1.11.579</version>
>         </dependency>
>
>         <dependency>
>             <groupId>commons-dbcp</groupId>
>             <artifactId>commons-dbcp</artifactId>
>             <version>1.2.2</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.google.code.gson</groupId>
>             <artifactId>gson</artifactId>
>             <version>2.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>commons-cli</groupId>
>             <artifactId>commons-cli</artifactId>
>             <version>1.4</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
>         <dependency>
>             <groupId>org.apache.commons</groupId>
>             <artifactId>commons-csv</artifactId>
>             <version>1.7</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.commons</groupId>
>             <artifactId>commons-compress</artifactId>
>             <version>1.4.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
>             <version>1.4.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk</artifactId>
>             <version>1.11.579</version>
>         </dependency>
>
>     </dependencies>
>
> </project>
>
> On Tue, Jul 16, 2019 at 11:00 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>>
>>> Hi All,
>>>
>>> I am trying to make the SQL query results distinct and write them to CSV, but it fails with the error below.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
>>>
>>>     at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
>>>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
>>>     at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)
>>>
>>> Code example:
>>>
>>> val data = kinesis.map(mapFunction)
>>> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>> val table = tEnv.sqlQuery(query)
>>> import org.apache.flink.streaming.api.scala._
>>> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
>>
>> Thanks & Regards
>> Sri Tummala
>
> --
> Thanks & Regards
> Sri Tummala