Hi,

Flink processes streams record by record rather than micro-batching records
together. Because each record arrives on its own, there is no for-each over a
batch. Simple record-by-record transformations can be done with a MapFunction,
and records can be filtered out with a FilterFunction. You can also implement
a FlatMapFunction to do both in one step.
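
As a rough sketch of the flatMap variant with the Scala DataStream API: the
names Price, PriceParsing, parseAndFilter and expensive are made up for
illustration, and I am assuming every message is a single line with the four
comma-separated fields shown in the thread. Malformed lines are silently
dropped here, which may or may not be what you want.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical record type for one price line: UUID, security, time, price.
case class Price(key: String, ticker: String, timeIssued: String, price: Double)

object PriceParsing {
  // Parse one comma-separated line; returns None for malformed input.
  def parse(line: String): Option[Price] =
    line.split(',').map(_.trim) match {
      case Array(key, ticker, time, price) =>
        scala.util.Try(Price(key, ticker, time, price.toDouble)).toOption
      case _ => None
    }

  // Transform and filter in one step: emits zero or one record per input line.
  def parseAndFilter(line: String, threshold: Double): Seq[Price] =
    parse(line).filter(_.price > threshold).toSeq

  // Wire the step into a stream of raw Kafka message values.
  def expensive(stream: DataStream[String], threshold: Double = 90.0): DataStream[Price] =
    stream.flatMap((line: String) => parseAndFilter(line, threshold))
}
```

With the stream from your snippet this would be used as
`val expensive = PriceParsing.expensive(stream)`. The same logic could also be
split into a map followed by a filter; flatMap just lets you drop unparseable
lines and cheap prices in the same function.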

Once the stream is transformed and filtered, you can write it to HBase with
a sink function.
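
A minimal sketch of such a sink, under several assumptions: Flink 1.x
(RichSinkFunction with the single-argument invoke), the HBase client API
(1.0 or later) on the classpath, an hbase-site.xml that HBaseConfiguration can
find, and a hypothetical table with a column family named price_info. The
class name HBasePriceSink and the Price type are made up for illustration. It
writes one Put per record, without batching, retries or any checkpoint
integration.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical record type for one price line: UUID, security, time, price.
case class Price(key: String, ticker: String, timeIssued: String, price: Double)

object HBasePriceSink {
  // Assumed column family name; adjust to the actual table layout.
  private val Family = Bytes.toBytes("price_info")

  // Build the HBase Put for one record, keyed by the UUID.
  def toPut(p: Price): Put = {
    val put = new Put(Bytes.toBytes(p.key))
    put.addColumn(Family, Bytes.toBytes("ticker"), Bytes.toBytes(p.ticker))
    put.addColumn(Family, Bytes.toBytes("timeissued"), Bytes.toBytes(p.timeIssued))
    put.addColumn(Family, Bytes.toBytes("price"), Bytes.toBytes(p.price))
    put
  }
}

class HBasePriceSink(tableName: String) extends RichSinkFunction[Price] {
  // Created in open() on the task manager, so not serialized with the job.
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
  }

  // Called once per record arriving at the sink.
  override def invoke(value: Price): Unit =
    table.put(HBasePriceSink.toPut(value))

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

It would be attached to a DataStream[Price] with
`prices.addSink(new HBasePriceSink("prices"))`, where "prices" is a
placeholder table name. For higher throughput you would probably want to
buffer several Puts before writing.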


2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Just to clarify, these are individual price records with fields separated
> by ','. Below are three price lines from the topic:
>
> UUID, Security, Time, Price
> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>>
>> Hi,
>>
>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>
>> In Spark Streaming I go through the RDD, walk through the rows one by one
>> and check the prices. If a price is valuable, I store it in an HBase table.
>>
>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>       streamingContext, kafkaParams, topics)
>>     dstream.cache()
>>     dstream.foreachRDD { pricesRDD =>
>>       // Work on individual messages (collect pulls every record to the driver)
>>       for (line <- pricesRDD.collect.toArray) {
>>         // Split the message value once: UUID, ticker, time issued, price
>>         val fields = line._2.split(',')
>>         val key = fields(0)
>>         val ticker = fields(1)
>>         val timeissued = fields(2)
>>         val priceToString = fields(3)
>>         val price = priceToString.toFloat
>>         if (price > 90.0) {
>>           // save to HBase table
>>         }
>>       }
>>     }
>>
>> That works fine.
>>
>> In Flink I define my source as below
>>
>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     val stream = streamExecEnv
>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>
>> Is there any way I can perform a similar operation in Flink? I need to go
>> through every topic load sent and look at the individual rows. For example,
>> what is the equivalent of
>>
>>     for(line <- pricesRDD.collect.toArray)
>>
>> in Flink?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>
