Hi ,

I am trying to write flink table to streaming Sink it fails at casting Java
to Scala or Scala to Java, it fails at below step can anyone help me out ?
about this error.


    val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new
Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
      new SimpleStringEncoder[Row]("UTF-8")).build()

    table.addSink(sink2)


package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironment
    //env.enableCheckpointing(10)

    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

    val mapFunction: MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]] =
      new MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]]() {

        override def map(s: String): Tuple10[String, String,
String,String,String,String,String,String,String,String] = {
          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num+","+
            data.getFirst+","+
            data.getLast+","+
            data.getTrans_num+","+
            data.getTrans_time+","+
            data.getCategory+","+
            data.getMerchant+","+
            data.getAmt+","+
            data.getMerch_lat+","+
            data.getMerch_long

          //println(csvData)

          val p:Array[String] = csvData.split(",")
          var cc_num:String = p(0)
          var first:String = p(1)
          var last:String = p(2)
          var trans_num:String = p(3)
          var trans_time:String = p(4)
          var category:String = p(5)
          var merchant:String = p(6)
          var amt:String = p(7)
          var merch_lat:String = p(8)
          var merch_long:String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return new Tuple10(cc_num, first,
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
        }
      }

    val data = kinesis.map(mapFunction)

    //data.print()

    
tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

    val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
    val table = tEnv.sqlQuery(query)

    //println(table.toString())

    //val test = new CsvCustomSink("")

    val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new
Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
      new SimpleStringEncoder[Row]("UTF-8")).build()

    table.addSink(sink2)

    env.execute()

  }

}



-- 
Thanks & Regards
Sri Tummala



-- 
Thanks & Regards
Sri Tummala

Reply via email to