Hi Mich,

The thing to understand here is that print() does not return the value of
a field; it attaches a sink that writes the stream's records to STDOUT.
So what you get back is not the value of a variable. Please refer to
Hequn's recommendation.
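
To make the point concrete, here is a rough, untested sketch of handling each
record inside a sink, which is the only place the column values actually
exist. It uses a plain DataStream sink rather than the TableSink Hequn
mentions. The class name PriceHBaseSink and the HBase table name passed to it
are hypothetical. I am also assuming the field order 'key, 'ticker,
'timeissued, 'price from your select(), the column families from your Put
code, and the Flink 1.x RichSinkFunction and HBase client APIs as I remember
them.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical sink: writes one HBase Put per incoming Row.
// Assumed field order: 0 = key, 1 = ticker, 2 = timeissued, 3 = price.
class PriceHBaseSink(tableName: String) extends RichSinkFunction[Row] {

  // HBase connections are not serializable, so they are opened on the worker.
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  override def open(parameters: Configuration): Unit = {
    // Assumes hbase-site.xml is on the classpath of the task managers.
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
  }

  // Called once per record; the column values are available only here.
  override def invoke(row: Row): Unit = {
    val key        = row.getField(0).toString
    val ticker     = row.getField(1).toString
    val timeissued = row.getField(2).toString
    val price      = row.getField(3).toString

    val priceInfo = Bytes.toBytes("PRICE_INFO")
    val operation = Bytes.toBytes("OPERATION")

    val put = new Put(Bytes.toBytes(key))
    put.addColumn(priceInfo, Bytes.toBytes("TICKER"), Bytes.toBytes(ticker))
    put.addColumn(priceInfo, Bytes.toBytes("ISSUED"), Bytes.toBytes(timeissued))
    put.addColumn(priceInfo, Bytes.toBytes("PRICE"), Bytes.toBytes(price))
    put.addColumn(priceInfo, Bytes.toBytes("CURRENCY"), Bytes.toBytes("GBP"))
    put.addColumn(operation, Bytes.toBytes("OP_TYPE"), Bytes.toBytes("1"))
    put.addColumn(operation, Bytes.toBytes("OP_TIME"),
      Bytes.toBytes(System.currentTimeMillis.toString))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

With something like this in place, the already filtered stream from your
first mail could be wired up with r.addSink(new PriceHBaseSink(...)) instead
of r.print(), so the filter stays in the Table API and no local variables are
needed.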

Thanks, vino.

Hequn Cheng <chenghe...@gmail.com> wrote on Wed, Aug 8, 2018 at 9:11 AM:

> Hi Mich,
>
> We can't convert a DataStream to a value. There are some options:
> 1. Use a TableSink[1] to write the data into HBase.
> 2. Use a UDF[2].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions
>
> On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> I need this operation to store filtered rows in an HBase table.
>>
>> I can access an existing HBase table through the Flink API.
>>
>> My challenge is putting rows into the HBase table. I am trying something
>> like the code below, but I don't seem to be able to extract individual
>> column values from priceTable:
>>
>>
>> val key = tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
>> val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
>> val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
>> val price = tableEnv.scan("priceTable").select('price).toDataStream[Row].print()
>> val CURRENCY = "GBP"
>> val op_type = "1"
>> val op_time = System.currentTimeMillis.toString
>> /*
>> if (price > 99.0)
>> {
>>   // Save prices to Hbase table
>>   var p = new Put(new String(key).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
>>   p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
>>   p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
>>   p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
>>   HbaseTable.put(p)
>>   HbaseTable.flushCommits()
>>   if(tableEnv.scan("priceTable").filter('ticker == "VOD" && 'price > 99.0))
>>   {
>>     sqltext = Calendar.getInstance.getTime.toString + ", Price on "+ticker+" hit " +price.toString
>>     //java.awt.Toolkit.getDefaultToolkit().beep()
>>     println(sqltext)
>>   }
>> }
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> The following works fine
>>>
>>>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>>>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>>>     val r = result.toDataStream[Row]
>>>     r.print()
>>>
>>> Now I would like to get the individual column values from priceTable
>>> into local variables.
>>>
>>> This does not seem to work:
>>>
>>>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>>
>>> What alternatives are there?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>
>
