Hi All,

I am trying to read data from a Kinesis stream, apply a SQL
transformation (DISTINCT), and then write to a CSV sink, which fails
with this exception (org.apache.flink.table.api.TableException:
AppendStreamTableSink requires that Table has only insert changes.).
The full code is here:
https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112

Can anyone help me move forward on this issue? After the full code I have
sketched the retract-stream workaround I am considering; is that the right
direction?

Full Code:-

// imports used by the code below (matching the linked file)
import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.tuple.Tuple10
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(
  new FlinkKinesisConsumer[String]("credittransactions2", new SimpleStringSchema(), consumerConfig))

// a 10-field record of transaction attributes, all strings (alias added for readability)
type TxnRecord = Tuple10[String, String, String, String, String,
  String, String, String, String, String]

// Parse each JSON record into a TxnRecord. TransactionJsonClass is the Gson
// POJO defined alongside this class in the repo; its getters are assumed to
// return Strings, as in the linked file.
val mapFunction: MapFunction[String, TxnRecord] =
  new MapFunction[String, TxnRecord]() {

    override def map(s: String): TxnRecord = {
      val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

      // build the tuple directly from the parsed POJO; the original
      // join-then-split on "," would shift columns if a field contained a comma
      new Tuple10(
        data.getCc_num,
        data.getFirst,
        data.getLast,
        data.getTrans_num,
        data.getTrans_time,
        data.getCategory,
        data.getMerchant,
        data.getAmt,
        data.getMerch_lat,
        data.getMerch_long)
    }
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions", data,
  "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

// the WHERE clause drops records whose cc_num field is the literal string 'cc_num'
val query =
  "SELECT DISTINCT cc_num, first_column, last_column, trans_num, trans_time, " +
    "category_column, merchant_column, amt_column, merch_lat, merch_long " +
    "FROM transactions WHERE cc_num NOT IN ('cc_num')"
val table = tEnv.sqlQuery(query)

// note: the query above already does SELECT DISTINCT, so this is a second dedup
val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable", table1)

table.printSchema()

val csvSink: TableSink[Row] = new CsvTableSink(
  "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput", "~")
val fieldNames: Array[String] = Array("cc_num", "first_column", "last_column",
  "trans_num", "trans_time", "category_column", "merchant_column", "amt_column",
  "merch_lat", "merch_long")
val fieldTypes: Array[TypeInformation[_]] = Array.fill[TypeInformation[_]](10)(Types.STRING)

tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)

tEnv.sqlUpdate(
  "INSERT INTO s3csvTargetTransaction " +
    "SELECT cc_num, first_column, last_column, trans_num, trans_time, " +
    "category_column, merchant_column, amt_column, merch_lat, merch_long " +
    "FROM fromAnotherTable")

// nothing runs until the job is actually submitted
env.execute("KinesisConsumer")
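
From what I can tell, SELECT DISTINCT over an unbounded stream makes the
planner treat the result as an updating table, while CsvTableSink is an
append-only sink, which seems to be why the exception is thrown. The
workaround I am considering is to convert the table to a retract stream,
keep only the insert (accumulate) messages, and write those out myself.
This is a minimal, untested sketch replacing the registerTableSink /
sqlUpdate block above; the output path is a placeholder and Row.toString
is only a rough stand-in for proper CSV formatting:

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

// toRetractStream emits (flag, row) pairs: flag = true for insert/accumulate,
// false for delete/retract
val retractStream = tEnv.toRetractStream(table1, classOf[Row])

val insertsOnly = retractStream
  .filter(new FilterFunction[JTuple2[java.lang.Boolean, Row]] {
    override def filter(t: JTuple2[java.lang.Boolean, Row]): Boolean = t.f0
  })
  .map(new MapFunction[JTuple2[java.lang.Boolean, Row], String] {
    // Row.toString renders fields comma-separated; good enough for a sketch,
    // but a real job should format the fields explicitly
    override def map(t: JTuple2[java.lang.Boolean, Row]): String = t.f1.toString
  })

// placeholder output path; then env.execute(...) as above
insertsOnly.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutputRetract")

Does that look like the right approach, or is there a way to keep the
CsvTableSink here?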


-- 
Thanks & Regards
Sri Tummala
