Hi,

I have a Kafka topic that transmits 100 security prices ever 2 seconds.

In Spark streaming I go through the RDD and walk through rows one by one
and check prices
In prices are valuable I store them into an Hbase table

    val dstream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
    dstream.cache()
    dstream.foreachRDD
    { pricesRDD =>
          // Work on individual messages
      *   for(line <- pricesRDD.collect.toArray)*
         {
           var key = line._2.split(',').view(0).toString
           var ticker =  line._2.split(',').view(1).toString
           var timeissued = line._2.split(',').view(2).toString
           var price = line._2.split(',').view(3).toFloat
           val priceToString = line._2.split(',').view(3)
            if (price > 90.0)
           {
               //save to Hbase table
           }
          }
     }

That works fine.

In Flink I define my source as below

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))

Is there anyway I can perform similar operation in Flink? I need to go
through every topic load sent and look at the individual rows/ For example
what is the equivalent of

*for(line <- pricesRDD.collect.toArray)*
In flink?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Reply via email to