A *Table*Source [1] is a special input connector for Flink's relational APIs (Table API and SQL) [2]. You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.
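For example, assuming the Kafka011JsonTableSource built in your snippet below (and the streamExecEnv from your earlier mail), registering it and filtering on the price could look roughly like this. It is an untested sketch; the table name "prices" and the threshold of 90 are only examples taken from your Spark code:

  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  // create a StreamTableEnvironment on top of the existing streaming environment
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

  // register the Kafka source under a table name (the name is arbitrary)
  tableEnv.registerTableSource("prices", KafkaTableSource)

  // SELECT / WHERE: keep only the rows whose price is above the threshold
  val valuablePrices = tableEnv.sqlQuery(
    "SELECT key, ticker, timeissued, price FROM prices WHERE price > 90")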
However, there is no *Table*Sink for HBase, so you would need to convert the Table back to a DataStream [3]. That's not very difficult since the APIs are integrated with each other.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset

2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Thanks Fabian. That was very useful.
>
> How about an operation like below?
>
>   // create builder
>   val KafkaTableSource = Kafka011JsonTableSource.builder()
>     // set Kafka topic
>     .forTopic(topicsValue)
>     // set Kafka consumer properties
>     .withKafkaProperties(properties)
>     // set Table schema
>     .withSchema(TableSchema.builder()
>       .field("key", Types.STRING)
>       .field("ticker", Types.STRING)
>       .field("timeissued", Types.STRING)
>       .field("price", Types.FLOAT)
>       .build())
>     // build the TableSource
>     .build()
>
> Will that be OK?
>
> Dr Mich Talebzadeh
>
> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink processes streams record by record instead of micro-batching
>> records together. Since every record comes by itself, there is no
>> for-each. Simple record-by-record transformations can be done with a
>> MapFunction, and records can be filtered out with a FilterFunction.
>> You can also implement a FlatMapFunction to do both in one step.
>>
>> Once the stream is transformed and filtered, you can write it to HBase
>> with a sink function.
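As a rough, untested sketch of that FlatMapFunction approach (assuming the stream variable from the snippet at the bottom is a Scala DataStream[String], and taking the field order and the 90.0 threshold from the Spark code):

  import org.apache.flink.streaming.api.scala._

  // a small case class for a parsed price record (best defined at the
  // top level of the job so Flink can derive its TypeInformation)
  case class Price(key: String, ticker: String, timeissued: String, price: Float)

  // parse each CSV line and keep only the records whose price is above 90
  val prices: DataStream[Price] = stream.flatMap { line =>
    val cols = line.split(',').map(_.trim)
    if (cols.length == 4 && cols(3).toFloat > 90.0f)
      Seq(Price(cols(0), cols(1), cols(2), cols(3).toFloat))
    else
      Seq.empty   // drop lines without 4 fields or with a price below the threshold
  }

The resulting DataStream[Price] can then be handed to a sink function that writes to HBase.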
>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>
>>> Just to clarify, these are the individual prices separated by ','. The
>>> lines below show three price lines from the topic:
>>>
>>> UUID, Security, Time, Price
>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>>
>>>> In Spark Streaming I go through the RDD and walk through the rows one
>>>> by one and check the prices. If the prices are valuable I store them
>>>> into an HBase table:
>>>>
>>>>   val dstream = KafkaUtils.createDirectStream[String, String,
>>>>     StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>   dstream.cache()
>>>>   dstream.foreachRDD
>>>>   { pricesRDD =>
>>>>     // Work on individual messages
>>>>     *for(line <- pricesRDD.collect.toArray)*
>>>>     {
>>>>       var key = line._2.split(',').view(0).toString
>>>>       var ticker = line._2.split(',').view(1).toString
>>>>       var timeissued = line._2.split(',').view(2).toString
>>>>       var price = line._2.split(',').view(3).toFloat
>>>>       val priceToString = line._2.split(',').view(3)
>>>>       if (price > 90.0)
>>>>       {
>>>>         // save to HBase table
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>> That works fine.
>>>>
>>>> In Flink I define my source as below:
>>>>
>>>>   val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>   val stream = streamExecEnv
>>>>     .addSource(new FlinkKafkaConsumer011[String](topicsValue,
>>>>       new SimpleStringSchema(), properties))
>>>>
>>>> Is there any way I can perform a similar operation in Flink? I need to
>>>> go through every load of messages sent to the topic and look at the
>>>> individual rows. For example, what is the equivalent of
>>>>
>>>>   *for(line <- pricesRDD.collect.toArray)*
>>>>
>>>> in Flink?
>>>>
>>>> Thanks
>>>>
>>>> Dr Mich Talebzadeh
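Coming back to the missing TableSink: to get the filtered rows from the Table API into HBase, the Table would be converted back into a DataStream [3] and written with a user-defined SinkFunction. A rough, untested sketch, reusing the valuablePrices table from the SQL example above; HBasePriceSink is only a placeholder name for a SinkFunction you would implement yourself with the HBase client:

  import org.apache.flink.types.Row
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._

  // convert the filtered Table into an append-only DataStream of Rows
  val valuableStream: DataStream[Row] = tableEnv.toAppendStream[Row](valuablePrices)

  // hand each Row to a custom sink that issues HBase Puts (placeholder class)
  valuableStream.addSink(new HBasePriceSink)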