Hi,

Flink processes streams record by record instead of micro-batching records together. Since every record is handled on its own, there is no need for a for-each loop over a collected batch. Simple record-by-record transformations can be done with a MapFunction, and records can be dropped with a FilterFunction. You can also implement a FlatMapFunction to do both in one step.
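For your price records, a minimal sketch could look like the following. It assumes the comma-separated layout (UUID, ticker, time, price) from your topic, the stream DataStream[String] created from the FlinkKafkaConsumer011 in your snippet below, and a Price case class that I made up for illustration; adjust the names to your code.

import org.apache.flink.streaming.api.scala._

// assumed helper type for a parsed record (not part of your code)
case class Price(key: String, ticker: String, timeissued: String, price: Float)

// MapFunction + FilterFunction: parse every record, then keep only the valuable ones
val valuable: DataStream[Price] = stream
  .map { line =>
    val cols = line.split(',')
    Price(cols(0).trim, cols(1).trim, cols(2).trim, cols(3).trim.toFloat)
  }
  .filter(_.price > 90.0f)

// Or parse and filter in one step with a FlatMapFunction (emit zero or one record per input)
val valuable2: DataStream[Price] = stream.flatMap { line =>
  val cols = line.split(',')
  val p = Price(cols(0).trim, cols(1).trim, cols(2).trim, cols(3).trim.toFloat)
  if (p.price > 90.0f) Seq(p) else Seq.empty[Price]
}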
Once the stream is transformed and filtered, you can write it to HBase with a sink function; a rough sketch of such a sink is appended after the quoted message below.

2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Just to clarify, these are the individual prices separated by ','. The
> below shows three price lines in the topic:
>
> UUID, Security, Time, Price
> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>
> Dr Mich Talebzadeh
>
> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>
>> In Spark Streaming I go through the RDD, walk through the rows one by
>> one and check the prices. If the prices are valuable I store them in an
>> HBase table:
>>
>> val dstream = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>> dstream.cache()
>> dstream.foreachRDD
>> { pricesRDD =>
>>   // Work on individual messages
>>   *for(line <- pricesRDD.collect.toArray)*
>>   {
>>     var key = line._2.split(',').view(0).toString
>>     var ticker = line._2.split(',').view(1).toString
>>     var timeissued = line._2.split(',').view(2).toString
>>     var price = line._2.split(',').view(3).toFloat
>>     val priceToString = line._2.split(',').view(3)
>>     if (price > 90.0)
>>     {
>>       // save to HBase table
>>     }
>>   }
>> }
>>
>> That works fine.
>>
>> In Flink I define my source as below:
>>
>> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> val stream = streamExecEnv
>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue,
>>     new SimpleStringSchema(), properties))
>>
>> Is there any way I can perform a similar operation in Flink? I need to
>> go through every topic load sent and look at the individual rows. For
>> example, what is the equivalent of
>>
>> *for(line <- pricesRDD.collect.toArray)*
>>
>> in Flink?
>>
>> Thanks,
>>
>> Dr Mich Talebzadeh
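Here is the promised sketch of the HBase sink, reusing the Price case class from the first sketch. It uses the plain HBase client API inside a RichSinkFunction; the table name "prices" and column family "cf" are placeholders I invented, not names from your setup.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

class HBasePriceSink extends RichSinkFunction[Price] {
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    // one HBase connection per parallel sink instance
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf("prices"))   // placeholder table name
  }

  override def invoke(p: Price): Unit = {
    // one Put per incoming record, keyed by the UUID
    val put = new Put(Bytes.toBytes(p.key))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ticker"), Bytes.toBytes(p.ticker))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("timeissued"), Bytes.toBytes(p.timeissued))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("price"), Bytes.toBytes(p.price))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}

// attach it to the filtered stream
valuable.addSink(new HBasePriceSink)

Writing one Put per record is the simplest approach; for higher throughput you would typically buffer writes, for example with HBase's BufferedMutator.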