(At the end of your code)
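To spell that out for other readers: in both the DataStream and Table APIs the program only builds a job graph; nothing is executed until StreamExecutionEnvironment.execute is called at the end of the main method. A minimal sketch (the stand-in source and job name are illustrative):

    import org.apache.flink.streaming.api.scala._

    object ExecuteAtEnd {
      def main(args: Array[String]): Unit = {
        val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

        // Everything up to execute() only declares the pipeline; no data moves yet.
        streamExecEnv
          .fromElements("MRW,241.88", "SAP,56.94")   // stand-in for the Kafka source
          .print()

        // Nothing runs until this call; hence "at the end of your code".
        streamExecEnv.execute("md_streaming")        // job name is illustrative
      }
    }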
> On 8. Aug 2018, at 00:29, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Hi Mich,
>
> Would it be possible to share the full source code? I am missing a call to
> streamExecEnvironment.execute.
>
> Best regards
>
>> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> Hi Fabian,
>>
>> Reading your notes above, I have converted the table back to a DataStream:
>>
>>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>   tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>>
>>   val key        = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>   val ticker     = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>   val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>   val price      = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>
>> My intention is to create an HBase sink as follows:
>>
>>   // Save prices to HBase table
>>   var p = new Put(new String(key).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
>>   p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
>>   p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
>>   HbaseTable.put(p)
>>   HbaseTable.flushCommits()
>>
>> However, I don't seem to be able to get the correct values for the columns!
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed.
>> The author will in no case be liable for any monetary damages arising from
>> such loss, damage or destruction.
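A likely cause of the wrong column values is that the four scan calls above produce four independent, unsynchronized streams, so the fields of one input record can no longer be matched back together. Below is a minimal sketch of the alternative: select all four columns in one query, convert the Table to a DataStream of a case class, and write each element from a RichSinkFunction. It assumes Flink 1.5's Scala Table API and the HBase 1.x client; the HBase table name MARKETDATA and the stand-in input data are hypothetical.

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table => HTable}

    case class PriceRow(key: String, ticker: String, timeissued: String, price: Float)

    // Writes one HBase row per element; the connection lives from open() to close().
    class HBaseSink extends RichSinkFunction[PriceRow] {
      @transient private var conn: Connection = _
      @transient private var htable: HTable = _

      override def open(parameters: Configuration): Unit = {
        conn = ConnectionFactory.createConnection()              // reads hbase-site.xml from the classpath
        htable = conn.getTable(TableName.valueOf("MARKETDATA"))  // hypothetical table name
      }

      override def invoke(r: PriceRow): Unit = {
        val p = new Put(r.key.getBytes("UTF-8"))
        p.addColumn("PRICE_INFO".getBytes, "TICKER".getBytes, r.ticker.getBytes("UTF-8"))
        p.addColumn("PRICE_INFO".getBytes, "ISSUED".getBytes, r.timeissued.getBytes("UTF-8"))
        p.addColumn("PRICE_INFO".getBytes, "PRICE".getBytes, r.price.toString.getBytes("UTF-8"))
        htable.put(p)
      }

      override def close(): Unit = {
        if (htable != null) htable.close()
        if (conn != null) conn.close()
      }
    }

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)

    // Stand-in for the splitStream of the original mail.
    val splitStream = streamExecEnv.fromElements(
      ("1230a9a9-dc57-40e4-a000-6ea6989c754a", "MRW", "2018-07-28T20:38:43", 241.88f))
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

    // One query keeps all four columns together in the same record.
    tableEnv.scan("priceTable")
      .select('key, 'ticker, 'timeissued, 'price)
      .toAppendStream[PriceRow]
      .addSink(new HBaseSink)

    streamExecEnv.execute("hbase-sink-sketch")

If the Kafka011JsonTableSource discussed later in this thread is used, it would be registered with tableEnv.registerTableSource in place of registerDataStream; the rest of the pipeline stays the same.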
>>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> A *Table*Source [1] is a special input connector for Flink's relational
>>> APIs (Table API and SQL) [2]. You can transform and filter with these
>>> APIs as well (it's probably even easier). In SQL this would be the
>>> SELECT and WHERE clauses of a query.
>>>
>>> However, there is no *Table*Sink for HBase and you would need to convert
>>> the Table back to a DataStream [3]. That's not very difficult since the
>>> APIs are integrated with each other.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>>
>>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>
>>>> Thanks Fabian. That was very useful.
>>>>
>>>> How about an operation like below?
>>>>
>>>>   // create builder
>>>>   val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>>     // set Kafka topic
>>>>     .forTopic(topicsValue)
>>>>     // set Kafka consumer properties
>>>>     .withKafkaProperties(properties)
>>>>     // set Table schema
>>>>     .withSchema(TableSchema.builder()
>>>>       .field("key", Types.STRING)
>>>>       .field("ticker", Types.STRING)
>>>>       .field("timeissued", Types.STRING)
>>>>       .field("price", Types.FLOAT)
>>>>       .build())
>>>>
>>>> Will that be OK?
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Flink processes streams record by record, instead of micro-batching
>>>>> records together. Since every record comes by itself, there is no
>>>>> for-each. Simple record-by-record transformations can be done with a
>>>>> MapFunction, and filtering out records with a FilterFunction. You can
>>>>> also implement a FlatMapFunction to do both in one step.
>>>>>
>>>>> Once the stream is transformed and filtered, you can write it to HBase
>>>>> with a sink function.
>>>>>
>>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>>>
>>>>>> Just to clarify, these are the individual prices separated by ','.
>>>>>> The below shows three price lines in the topic:
>>>>>>
>>>>>> UUID, Security, Time, Price
>>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>
>>>>>> http://talebzadehmich.wordpress.com
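Putting Fabian's suggestion together with the sample rows above: a sketch of a FlatMapFunction that parses and filters in one step, using flatMap rather than map so malformed lines (such as the header row) are dropped instead of failing the job. The case class, the stand-in source and the 90.0 threshold are illustrative; the threshold is borrowed from the Spark job quoted below.

    import scala.util.Try

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    case class Price(key: String, ticker: String, timeissued: String, price: Float)

    // Parse "UUID,Security,Time,Price" lines; drop anything malformed.
    class PriceParser extends FlatMapFunction[String, Price] {
      override def flatMap(line: String, out: Collector[Price]): Unit =
        line.split(',') match {
          case Array(key, ticker, timeissued, price) =>
            // Try guards against non-numeric prices (e.g. the header line).
            Try(price.trim.toFloat).foreach { p =>
              out.collect(Price(key.trim, ticker.trim, timeissued.trim, p))
            }
          case _ => // wrong number of fields: drop the record
        }
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in source; in the real job this is the FlinkKafkaConsumer011 stream.
    val stream: DataStream[String] = env.fromElements(
      "1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88",
      "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94")

    val valuable: DataStream[Price] = stream
      .flatMap(new PriceParser)
      .filter(_.price > 90.0f)

    valuable.print()
    env.execute("price-parse-sketch")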
>>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>>>>>
>>>>>>> In Spark Streaming I go through the RDD and walk through the rows one
>>>>>>> by one and check the prices. If prices are valuable, I store them in
>>>>>>> an HBase table:
>>>>>>>
>>>>>>>   val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>>   dstream.cache()
>>>>>>>   dstream.foreachRDD
>>>>>>>   { pricesRDD =>
>>>>>>>     // Work on individual messages
>>>>>>>     for (line <- pricesRDD.collect.toArray)
>>>>>>>     {
>>>>>>>       var key = line._2.split(',').view(0).toString
>>>>>>>       var ticker = line._2.split(',').view(1).toString
>>>>>>>       var timeissued = line._2.split(',').view(2).toString
>>>>>>>       var price = line._2.split(',').view(3).toFloat
>>>>>>>       val priceToString = line._2.split(',').view(3)
>>>>>>>       if (price > 90.0)
>>>>>>>       {
>>>>>>>         // save to HBase table
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>> That works fine.
>>>>>>>
>>>>>>> In Flink I define my source as below:
>>>>>>>
>>>>>>>   val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>   val stream = streamExecEnv
>>>>>>>     .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>>
>>>>>>> Is there any way I can perform a similar operation in Flink? I need to
>>>>>>> go through every topic load sent and look at the individual rows. For
>>>>>>> example, what is the equivalent of
>>>>>>>
>>>>>>>   for (line <- pricesRDD.collect.toArray)
>>>>>>>
>>>>>>> in Flink?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
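Answering the last question directly: there is no equivalent of collect, because Flink applies the chained operators to every record as it arrives; the body of the Spark for-loop becomes map and filter calls on the stream, followed by a sink, and the job only starts once execute() is called (the point raised at the top of this thread). A minimal sketch, assuming the same topicsValue and properties as in the quoted code (bootstrap servers, group id, topic and job name are illustrative):

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")   // assumption
    properties.setProperty("group.id", "md_streaming")              // assumption
    val topicsValue = "prices"                                      // hypothetical topic name

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

    streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
      // Equivalent of the body of "for (line <- pricesRDD.collect.toArray)":
      // each operator below is applied to every record as it arrives.
      .map(_.split(','))
      // Assumes a numeric price field; see the tolerant parser sketched earlier.
      .filter(cols => cols.length == 4 && cols(3).trim.toFloat > 90.0f)
      .map(cols => (cols(0).trim, cols(1).trim, cols(2).trim, cols(3).trim.toFloat))
      .print()   // stand-in for an HBase sink, e.g. the RichSinkFunction sketched above

    streamExecEnv.execute("md_streaming")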