(At the end of your code)
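
On the column values: as far as I can tell, in the snippet quoted below `key`, `ticker`, `timeissued` and `price` are four separate streams of `Row`, not the fields of one record, so `new String(key)` cannot yield the key of a particular price. One way around this, sketched here in plain Scala with no Flink or HBase dependency, is to keep each record whole and derive the row key and all cell values from it in one place. `PriceRecord`, `ColumnValue`, `PutBuilder` and the `currency` and `nowMillis` parameters are names made up for this sketch; the column families and qualifiers are copied from the quoted code.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical record type: all four columns of one price line travel together.
final case class PriceRecord(key: String, ticker: String, timeIssued: String, price: Float)

// Hypothetical holder for one cell: column family, qualifier and value as bytes.
final case class ColumnValue(family: Array[Byte], qualifier: Array[Byte], value: Array[Byte])

object PutBuilder {
  private def bytes(s: String): Array[Byte] = s.getBytes(UTF_8)

  // Returns the row key and the cells for a single record, mirroring the
  // PRICE_INFO / OPERATION layout of the quoted snippet.
  // `currency` and `nowMillis` are passed in so the function stays pure.
  def toRow(r: PriceRecord, currency: String, nowMillis: Long): (Array[Byte], Seq[ColumnValue]) = {
    val cells = Seq(
      ColumnValue(bytes("PRICE_INFO"), bytes("TICKER"), bytes(r.ticker)),
      ColumnValue(bytes("PRICE_INFO"), bytes("ISSUED"), bytes(r.timeIssued)),
      ColumnValue(bytes("PRICE_INFO"), bytes("PRICE"), bytes(r.price.toString)),
      ColumnValue(bytes("PRICE_INFO"), bytes("CURRENCY"), bytes(currency)),
      ColumnValue(bytes("OPERATION"), bytes("OP_TYPE"), bytes("1")),
      ColumnValue(bytes("OPERATION"), bytes("OP_TIME"), bytes(nowMillis.toString))
    )
    (bytes(r.key), cells)
  }
}
```

Inside a sink function, each record would then be turned into one `Put` by calling `toRow` and adding every `ColumnValue` to it, for example with HBase's `Put.addColumn(family, qualifier, value)`. That wiring is not shown here and would need checking against the HBase client version in use.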

> On 8. Aug 2018, at 00:29, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Hi Mich,
> 
> Would it be possible to share the full source code?
> I am missing a call to streamExecEnvironment.execute
> 
> Best regards 
> 
>> On 8. Aug 2018, at 00:02, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>> 
>> Hi Fabian,
>> 
>> Reading your notes above I have converted the table back to DataStream.
>> 
>>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>> 
>>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>> 
>> My intention is to create an HBase sink as follows:
>> 
>>     // Save prices to HBase table
>>     var p = new Put(new String(key).getBytes())
>>     p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
>>     p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
>>     p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
>>     p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
>>     p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
>>     p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
>>     HbaseTable.put(p)
>>     HbaseTable.flushCommits()
>> 
>> However, I don't seem to be able to get the correct values for the columns!
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>> A *Table*Source [1], is a special input connector for Flink's relational 
>>> APIs (Table API and SQL) [2].
>>> You can transform and filter with these APIs as well (it's probably even 
>>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>> 
>>> However, there is no *Table*Sink for HBase and you would need to convert 
>>> the Table back to a DataStream [3].
>>> That's not very difficult since the APIs are integrated with each other.
>>> 
>>> Best, Fabian
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>>> [3] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>>> 
>>> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>> Thanks Fabian. That was very useful.
>>>> 
>>>> How about an operation like below?
>>>> 
>>>>     // create builder
>>>>     val KafkaTableSource = Kafka011JsonTableSource.builder()
>>>>       // set Kafka topic
>>>>       .forTopic(topicsValue)
>>>>       // set Kafka consumer properties
>>>>       .withKafkaProperties(properties)
>>>>       // set Table schema
>>>>       .withSchema(TableSchema.builder()
>>>>         .field("key", Types.STRING)
>>>>         .field("ticker", Types.STRING)
>>>>         .field("timeissued", Types.STRING)
>>>>         .field("price", Types.FLOAT)
>>>>         .build())
>>>> 
>>>> Will that be OK? 
>>>> 
>>>> 
>>>>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> Flink processes streams record by record, instead of micro-batching 
>>>>> records together. Since every record comes by itself, there is no 
>>>>> for-each.
>>>>> Simple record-by-record transformations can be done with a MapFunction, 
>>>>> filtering out records with a FilterFunction. You can also implement a 
>>>>> FlatMapFunction to do both in one step.
>>>>> 
>>>>> Once the stream is transformed and filtered, you can write it to HBase 
>>>>> with a sink function.
>>>>> 
>>>>> 
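
To make the record-by-record idea above concrete, here is a minimal sketch in plain Scala, assuming the comma-separated layout of the sample lines quoted further down. `Price`, `PriceFunctions`, `parsePrice` and `isValuable` are hypothetical names, and the 90.0 threshold is taken from the Spark snippet in the original question.

```scala
// Hypothetical record type for one price line: UUID, security, time, price.
final case class Price(key: String, ticker: String, timeIssued: String, price: Float)

object PriceFunctions {
  // Parse one comma-separated line. Malformed input yields None, so a bad
  // record can be dropped instead of failing the whole job.
  def parsePrice(line: String): Option[Price] = {
    val fields = line.split(',').map(_.trim)
    if (fields.length != 4) None
    else scala.util.Try(Price(fields(0), fields(1), fields(2), fields(3).toFloat)).toOption
  }

  // Keep only prices above the threshold used in the Spark example.
  def isValuable(p: Price): Boolean = p.price > 90.0f
}
```

In a Flink job these functions would presumably be applied per record, along the lines of `stream.flatMap(line => PriceFunctions.parsePrice(line).toList).filter(p => PriceFunctions.isValuable(p))`, with the result handed to a sink function. On an ordinary list the same chain keeps only the well-formed lines whose price exceeds 90.
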
>>>>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>>> Just to clarify these are the individual prices separated by ','. The 
>>>>>> below shows three price lines in the topic
>>>>>> 
>>>>>> UUID,                            Security,         Time,        Price
>>>>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>>>>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>>>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>>>> 
>>>>>> 
>>>>>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>>>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds. 
>>>>>>> 
>>>>>>> In Spark Streaming I go through the RDD, walk through the rows one by 
>>>>>>> one and check the prices.
>>>>>>> If prices are valuable I store them in an HBase table.
>>>>>>> 
>>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>>>>>>     dstream.cache()
>>>>>>>     dstream.foreachRDD
>>>>>>>     { pricesRDD =>
>>>>>>>           // Work on individual messages
>>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>>          {
>>>>>>>            var key = line._2.split(',').view(0).toString
>>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>>             if (price > 90.0)
>>>>>>>            {
>>>>>>>                //save to Hbase table
>>>>>>>            }
>>>>>>>           }
>>>>>>>      }
>>>>>>> 
>>>>>>> That works fine. 
>>>>>>> 
>>>>>>> In Flink I define my source as below
>>>>>>> 
>>>>>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>>     val stream = streamExecEnv
>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>>>> 
>>>>>>> Is there any way I can perform a similar operation in Flink? I need to go 
>>>>>>> through every topic load sent and look at the individual rows. For 
>>>>>>> example, what is the equivalent of 
>>>>>>> 
>>>>>>> for(line <- pricesRDD.collect.toArray)
>>>>>>> 
>>>>>>> in Flink?
>>>>>>> 
>>>>>>> Thanks
>>>>> 
>>> 