Hi, I have a Kafka topic that transmits 100 security prices every 2 seconds.
In Spark Streaming I go through the RDD and walk through the rows one by one, checking the prices; if a price is of interest I store it in an HBase table:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
dstream.cache()
dstream.foreachRDD { pricesRDD =>
  // Work on individual messages
  for (line <- pricesRDD.collect) {
    val cols = line._2.split(',')   // the message value is a CSV row
    val key = cols(0)
    val ticker = cols(1)
    val timeissued = cols(2)
    val price = cols(3).toFloat
    if (price > 90.0) {
      // save to HBase table
    }
  }
}

That works fine. In Flink I define my source as below:

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go through every load the topic sends and look at the individual rows. For example, what is the equivalent of *for(line <- pricesRDD.collect.toArray)* in Flink? Something along the lines of the sketch below is what I have in mind.
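This is only a rough, untested sketch of the shape I am after, assuming the per-record logic can go into Flink's DataStream map/filter operators; Price and HBasePriceSink are names I made up for illustration, not existing classes:

import org.apache.flink.streaming.api.scala._   // brings in the implicit TypeInformation
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

case class Price(key: String, ticker: String, timeissued: String, price: Float)

// Split each Kafka message (a CSV row) into fields and keep only the
// prices above the threshold
val prices: DataStream[Price] = stream
  .map { line =>
    val cols = line.split(',')
    Price(cols(0), cols(1), cols(2), cols(3).toFloat)
  }
  .filter(_.price > 90.0f)

// Placeholder sink, named by me for illustration: the actual HBase put
// would go inside invoke()
class HBasePriceSink extends RichSinkFunction[Price] {
  override def invoke(value: Price): Unit = {
    // connect to HBase and write the row here
  }
}

prices.addSink(new HBasePriceSink)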
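If I understand correctly, there is no collect step in Flink: the map/filter above would be applied to each record as it arrives, so the per-row logic lives in the operators rather than in a driver-side loop. Is that the right way to think about it?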