Hi Mich,

Here you need to understand that the print() call does not return the value of a field; it attaches a sink that writes the stream to STDOUT. So what you get back here is not the value of a variable. Please refer to Hequn's recommendation.
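
To make that concrete, here is a rough sketch of one way to get at the column values: do the work inside a sink function, where each Row is handed over one at a time. This is only an illustration under assumptions, not code from this thread. The class name HbasePriceSink and the table name passed to it are hypothetical, the field order follows the priceTable columns (key, ticker, timeissued, price), and it assumes the HBase 1.x client API and a Flink version whose SinkFunction still offers the single-argument invoke.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical sink: writes each Row(key, ticker, timeissued, price) to HBase.
// The HBase table name is supplied by the caller; it is not given in the thread.
class HbasePriceSink(tableName: String) extends RichSinkFunction[Row] {

  @transient private var connection: Connection = _
  @transient private var table: Table = _

  // Runs once per parallel sink instance, on the worker, before any row arrives.
  override def open(parameters: Configuration): Unit = {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf(tableName))
  }

  // Runs once per row; this is where the individual column values are available.
  override def invoke(row: Row): Unit = {
    val key        = String.valueOf(row.getField(0))
    val ticker     = String.valueOf(row.getField(1))
    val timeissued = String.valueOf(row.getField(2))
    val price      = String.valueOf(row.getField(3))

    val priceInfo = Bytes.toBytes("PRICE_INFO")
    val operation = Bytes.toBytes("OPERATION")

    val put = new Put(Bytes.toBytes(key))
    put.addColumn(priceInfo, Bytes.toBytes("TICKER"), Bytes.toBytes(ticker))
    put.addColumn(priceInfo, Bytes.toBytes("ISSUED"), Bytes.toBytes(timeissued))
    put.addColumn(priceInfo, Bytes.toBytes("PRICE"), Bytes.toBytes(price))
    put.addColumn(priceInfo, Bytes.toBytes("CURRENCY"), Bytes.toBytes("GBP"))
    put.addColumn(operation, Bytes.toBytes("OP_TYPE"), Bytes.toBytes("1"))
    put.addColumn(operation, Bytes.toBytes("OP_TIME"),
      Bytes.toBytes(System.currentTimeMillis.toString))
    table.put(put)
  }

  // Releases the HBase resources when the job stops.
  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}
```

With something like this, the filtering stays in the Table API and the stream is attached to the sink instead of to print(), for example result.toDataStream[Row].addSink(new HbasePriceSink("your_hbase_table")), where the table name is a placeholder.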

Thanks,
vino.

Hequn Cheng <chenghe...@gmail.com> wrote on Wed, Aug 8, 2018 at 9:11 AM:

> Hi Mich,
>
> We can't convert a DataStream to a value. There are some options:
> 1. Use a TableSink to write data[1] into Hbase.
> 2. Use a UDF[2].
>
> Best, Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#user-defined-functions
>
> On Wed, Aug 8, 2018 at 2:22 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> I need this operation to store filtered rows in an Hbase table.
>>
>> I can access an existing Hbase table through the Flink API.
>>
>> My challenge is to put rows into the Hbase table. Something like below, and I
>> don't seem to be able to extract individual column values from priceTable:
>>
>>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
>>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
>>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
>>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row].print()
>>     val CURRENCY = "GBP"
>>     val op_type = "1"
>>     val op_time = System.currentTimeMillis.toString
>>     /*
>>     if (price > 99.0)
>>     {
>>       // Save prices to Hbase table
>>       var p = new Put(new String(key).getBytes())
>>       p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
>>       p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
>>       p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
>>       p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
>>       p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
>>       p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
>>       HbaseTable.put(p)
>>       HbaseTable.flushCommits()
>>       if(tableEnv.scan("priceTable").filter('ticker == "VOD" && 'price > 99.0))
>>       {
>>         sqltext = Calendar.getInstance.getTime.toString + ", Price on "+ticker+" hit " +price.toString
>>         //java.awt.Toolkit.getDefaultToolkit().beep()
>>         println(sqltext)
>>       }
>>     }
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>> On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The following works fine:
>>>
>>>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>>>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>>>     val r = result.toDataStream[Row]
>>>     r.print()
>>>
>>> Now I would like to get the individual column values from priceTable
>>> into local variables.
>>>
>>> This does not seem to work:
>>>
>>>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>>>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>>>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>>>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>>>
>>> What alternatives are there?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh