Hi Mich,
You can write a sink function for that.
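
For example, here is a minimal sketch (the Price case class, the field order and
the HBase calls are assumptions based on your Spark snippet, not tested code).
It starts from the stream you build with FlinkKafkaConsumer011 in your message
below:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._

    // Mirror of the fields you extract in the Spark loop (assumed order).
    case class Price(key: String, ticker: String, timeIssued: String, price: Float)

    val prices = stream
      .map { line =>
        val cols = line.split(',')
        Price(cols(0), cols(1), cols(2), cols(3).toFloat)
      }
      .filter(_.price > 90.0f)   // keep only the prices you want to store

    // Custom sink: the HBase calls are placeholders to fill in.
    prices.addSink(new RichSinkFunction[Price] {
      override def open(parameters: Configuration): Unit = {
        // open the HBase connection / table handle here
      }
      override def invoke(value: Price): Unit = {
        // put one row into the HBase table here
      }
      override def close(): Unit = {
        // close the HBase connection here
      }
    })

Unlike the Spark code, you do not collect a batch back to the driver: the map
and filter run on each record as it arrives, and the sink's open()/close() let
each parallel instance manage its own HBase connection.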

On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>
> In Spark Streaming I go through the RDD and walk through the rows one by one
> and check the prices.
> If the prices are valuable I store them in an HBase table:
>
>     val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>     dstream.cache()
>     dstream.foreachRDD
>     { pricesRDD =>
>           // Work on individual messages
>          for(line <- pricesRDD.collect.toArray)
>          {
>            var key = line._2.split(',').view(0).toString
>            var ticker =  line._2.split(',').view(1).toString
>            var timeissued = line._2.split(',').view(2).toString
>            var price = line._2.split(',').view(3).toFloat
>            val priceToString = line._2.split(',').view(3)
>             if (price > 90.0)
>            {
>                //save to Hbase table
>            }
>           }
>      }
>
> That works fine.
>
> In Flink I define my source as below
>
>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val stream = streamExecEnv
>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>
> Is there any way I can perform a similar operation in Flink? I need to go
> through every batch sent to the topic and look at the individual rows. For
> example, what is the equivalent of
>
>     for(line <- pricesRDD.collect.toArray)
>
> in Flink?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
-- 
Liu, Renjie
Software Engineer, MVAD
