Thanks, Ted. I will try this version.



------------------ Original Message ------------------
From: "Ted Yu" <yuzhih...@gmail.com>
Sent: Thursday, January 28, 2016, 11:35 PM
To: "开心延年" <muyann...@qq.com>
Cc: "Jörn Franke" <jornfra...@gmail.com>; "Julio Antonio Soto de Vicente" <ju...@esbet.es>; "Maciej Bryński" <mac...@brynski.pl>; "dev" <dev@spark.apache.org>
Subject: Re: Re: Spark 1.6.0 + Hive + HBase



Under sql/hive/src/main/scala/org/apache/spark/sql/hive/execution, I only see 
HiveTableScan and HiveNativeCommand.

At the beginning of HiveTableScan:


 * The Hive table scan operator.  Column and partition pruning are both handled.


Looks like filter pushdown hasn't been implemented.


As far as I know, Huawei's Astro can do filter pushdown:


 http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase



On Thu, Jan 28, 2016 at 5:20 AM, 开心延年 <muyann...@qq.com> wrote:
This is not a Hive bug. I tested Hive on my storage and it works fine. 
But when I test it through spark-sql, the TableScanDesc.FILTER_EXPR_CONF_STR 
parameter is not passed, and that is what causes the full scan.

The source code in HiveHBaseTableInputFormat is as follows. It shows why the 
missing parameter leads to a full scan.


  private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
      throws IOException {

    // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL

    Scan scan = new Scan();
    String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
    if (filterObjectSerialized != null) {
      HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
          HBaseScanRange.class);
      try {
        range.setup(scan, jobConf);
      } catch (Exception e) {
        throw new IOException(e);
      }
      return scan;
    }

    String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    if (filterExprSerialized == null) {
      // No filter expression in the job conf: the unrestricted Scan is returned.
      return scan;
    }

    ExprNodeGenericFuncDesc filterExpr =
      Utilities.deserializeExpression(filterExprSerialized);

    String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
    String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
    boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");

    String tsColName = null;
    if (iTimestamp >= 0) {
      tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
    }
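
To make the early returns in the excerpt above easier to follow, here is a minimal Python model of its control flow. It is only a sketch: the dictionary keys stand in for the two TableScanDesc constants (the real property names are not reproduced), and `scan_kind` is a hypothetical helper, not part of Hive or Spark.

```python
# Simplified model of the early returns in the quoted createFilterScan excerpt.
# The keys below are placeholders for the TableScanDesc constants, not the
# real configuration property names.
FILTER_OBJECT = "FILTER_OBJECT_CONF_STR"
FILTER_EXPR = "FILTER_EXPR_CONF_STR"


def scan_kind(job_conf):
    """Return which branch the quoted excerpt would take for this job conf."""
    if job_conf.get(FILTER_OBJECT) is not None:
        # A serialized scan range is present and is applied to the Scan.
        return "range-from-filter-object"
    if job_conf.get(FILTER_EXPR) is None:
        # Nothing was pushed down: the unrestricted Scan is returned as-is.
        return "full-scan"
    # A serialized filter expression is present and gets analyzed further.
    return "filter-expression"


# A caller that sets neither property ends up on the full-scan branch.
print(scan_kind({}))
print(scan_kind({FILTER_EXPR: "serialized-expression"}))
```

If the caller never sets the filter expression property, which is what the message above reports for spark-sql, the model always lands on the full-scan branch.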





------------------ Original Message ------------------
From: "Jörn Franke" <jornfra...@gmail.com>
Sent: Thursday, January 28, 2016, 9:09 PM
To: "开心延年" <muyann...@qq.com>
Cc: "Julio Antonio Soto de Vicente" <ju...@esbet.es>; "Maciej Bryński" <mac...@brynski.pl>; "Ted Yu" <yuzhih...@gmail.com>; "dev" <dev@spark.apache.org>
Subject: Re: Re: Spark 1.6.0 + Hive + HBase



Probably a newer Hive version makes a lot of sense here - at least 1.2.1. What 
storage format are you using?
I think the old Hive version had a bug where it always scanned all partitions 
unless you limited the query to a certain partition in the ON clause (e.g. on 
date=20201119).

On 28 Jan 2016, at 13:42, 开心延年 <muyann...@qq.com> wrote:



Can anybody solve Problem 4)? Thanks.

Problem 4)
Spark doesn't push down predicates for HiveTableScan, which means that every 
query is a full scan.





------------------ Original Message ------------------
From: "Julio Antonio Soto de Vicente" <ju...@esbet.es>
Sent: Thursday, January 28, 2016, 8:09 PM
To: "Maciej Bryński" <mac...@brynski.pl>
Cc: "Ted Yu" <yuzhih...@gmail.com>; "dev" <dev@spark.apache.org>
Subject: Re: Spark 1.6.0 + Hive + HBase



Hi,


Indeed, Hive is not able to perform predicate pushdown through an HBase table. 
Neither Hive nor Impala can.


Broadly speaking, if you need to query your HBase table through a field other 
than the rowkey:


A) Try to "encode" as much info as possible in the rowkey field and use it as 
your predicate, or
B) Use another kind of storage system, or create coprocessors in order 
to build a secondary index.
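
As an illustration of option A, the sketch below encodes a date prefix into the rowkey so that a predicate such as added_date >= 201601280000 becomes a start row for a range scan. It relies only on the fact that HBase keeps rowkeys in lexicographic byte order; the key layout, the `|` separator, the session ids and the helper names are assumptions made for this example, and a sorted Python list stands in for the table.

```python
import bisect


def make_rowkey(added_date, session_id):
    """Build a composite key: fixed-width date prefix, separator, session id."""
    return f"{added_date:012d}|{session_id}".encode("ascii")


def range_scan(sorted_keys, start_row, stop_row=None):
    """Return the keys in [start_row, stop_row) from a sorted key list."""
    lo = bisect.bisect_left(sorted_keys, start_row)
    if stop_row is None:
        hi = len(sorted_keys)
    else:
        hi = bisect.bisect_left(sorted_keys, stop_row)
    return sorted_keys[lo:hi]


# Hypothetical rows; the sorted list mimics lexicographic rowkey ordering.
keys = sorted(
    make_rowkey(date, session)
    for date, session in [
        (201601270930, "a1"),
        (201601280000, "b2"),
        (201601281215, "c3"),
        (201601290001, "d4"),
    ]
)

# added_date >= 201601280000 turns into a start row instead of a full pass.
recent = range_scan(keys, b"201601280000")

# A bounded range covers a single day.
one_day = range_scan(keys, b"201601280000", b"201601290000")
```

The fixed-width, zero-padded prefix matters: without it, numeric order and byte order of the keys would diverge and the start row would no longer match the predicate.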



On 28 Jan 2016, at 12:56, Maciej Bryński <mac...@brynski.pl> wrote:


Ted,
You're right.
hbase-site.xml resolved problems 2 and 3, but...


Problem 4)
Spark doesn't push down predicates for HiveTableScan, which means that every 
query is a full scan.


== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#144L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#147L])
      +- Project
         +- Filter (added_date#141L >= 201601280000)
            +- HiveTableScan [added_date#141L], MetastoreRelation dwh_diagnostics, sessions_hbase, None


Is there any magic option to make this work?


Regards,
Maciek

2016-01-28 10:25 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

For the last two problems, hbase-site.xml seems not to be on classpath. 


Once hbase-site.xml is put on classpath, you should be able to make progress. 
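
One quick way to check this is to look for the file in the directory entries of the classpath string that the ZooKeeper client logs. The snippet below is a small sketch under that assumption: `dirs_with_hbase_site` is a hypothetical helper, it only inspects plain directories (not jar contents), and the temporary directories merely stand in for real entries such as a Spark conf directory.

```python
import os
import tempfile


def dirs_with_hbase_site(classpath, filename="hbase-site.xml"):
    """Return the classpath directory entries that contain the given file."""
    hits = []
    for entry in classpath.split(os.pathsep):
        # Jar entries and missing paths are skipped; only directories count.
        if entry and os.path.isdir(entry):
            if os.path.isfile(os.path.join(entry, filename)):
                hits.append(entry)
    return hits


# Demo with throwaway directories standing in for real classpath entries.
with tempfile.TemporaryDirectory() as conf_dir, \
        tempfile.TemporaryDirectory() as other_dir:
    open(os.path.join(conf_dir, "hbase-site.xml"), "w").close()
    classpath = os.pathsep.join([other_dir, "/nonexistent/some.jar", conf_dir])
    found = dirs_with_hbase_site(classpath)
    missing = dirs_with_hbase_site(other_dir)
```

An empty result for the driver's classpath would be consistent with the client falling back to localhost:2181 as its ZooKeeper quorum.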


Cheers

On Jan 28, 2016, at 1:14 AM, Maciej Bryński <mac...@brynski.pl> wrote:


Hi,
I'm trying to run a SQL query on a Hive table which is stored in HBase.
I'm using:

- Spark 1.6.0
- HDP 2.2
- Hive 0.14.0
- HBase 0.98.4


I managed to configure a working classpath, but I have the following problems:


1) I have a UDF defined in the Hive Metastore (FUNCS table).
Spark cannot use it.


  File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
: org.apache.spark.sql.AnalysisException: undefined function dwh.str_to_map_int_str; line 55 pos 30
        at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
        at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:68)
        at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:64)
        at scala.util.Try.getOrElse(Try.scala:77)
        at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUDFs.scala:64)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:573)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:570)





2) When I run SQL without this function, Spark tries to connect to ZooKeeper 
on localhost.
I made a tunnel from localhost to one of the ZooKeeper servers, but that is not 
a real solution.


16/01/28 10:09:18 INFO ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
16/01/28 10:09:18 INFO ZooKeeper: Client environment:host.name=j4.jupyter1
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.version=1.8.0_66
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-oracle/jre
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.class.path=/opt/spark/lib/mysql-connector-java-5.1.35-bin.jar:/opt/spark/lib/dwh-hbase-connector.jar:/opt/spark/lib/hive-hbase-handler-1.2.1.spark.jar:/opt/spark/lib/hbase-server.jar:/opt/spark/lib/hbase-common.jar:/opt/spark/lib/dwh-commons.jar:/opt/spark/lib/guava.jar:/opt/spark/lib/hbase-client.jar:/opt/spark/lib/hbase-protocol.jar:/opt/spark/lib/htrace-core.jar:/opt/spark/conf/:/opt/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark/lib/datanucleus-core-3.2.10.jar:/etc/hadoop/conf/
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.compiler=<NA>
16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.name=Linux
16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.arch=amd64
16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.version=3.13.0-24-generic
16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.name=mbrynski
16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.home=/home/mbrynski
16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.dir=/home/mbrynski
16/01/28 10:09:18 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x36079f06, quorum=localhost:2181, baseZNode=/hbase
16/01/28 10:09:18 INFO RecoverableZooKeeper: Process identifier=hconnection-0x36079f06 connecting to ZooKeeper ensemble=localhost:2181
16/01/28 10:09:18 INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
16/01/28 10:09:18 INFO ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
16/01/28 10:09:18 INFO ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15254709ed3c8e1, negotiated timeout = 40000
16/01/28 10:09:18 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null





3) After making the tunnel, I'm getting an NPE.


Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:269)
        at org.apache.hadoop.hbase.zookeeper.MetaRegionTracker.blockUntilAvailable(MetaRegionTracker.java:241)
        at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:62)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1203)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1164)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:294)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:130)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:55)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:201)
        ... 91 more


Do you have any ideas on how to resolve these problems?


Regards,
-- 
Maciek Bryński
 


 







