Hi, Mich: You can add write a sink function for that. On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> > Hi, > > I have a Kafka topic that transmits 100 security prices ever 2 seconds. > > In Spark streaming I go through the RDD and walk through rows one by one > and check prices > In prices are valuable I store them into an Hbase table > > val dstream = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](streamingContext, kafkaParams, topics) > dstream.cache() > dstream.foreachRDD > { pricesRDD => > // Work on individual messages > * for(line <- pricesRDD.collect.toArray)* > { > var key = line._2.split(',').view(0).toString > var ticker = line._2.split(',').view(1).toString > var timeissued = line._2.split(',').view(2).toString > var price = line._2.split(',').view(3).toFloat > val priceToString = line._2.split(',').view(3) > if (price > 90.0) > { > //save to Hbase table > } > } > } > > That works fine. > > In Flink I define my source as below > > val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val stream = streamExecEnv > .addSource(new FlinkKafkaConsumer011[String](topicsValue, new > SimpleStringSchema(), properties)) > > Is there anyway I can perform similar operation in Flink? I need to go > through every topic load sent and look at the individual rows/ For example > what is the equivalent of > > *for(line <- pricesRDD.collect.toArray)* > In flink? > > Thanks > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > -- Liu, Renjie Software Engineer, MVAD