is this a Bug in Flink Scala?

Full code and Maven POM:-

package com.aws.examples.kinesis.consumer.TransactionExample

import java.lang
import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.core.fs.{FileSystem, Path}

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.api.java.tuple
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.sinks.AppendStreamTableSink
import org.apache.flink.table.sinks.RetractStreamTableSink
import org.apache.flink.api.java.DataSet



object KinesisConsumer extends RetractStreamTableSink[Row] {

  override def configure(strings: Array[String], typeInformations:
Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]]
= ???

  override def getFieldNames: Array[String] = ???

  override def getFieldTypes: Array[TypeInformation[_]] = ???

  override def emitDataStream(dataStream:
DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???

  override def getOutputType():
TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType()

  override def getRecordType: TypeInformation[Row] = ???


  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.enableCheckpointing(10)

    val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment =
TableEnvironment.getTableEnvironment(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

    val mapFunction: MapFunction[String, Tuple10[String, String,
String, String, String, String, String, String, String, String]] =
      new MapFunction[String, Tuple10[String, String, String, String,
String, String, String, String, String, String]]() {

        override def map(s: String): Tuple10[String, String, String,
String, String, String, String, String, String, String] = {

          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return new Tuple10(cc_num, first, last, trans_num,
trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)
    tEnv.registerDataStream("transactions", data,
"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
    val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
    val table = tEnv.sqlQuery(query)
    import org.apache.flink.streaming.api.scala._
    tEnv.sqlQuery(query).distinct().toRetractStream[Row]
      
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
        FileSystem.WriteMode.NO_OVERWRITE, "~", "|")

    env.execute()
  }
}

*POM:-*

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0";
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like
NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-scala_2.11</artifactId>
           <version>1.8.1</version>
       </dependency>

       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-streaming-scala_2.11</artifactId>
           <version>1.8.1</version>
       </dependency>

               <dependency>
                   <groupId>org.apache.flink</groupId>
                   <artifactId>flink-connector-kinesis_2.11</artifactId>
                   <version>1.8.0</version>
               </dependency>

               <dependency>
                   <groupId>org.apache.flink</groupId>
                   <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                   <version>1.8.1</version>
               </dependency>

               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>amazon-kinesis-client</artifactId>
                   <version>1.8.8</version>
               </dependency>

               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-kinesis</artifactId>
                   <version>1.11.579</version>
               </dependency>

               <dependency>
                   <groupId>commons-dbcp</groupId>
                   <artifactId>commons-dbcp</artifactId>
                   <version>1.2.2</version>
               </dependency>
               <dependency>
                   <groupId>com.google.code.gson</groupId>
                   <artifactId>gson</artifactId>
                   <version>2.1</version>
               </dependency>

               <dependency>
                   <groupId>commons-cli</groupId>
                   <artifactId>commons-cli</artifactId>
                   <version>1.4</version>
               </dependency>

               <!--
https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>

    </dependencies>

</project>


On Tue, Jul 16, 2019 at 11:00 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

>
>> Hi All,
>>
>> I am trying to convert sql query results value to distinct and writing to
>> CSV which is failing with below error.
>>
>> *Exception in thread "main" org.apache.flink.table.api.TableException:
>> Only tables that originate from Scala DataStreams can be converted to Scala
>> DataStreams.*
>>
>>
>> * at
>> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
>> at
>> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
>> at
>> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)*
>>
>> Code Example:-
>>
>> val data = kinesis.map(mapFunction)
>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>> val query = "SELECT distinct 
>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>  FROM transactions where cc_num not in ('cc_num')"
>> val table = tEnv.sqlQuery(query)
>> import org.apache.flink.streaming.api.scala._
>> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
>>   
>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
>>     FileSystem.WriteMode.NO_OVERWRITE,"~","|")
>>
>>
>>
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to