Question 1:-

I tried the map function and ended up having an issue (see
https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
).

I am trying to convert a Tuple2[Boolean, Row] to a Row using a map
function, but I am getting this error asking me for InferedR. What is
InferedR in Flink?

  val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
    new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
      override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
        t.f1
      }
      /*
      override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
        collector.collect(t.f1)
      }
      */
    }

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .map(mymapFunction)
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
    FileSystem.WriteMode.OVERWRITE, "\n", "|")

and when I try that, I get a different type of error:

Error:(143, 74) type mismatch;
 found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
 required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
  tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
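
Reading the error above: the retract stream carries java.lang.Boolean (the
boxed Java type), not scala.Boolean, and the declared output type AnyVal
does not match the Row that map returns. "InferedR" appears to be the IDE's
placeholder for the map function's result type parameter R that it could
not infer, rather than a Flink type. A minimal sketch of a version that
should type-check under those assumptions (the name rowMapFunction is
illustrative):

  import org.apache.flink.api.common.functions.MapFunction
  import org.apache.flink.api.java.tuple
  import org.apache.flink.types.Row

  // Type the input over java.lang.Boolean (what toRetractStream emits)
  // and declare the concrete result type Row instead of AnyVal.
  val rowMapFunction: MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row] =
    new MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row]() {
      override def map(t: tuple.Tuple2[java.lang.Boolean, Row]): Row = t.f1
    }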

Question 2:-

I don't have any source data issue; reproducing this issue for testing is
simple.

Create a Kinesis stream and run the producer:
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

then run the consumer:-
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala

Thanks
Sri

On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Sri,
>
> Question1:
> You can use a map to drop the "true" flag, i.e., ds.map(_._2).
> Note that it's ok to remove the "true" flag for distinct, as distinct does
> not generate updates. For other queries that contain updates, such as a
> non-window group by, we should not filter out the flag, or the result will
> not be correct.
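>
> A minimal sketch of that approach, assuming the Scala Table API and the
> table from your code (the output path is illustrative; writeAsCsv expects
> a tuple type, so the Row is joined into a "|"-delimited string and written
> with writeAsText instead):
>
>   import org.apache.flink.streaming.api.scala._
>   import org.apache.flink.table.api.scala._
>   import org.apache.flink.types.Row
>   import org.apache.flink.core.fs.FileSystem
>
>   // toRetractStream yields (Boolean, Row); for a distinct-only query the
>   // flag is always true, so dropping it with map(_._2) is safe here.
>   tEnv.toRetractStream[Row](table)
>     .map(_._2)
>     .map(row => (0 until row.getArity).map(i => row.getField(i)).mkString("|"))
>     .writeAsText("/path/to/csvOut", FileSystem.WriteMode.OVERWRITE)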
>
> Question 2:
> I can't reproduce this problem in my local environment. Maybe there is
> something wrong with the source data?
>
> Best, Hequn
>
> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Windows for question 1, question 2, or both?
>>
>> Thanks
>> Sri
>>
>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>> wrote:
>>
>>> Looks like you need a window
>>>
>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am trying to write a toRetractStream to CSV, which is kind of working
>>>> ok, but I get extra values like "true" before my output data values.
>>>>
>>>> Question 1:-
>>>> I don't want "true" in my output data; how do I achieve this?
>>>>
>>>> Screenshot attached.
>>>>
>>>> Question 2:-
>>>> In the output file (CSV) I am missing data in the last line; is
>>>> toRetractStream closing before writing to the file?
>>>>
>>>> Screen Shot attached
>>>>
>>>> Code:-
>>>>
>>>> val data = kinesis.map(mapFunction)
>>>> tEnv.registerDataStream("transactions", data,
>>>>   "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>> val query = "SELECT DISTINCT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions WHERE cc_num NOT IN ('cc_num')"
>>>> val table = tEnv.sqlQuery(query)
>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala
