Hi, I have a Kafka topic that transmits 100 security prices every 2 seconds.
In Spark Streaming I go through each RDD, walk through the rows one by one and check the prices. If a price is valuable (above 90.0), I store it in an HBase table:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)
    dstream.cache()
    dstream.foreachRDD { pricesRDD =>
      // Work on individual messages (collect brings the whole batch to the driver)
      for (line <- pricesRDD.collect.toArray) {
        // line._2 is the Kafka message value: key,ticker,timeissued,price
        val fields = line._2.split(',')
        val key = fields(0)
        val ticker = fields(1)
        val timeissued = fields(2)
        val priceToString = fields(3)
        val price = priceToString.toFloat
        if (price > 90.0) {
          // save to HBase table
        }
      }
    }

That works fine.

In Flink I define my source as below:

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go through every load sent to the topic and look at the individual rows. For example, what is the equivalent of *for(line <- pricesRDD.collect.toArray)* in Flink?

Thanks

Dr Mich Talebzadeh
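For comparison, here is a minimal sketch (not from the original post) of how the same per-row logic is usually expressed with Flink's Scala DataStream API: instead of collecting each micro-batch to a driver and looping over it, per-record operators are applied to the stream. It assumes Flink 1.4 or later with the Kafka 0.11 connector on the classpath, the comma-separated layout key,ticker,timeissued,price implied by the Spark code, and placeholder broker, group id and topic values. `Price`, `PriceParser` and `PriceJob` are hypothetical names, and dropping malformed lines is a choice made here, not something the original code does. Event time is left out because a per-record filter does not depend on timestamps.

```scala
import java.util.Properties

import scala.util.Try

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Hypothetical record type; field order assumed to match the Spark code:
// key,ticker,timeissued,price
case class Price(key: String, ticker: String, timeIssued: String, price: Float)

object PriceParser {
  // Parses one Kafka message; returns None for lines that are too short
  // or whose price field is not a number (assumption: such lines are dropped)
  def parse(line: String): Option[Price] = {
    val fields = line.split(',')
    if (fields.length < 4) None
    else Try(fields(3).trim.toFloat).toOption
      .map(p => Price(fields(0), fields(1), fields(2), p))
  }
}

object PriceJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder connection settings (assumptions, adjust to your cluster)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "prices")
    val topicsValue = "prices"

    env
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
      // Called once per Kafka message as it arrives: the stream is the "loop"
      .flatMap(line => PriceParser.parse(line).toList)
      // Keep only the valuable prices, as in the Spark version
      .filter(_.price > 90.0f)
      // Stand-in for the HBase write
      .print()

    env.execute("prices above threshold")
  }
}
```

Each message passes through `flatMap` and `filter` individually, so there is no per-batch `for` loop to write and nothing is pulled back to a single machine. For the HBase write, `print()` would be replaced by `addSink(...)` with a custom `RichSinkFunction` that opens the HBase connection once in `open()` rather than once per record; that sink is not shown here.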