Hi Mich,

Would it be possible to share the full source code? I am missing a call to streamExecEnvironment.execute.
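A Flink streaming job is only submitted once execute() is called on the StreamExecutionEnvironment; everything before that merely builds the dataflow graph. A minimal sketch of where the call usually goes (the object and job names are made up; the environment variable matches the streamExecEnv in the code quoted below):

    import org.apache.flink.streaming.api.scala._

    object PricesToHBase {
      def main(args: Array[String]): Unit = {
        val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // source, transformations and sinks are defined here ...

        // without this call the dataflow is never submitted
        streamExecEnv.execute("kafka-prices-to-hbase")
      }
    }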
Best regards

> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Hi Fabian,
>
> Reading your notes above, I have converted the table back to a DataStream.
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>
> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
> val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
> val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> My intention is to create an HBase sink as follows:
>
> // Save prices to Hbase table
> var p = new Put(new String(key).getBytes())
> p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
> p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
> p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
> p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
> p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
> p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
> HbaseTable.put(p)
> HbaseTable.flushCommits()
>
> However, I don't seem to be able to get the correct values for the columns!
>
> Dr Mich Talebzadeh
>
> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fhue...@gmail.com> wrote:
>> A *Table*Source [1] is a special input connector for Flink's relational APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>
>> However, there is no *Table*Sink for HBase, so you would need to convert the Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>
>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>> Thanks Fabian. That was very useful.
>>>
>>> How about an operation like the one below?
>>>
>>> // create builder
>>> val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>   // set Kafka topic
>>>   .forTopic(topicsValue)
>>>   // set Kafka consumer properties
>>>   .withKafkaProperties(properties)
>>>   // set Table schema
>>>   .withSchema(TableSchema.builder()
>>>     .field("key", Types.STRING)
>>>     .field("ticker", Types.STRING)
>>>     .field("timeissued", Types.STRING)
>>>     .field("price", Types.FLOAT)
>>>     .build())
>>>
>>> Will that be OK?
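As a side note on the snippet above: a minimal sketch (assuming the Flink 1.5-era Kafka011JsonTableSource builder, the same topicsValue and properties values, and import paths from the 1.5 flink-table / flink-connector-kafka modules) of how the chain is usually completed with a final build() and the resulting source registered with the table environment:

    import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource
    import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}

    // the outer builder chain still needs its own build() to produce the TableSource
    val kafkaTableSource = Kafka011JsonTableSource.builder()
      .forTopic(topicsValue)                 // Kafka topic, as in the quoted code
      .withKafkaProperties(properties)       // Kafka consumer properties, as above
      .withSchema(TableSchema.builder()
        .field("key", Types.STRING)
        .field("ticker", Types.STRING)
        .field("timeissued", Types.STRING)
        .field("price", Types.FLOAT)
        .build())
      .build()                               // completes the table source itself

    // register it so it can be queried as "priceTable"
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerTableSource("priceTable", kafkaTableSource)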
>>>
>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> Hi,
>>>>
>>>> Flink processes streams record by record instead of micro-batching records together. Since every record comes by itself, there is no for-each.
>>>> Simple record-by-record transformations can be done with a MapFunction, and records can be filtered out with a FilterFunction. You can also implement a FlatMapFunction to do both in one step.
>>>>
>>>> Once the stream is transformed and filtered, you can write it to HBase with a sink function.
>>>>
>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>> Just to clarify, these are the individual prices separated by ','. Below are three price lines from the topic:
>>>>>
>>>>> UUID, Security, Time, Price
>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>>>>
>>>>>> In Spark Streaming I go through the RDD, walk through the rows one by one and check the prices.
>>>>>> If the prices are valuable I store them in an HBase table:
>>>>>>
>>>>>> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>> dstream.cache()
>>>>>> dstream.foreachRDD
>>>>>> { pricesRDD =>
>>>>>>   // Work on individual messages
>>>>>>   for (line <- pricesRDD.collect.toArray)
>>>>>>   {
>>>>>>     var key = line._2.split(',').view(0).toString
>>>>>>     var ticker = line._2.split(',').view(1).toString
>>>>>>     var timeissued = line._2.split(',').view(2).toString
>>>>>>     var price = line._2.split(',').view(3).toFloat
>>>>>>     val priceToString = line._2.split(',').view(3)
>>>>>>     if (price > 90.0)
>>>>>>     {
>>>>>>       // save to HBase table
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> That works fine.
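To put Fabian's MapFunction/FilterFunction/FlatMapFunction suggestion next to the Spark loop just quoted: a minimal sketch, assuming the DataStream[String] called stream from the addSource snippet further down, with made-up names (Price, parsed, highValue):

    import org.apache.flink.streaming.api.scala._

    case class Price(key: String, ticker: String, timeissued: String, price: Float)

    // flatMap parses each Kafka record and drops malformed lines (map + filter in one step)
    val parsed: DataStream[Price] = stream.flatMap { line: String =>
      line.split(',') match {
        case Array(key, ticker, timeissued, price) =>
          Seq(Price(key.trim, ticker.trim, timeissued.trim, price.trim.toFloat))
        case _ => Seq.empty[Price]
      }
    }

    // the counterpart of the `if (price > 90.0)` check in the Spark loop
    val highValue: DataStream[Price] = parsed.filter(_.price > 90.0f)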
>>>>>>
>>>>>> In Flink I define my source as below:
>>>>>>
>>>>>> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>> val stream = streamExecEnv
>>>>>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>
>>>>>> Is there any way I can perform a similar operation in Flink? I need to go through every topic load sent and look at the individual rows. For example, what is the equivalent of
>>>>>>
>>>>>> for (line <- pricesRDD.collect.toArray)
>>>>>>
>>>>>> in Flink?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Dr Mich Talebzadeh
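To close the loop on the original question and the HBase code from the later message: a sketch of a sink function along the lines Fabian suggests, reusing the PRICE_INFO / OPERATION column families from the Put calls quoted above. The HBase table name ("prices"), the Price case class and the highValue stream from the previous sketch are assumptions, and it uses the HBase 1.x client API (ConnectionFactory / addColumn) rather than the older HTable add/flushCommits calls:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
    import org.apache.hadoop.hbase.util.Bytes

    class HBasePriceSink extends RichSinkFunction[Price] {
      @transient private var connection: Connection = _
      @transient private var table: Table = _

      override def open(parameters: Configuration): Unit = {
        // one HBase connection per parallel sink instance
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        table = connection.getTable(TableName.valueOf("prices"))   // hypothetical table name
      }

      override def invoke(value: Price): Unit = {
        val p = new Put(Bytes.toBytes(value.key))
        p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TICKER"), Bytes.toBytes(value.ticker))
        p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("ISSUED"), Bytes.toBytes(value.timeissued))
        p.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("PRICE"), Bytes.toBytes(value.price.toString))
        p.addColumn(Bytes.toBytes("OPERATION"), Bytes.toBytes("OP_TYPE"), Bytes.toBytes("1"))
        p.addColumn(Bytes.toBytes("OPERATION"), Bytes.toBytes("OP_TIME"), Bytes.toBytes(System.currentTimeMillis.toString))
        table.put(p)
      }

      override def close(): Unit = {
        if (table != null) table.close()
        if (connection != null) connection.close()
      }
    }

    // wire up the filtered stream and submit the job (the execute() call noted at the top of the thread)
    highValue.addSink(new HBasePriceSink)
    streamExecEnv.execute("kafka-prices-to-hbase")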