Yes, even the delimiter can be replaced. I still have to test what happens if the
data itself contains a comma.
table.toRetractStream(TypeInformation.of(classOf[Row]))
.map(_._2.toString.replaceAll(",","~"))
.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
FileSystem.WriteMode.OVERWRITE)
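
If a comma does show up inside a field value, replaceAll on the Row's toString output
would mangle that value too. A rough sketch (untested, same table, path and imports as
the full program below) that joins the Row's fields with "~" directly instead:

table
  .toRetractStream(TypeInformation.of(classOf[Row]))
  // build the output line from the individual fields, so a comma inside a
  // field value is left untouched
  .map { case (_, row) =>
    (0 until row.getArity).map(i => String.valueOf(row.getField(i))).mkString("~")
  }
  .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
    FileSystem.WriteMode.OVERWRITE)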
On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:
> Amazing, all issues resolved in one go, thanks Cheng. One remaining issue: I
> can't write map(_._2) to CSV; it doesn't look like that is supported right
> now, so it has to be a text file.
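>
> A possible workaround (untested sketch, the output path here is just a
> placeholder): writeAsCsv wants a tuple or case-class stream rather than Row,
> so mapping each Row into a Scala tuple first might allow CSV output:
>
> table
>   .toRetractStream(TypeInformation.of(classOf[Row]))
>   .map { case (_, r) =>
>     // all ten columns are Strings in this job, so pack them into a Tuple10
>     (String.valueOf(r.getField(0)), String.valueOf(r.getField(1)),
>      String.valueOf(r.getField(2)), String.valueOf(r.getField(3)),
>      String.valueOf(r.getField(4)), String.valueOf(r.getField(5)),
>      String.valueOf(r.getField(6)), String.valueOf(r.getField(7)),
>      String.valueOf(r.getField(8)), String.valueOf(r.getField(9)))
>   }
>   .writeAsCsv("/path/to/csvOut", FileSystem.WriteMode.OVERWRITE, "\n", "|")  // placeholder path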
>
> Below is the full code in Scala, in case anyone wants it.
>
> The code is on GitHub here:
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
> /*
> extends RetractStreamTableSink[Row]
> override def configure(strings: Array[String], typeInformations:
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
> override def getFieldNames: Array[String] = ???
>
> override def getFieldTypes: Array[TypeInformation[_]] = ???
>
> override def emitDataStream(dataStream:
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
> override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]]
> = super.getOutputType
>
> override def getRecordType: TypeInformation[Row] = ???
>
> */
>
> def main(args: Array[String]): Unit = {
>
>
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(10000)
>
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
>   new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {
>
> override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {
>
> val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
> val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.getMerch_long
>
> //println(csvData)
>
> val p: Array[String] = csvData.split(",")
> var cc_num: String = p(0)
> var first: String = p(1)
> var last: String = p(2)
> var trans_num: String = p(3)
> var trans_time: String = p(4)
> var category: String = p(5)
> var merchant: String = p(6)
> var amt: String = p(7)
> var merch_lat: String = p(8)
> var merch_long: String = p(9)
>
> val creationDate:Time = new Time(System.currentTimeMillis())
> return (cc_num, first, last, trans_num, trans_time, category,
> merchant, amt, merch_lat, merch_long)
> }
> }
>
>
> val data = kinesis.map(mapFunction)
>
> tEnv.registerDataStream("transactions", data,
> 'cc_num,'first_column,'last_column,'trans_num,
>
> 'trans_time,'category_column,'merchant_column,'amt_column,'merch_lat,'merch_long)
> //tEnv.registerDataStream("transactions", data,
> "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT distinct
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
> FROM transactions where cc_num not in ('cc_num') "
> val table = tEnv.sqlQuery(query)
>
> table
>   .toRetractStream(TypeInformation.of(classOf[Row]))
>   .map(_._2)
>   .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
>     FileSystem.WriteMode.OVERWRITE)
>
> table.printSchema()
>
> table.toRetractStream(TypeInformation.of(classOf[Row])).print()
>
> env.execute()
>
> /*
>
>
> table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
> FileSystem.WriteMode.OVERWRITE,
> "\n","|")
>
> val test =
> table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
>
>
> test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123",FileSystem.WriteMode.OVERWRITE)
>
>
> test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
> FileSystem.WriteMode.OVERWRITE,
> "\n","|")
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.api.common.typeinfo.TypeInformation
> implicit val typeInfo = TypeInformation.of(classOf[Row])
>
> val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))
>
>
> ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",FileSystem.WriteMode.OVERWRITE,
> "\n","|")
>
> tEnv.toRetractStream(table,
> TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
> FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>
> table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
> "\n","|")
>
> import org.apache.flink.api.common.time.Time
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.api.common.typeinfo.TypeInformation
>
> implicit val typeInfo = TypeInformation.of(classOf[Row])
>
>
> table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
> FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>
> table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
> FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
> ds.
>
> writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
> FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>
> tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))
>
> tEnv.toRetractStream(table)
>
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
> FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
> tEnv.toRetractStream(table,classOf[T])
>
> */
>
> }
>
> }
>
>
>
>
>
>
>
> On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Question 1:-
>>
>> I did try the map function but ended up having an issue (
>> https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
>> )
>>
>> I am trying to convert a Tuple2[Boolean, Row] to Row using a map function, and
>> I get an error asking me for InferedR. What is InferedR in Flink?
>>
>> val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
>> new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
>> override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
>> t.f1
>> }
>> /*override def map(t: tuple.Tuple2[Boolean, Row], collector:
>> Collector[Object]): Unit = {
>> collector.collect(t.f1)
>> }
>> */
>> }
>>
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>   .map(mymapFunction)
>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>
>> and when I try this, I get a different error:
>>
>>
>>
>>
>> Error:(143, 74) type mismatch;
>>  found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>>  required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
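>>
>> From that error it looks like the mismatch is scala.Boolean vs. java.lang.Boolean,
>> plus the MapFunction output type AnyVal vs. Row. An untested sketch with the
>> signature the compiler seems to be asking for:
>>
>> val mymapFunction: MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row] =
>>   new MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row]() {
>>     // keep only the Row half of the (flag, row) pair
>>     override def map(t: tuple.Tuple2[java.lang.Boolean, Row]): Row = t.f1
>>   }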
>>
>> Question 2:-
>> I don't have any source data issue; reproducing this for testing is simple.
>>
>> Create a Kinesis stream.
>> Run the producer:
>>
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
>>
>> Then run the consumer:
>>
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala
>>
>> Thanks
>> Sri
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi Sri,
>>>
>>> Question 1:
>>> You can use a map to filter out the "true" flag, i.e., ds.map(_._2).
>>> Note that it's OK to remove the "true" flag for distinct, as it does not
>>> generate updates. For other queries that contain updates, such as a
>>> non-windowed group by, we should not filter out the flag or the result
>>> will not be correct.
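>>>
>>> A minimal sketch of that distinction (assuming the Scala Table API conversion
>>> and an implicit TypeInformation[Row] in scope):
>>>
>>> implicit val rowType: TypeInformation[Row] = TypeInformation.of(classOf[Row])
>>> val ds: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](table)
>>>
>>> // distinct is append-only, so the flag is always true and can be dropped
>>> val rowsOnly: DataStream[Row] = ds.map(_._2)
>>>
>>> // for an updating query (e.g. a non-windowed GROUP BY), keep the (flag, row)
>>> // pairs instead, otherwise retractions are lost and the result is wrong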
>>>
>>> Question 2:
>>> I can't reproduce this problem in my local environment. Maybe there is
>>> something wrong with the source data?
>>>
>>> Best, Hequn
>>>
>>> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> A window for question 1, question 2, or both?
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like you need a window
>>>>>
>>>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>>>> kali.tumm...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am trying to write a toRetractStream to CSV, which is kind of working,
>>>>>> but I get extra values like "true" before my output data values.
>>>>>>
>>>>>> Question 1:-
>>>>>> I don't want "true" in my output data; how do I achieve this?
>>>>>>
>>>>>> Screen Shot attached
>>>>>>
>>>>>> Question 2:-
>>>>>> In the output file (CSV) I am missing data in the last line; is the
>>>>>> toRetractStream closing before it finishes writing to the file?
>>>>>>
>>>>>> Screen Shot attached
>>>>>>
>>>>>> Code:-
>>>>>>
>>>>>> val data = kinesis.map(mapFunction)
>>>>>>
>>>>>> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>>>>
>>>>>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>>>>> val table = tEnv.sqlQuery(query)
>>>>>>
>>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards
>>>>>> Sri Tummala
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
--
Thanks & Regards
Sri Tummala