Amazing all issues resolved in one go thanks Cheng , one issue though I
can't write map.(_._2) to CSV looks like it doesn't support right now have
to be TextFile.

below is a full code if someone wants in Scala.

Git Code is here:-
https://github.com/kali786516/FlinkStreamAndSql

package com.aws.examples.kinesis.consumer.transactionExampleScala

import java.util.Properties
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import java.sql.{DriverManager, Time}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.{FileSystem, Path}

object TransactionScalaTest {

  /*
  extends RetractStreamTableSink[Row]
  override def configure(strings: Array[String], typeInformations:
Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]]
= ???

  override def getFieldNames: Array[String] = ???

  override def getFieldTypes: Array[TypeInformation[_]] = ???

  override def emitDataStream(dataStream:
DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???

  override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean,
Row]] = super.getOutputType

  override def getRecordType: TypeInformation[Row] = ???

   */

  def main(args: Array[String]): Unit = {



    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.enableCheckpointing(10000)

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(),
consumerConfig))

    val mapFunction: MapFunction[String, (String, String, String,
String, String, String, String, String, String, String)] =
      new MapFunction[String, (String, String, String, String, String,
String, String, String, String, String)]() {

        override def map(s: String): (String, String, String, String,
String, String, String, String, String, String) = {

          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate:Time = new Time(System.currentTimeMillis())
          return (cc_num, first, last, trans_num, trans_time,
category, merchant, amt, merch_lat, merch_long)
        }
      }


    val data = kinesis.map(mapFunction)

    tEnv.registerDataStream("transactions", data,
'cc_num,'first_column,'last_column,'trans_num,
      
'trans_time,'category_column,'merchant_column,'amt_column,'merch_lat,'merch_long)
    //tEnv.registerDataStream("transactions", data,
"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
    val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num') "
    val table = tEnv.sqlQuery(query)

    table
      .toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2)
      
.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",FileSystem.WriteMode.OVERWRITE)

    table.printSchema()

    table.toRetractStream(TypeInformation.of(classOf[Row])).print()

    env.execute()

    /*

    
table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
      FileSystem.WriteMode.OVERWRITE,
      "\n","|")

    val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)

    
test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123",FileSystem.WriteMode.OVERWRITE)

    
test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
      FileSystem.WriteMode.OVERWRITE,
      "\n","|")

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.common.typeinfo.TypeInformation
    implicit val typeInfo = TypeInformation.of(classOf[Row])

    val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))

    
ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",FileSystem.WriteMode.OVERWRITE,
    "\n","|")

    tEnv.toRetractStream(table,
TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    
table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      "\n","|")

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.api.common.typeinfo.TypeInformation

    implicit val typeInfo = TypeInformation.of(classOf[Row])

    
table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.OVERWRITE, "\n", "|")

    
table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
      FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    ds.
      
writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

    tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))

    tEnv.toRetractStream(table)
      
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")

    tEnv.toRetractStream(table,classOf[T])

    */

  }

}







On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Question 1:-
>
> I did tired map function end up having issue (
> https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
> )
>
> I am trying to convert a Tuple[Boolean,Row] to Row using map function, I
> am getting this error asking me for InferedR , what is InferedR in FLink?
>
>   val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
>     new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
>       override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
>         t.f1
>       }
>       /*override def map(t: tuple.Tuple2[Boolean, Row], collector: 
> Collector[Object]): Unit = {
>         collector.collect(t.f1)
>       }
>     */
> }
>
> tEnv.toRetractStream(table, 
> classOf[org.apache.flink.types.Row]).map(mymapFunction)
>   
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>   FileSystem.WriteMode.OVERWRITE,"\n","|")
>
> and when I try to I get a different type of error.
>
>
>
>
> *Error:(143, 74) type mismatch; found   :
> org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>  required:
> org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>   tEnv.toRetractStream(table,
> classOf[org.apache.flink.types.Row]).map(mymapFunction)*
>
> *Question 2:- *
> *I dont have any source data issue, to regenerate this issue for testing
> its simple.*
>
> *create a kinesis stream *
> *run the producer *
>
> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
>
> then run the consumer:-
>
> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala
>
> Thanks
> Sri
>
>
>
>
>
>
>
> On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Sri,
>>
>> Question1:
>> You can use a map to filter the "true", i.e, ds.map(_._2).
>> Note, it's ok to remove the "true" flag for distinct as it does not
>> generate updates. For other query contains updates, such as a non-window
>> group by, we should not filter the flag or the result is not correct.
>>
>> Question 2:
>> I can't reproduce this problem in my local environment. Maybe there is
>> something wrong with the source data?
>>
>> Best, Hequn
>>
>> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> windows for question 1 or question 2 or both ?
>>>
>>> Thanks
>>> Sri
>>>
>>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>>> wrote:
>>>
>>>> Looks like you need a window
>>>>
>>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>>> kali.tumm...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am trying to write toRetractSream to CSV which is kind of working ok
>>>>> but I get extra values like True and then my output data values.
>>>>>
>>>>> Question1 :-
>>>>> I dont want true in my output data how to achieve this?
>>>>>
>>>>> Scree
>>>>>
>>>>> Question 2:-
>>>>> in the output file (CSV) I am missing data in the last line is the
>>>>> toRetractStram closing before writing to file?
>>>>>
>>>>> Screen Shot attached
>>>>>
>>>>> Code:-
>>>>>
>>>>> val data = kinesis.map(mapFunction)
>>>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>>> val query = "SELECT distinct 
>>>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>>>>  FROM transactions where cc_num not in ('cc_num')"
>>>>> val table = tEnv.sqlQuery(query)
>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>   
>>>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>>     FileSystem.WriteMode.OVERWRITE,"\n","|")
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to