Under sql/hive/src/main/scala/org/apache/spark/sql/hive/execution, I only see HiveTableScan and HiveNativeCommand. At the beginning of HiveTableScan:

 * The Hive table scan operator. Column and partition pruning are both handled.

Looks like filter pushdown hasn't been implemented. As far as I know, Huawei's Astro can do filter pushdown:
http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
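A quick way to see this from the Spark side is to print the physical plan: when the Filter node sits above HiveTableScan (as in the plan Maciej posted further down), the predicate is evaluated inside Spark and the underlying HBase scan is still a full scan. A minimal sketch, borrowing the table and column names from that plan and assuming sqlContext is a HiveContext:

    // Sketch only; substitute your own table and predicate.
    val df = sqlContext.sql(
      "SELECT COUNT(*) FROM dwh_diagnostics.sessions_hbase WHERE added_date >= 201601280000")

    // extended = true prints the parsed, analyzed, optimized and physical plans.
    // A Filter directly above HiveTableScan means the predicate was not pushed
    // down to the storage handler.
    df.explain(true)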
On Thu, Jan 28, 2016 at 5:20 AM, 开心延年 <muyann...@qq.com> wrote:

> This is not a Hive bug: the same query works fine when I test Hive directly
> against my storage, but when I run it through spark-sql the
> TableScanDesc.FILTER_EXPR_CONF_STR parameter is not passed, and that is what
> causes the full scan.
>
> The relevant source code in HiveHBaseTableInputFormat is as follows; this is
> where the full scan comes from:
>
> private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp,
>     boolean isKeyBinary) throws IOException {
>
>   // TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
>
>   Scan scan = new Scan();
>   String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
>   if (filterObjectSerialized != null) {
>     HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized,
>         HBaseScanRange.class);
>     try {
>       range.setup(scan, jobConf);
>     } catch (Exception e) {
>       throw new IOException(e);
>     }
>     return scan;
>   }
>
>   String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
>   if (filterExprSerialized == null) {
>     return scan;
>   }
>
>   ExprNodeGenericFuncDesc filterExpr = Utilities.deserializeExpression(filterExprSerialized);
>
>   String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
>   String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
>   boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
>
>   String tsColName = null;
>   if (iTimestamp >= 0) {
>     tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
>   }
>
> ------------------ Original Message ------------------
> From: "Jörn Franke" <jornfra...@gmail.com>
> Sent: Thursday, January 28, 2016, 9:09 PM
> To: "开心延年" <muyann...@qq.com>
> Cc: "Julio Antonio Soto de Vicente" <ju...@esbet.es>; "Maciej Bryński" <mac...@brynski.pl>; "Ted Yu" <yuzhih...@gmail.com>; "dev" <dev@spark.apache.org>
> Subject: Re: Re: Spark 1.6.0 + Hive + HBase
>
> Probably a newer Hive version makes a lot of sense here, at least 1.2.1.
> What storage format are you using?
> I think the old Hive version had a bug where it always scanned all
> partitions unless you limit it in the ON clause of the query to a certain
> partition (e.g. on date=20201119).
>
> On 28 Jan 2016, at 13:42, 开心延年 <muyann...@qq.com> wrote:
>
> Is there anybody who can solve Problem 4? Thanks.
>
> Problem 4)
> Spark doesn't push down predicates for HiveTableScan, which means that every
> query is a full scan.
>
> ------------------ Original Message ------------------
> From: "Julio Antonio Soto de Vicente" <ju...@esbet.es>
> Sent: Thursday, January 28, 2016, 8:09 PM
> To: "Maciej Bryński" <mac...@brynski.pl>
> Cc: "Ted Yu" <yuzhih...@gmail.com>; "dev" <dev@spark.apache.org>
> Subject: Re: Spark 1.6.0 + Hive + HBase
>
> Hi,
>
> Indeed, Hive is not able to perform predicate pushdown through an HBase
> table. Neither Hive nor Impala can.
>
> Broadly speaking, if you need to query your HBase table through a field
> other than the rowkey:
>
> A) Try to "encode" as much info as possible in the rowkey field and use it
> as your predicate, or
> B) Feel free to use another kind of storage system, or create coprocessors
> in order to build a secondary index.
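If option A) is workable, i.e. the predicate can be expressed as a rowkey range, one way to avoid the full scan today is to bypass the Hive storage handler and read the HBase table directly with a bounded scan. A rough Scala sketch with a placeholder table name and rowkey bounds (it assumes the rowkey starts with the value being filtered on):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    // Picks up hbase-site.xml when it is on the classpath.
    val hbaseConf = HBaseConfiguration.create()
    // Placeholder table name and rowkey bounds; adjust to your schema.
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "sessions_hbase")
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, "201601280000")
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "201601290000")

    // Only the requested rowkey range is scanned; each record is (rowkey, Result).
    val rdd = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    println(rdd.count())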
> On 28 Jan 2016, at 12:56, Maciej Bryński <mac...@brynski.pl> wrote:
>
> Ted,
> You're right.
> hbase-site.xml resolved problems 2 and 3, but...
>
> Problem 4)
> Spark doesn't push down predicates for HiveTableScan, which means that every
> query is a full scan.
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#144L])
> +- TungstenExchange SinglePartition, None
>    +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#147L])
>       +- Project
>          +- Filter (added_date#141L >= 201601280000)
>             +- HiveTableScan [added_date#141L], MetastoreRelation dwh_diagnostics, sessions_hbase, None
>
> Is there any magic option to make this work?
>
> Regards,
> Maciek
>
> 2016-01-28 10:25 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>
>> For the last two problems, hbase-site.xml does not seem to be on the classpath.
>>
>> Once hbase-site.xml is put on the classpath, you should be able to make progress.
>>
>> Cheers
>>
>> On Jan 28, 2016, at 1:14 AM, Maciej Bryński <mac...@brynski.pl> wrote:
>>
>> Hi,
>> I'm trying to run a SQL query on a Hive table which is stored on HBase.
>> I'm using:
>> - Spark 1.6.0
>> - HDP 2.2
>> - Hive 0.14.0
>> - HBase 0.98.4
>>
>> I managed to configure a working classpath, but I have the following problems:
>>
>> 1) I have a UDF defined in the Hive Metastore (FUNCS table).
>> Spark cannot use it.
>>
>>   File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o51.sql.
>> : org.apache.spark.sql.AnalysisException: undefined function dwh.str_to_map_int_str; line 55 pos 30
>>   at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>>   at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2$$anonfun$1.apply(hiveUDFs.scala:69)
>>   at scala.Option.getOrElse(Option.scala:120)
>>   at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:68)
>>   at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:64)
>>   at scala.util.Try.getOrElse(Try.scala:77)
>>   at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUDFs.scala:64)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:574)
>>   at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:573)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$12$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:570)
>>
>> 2) When I'm using SQL without this function, Spark tries to connect to
>> ZooKeeper on localhost.
>> I made a tunnel from localhost to one of the ZooKeeper servers, but that is
>> not a real solution.
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:host.name=j4.jupyter1
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.version=1.8.0_66
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-oracle/jre
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.class.path=/opt/spark/lib/mysql-connector-java-5.1.35-bin.jar:/opt/spark/lib/dwh-hbase-connector.jar:/opt/spark/lib/hive-hbase-handler-1.2.1.spark.jar:/opt/spark/lib/hbase-server.jar:/opt/spark/lib/hbase-common.jar:/opt/spark/lib/dwh-commons.jar:/opt/spark/lib/guava.jar:/opt/spark/lib/hbase-client.jar:/opt/spark/lib/hbase-protocol.jar:/opt/spark/lib/htrace-core.jar:/opt/spark/conf/:/opt/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark/lib/datanucleus-core-3.2.10.jar:/etc/hadoop/conf/
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:java.compiler=<NA>
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.name=Linux
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.arch=amd64
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:os.version=3.13.0-24-generic
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.name=mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.home=/home/mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Client environment:user.dir=/home/mbrynski
>> 16/01/28 10:09:18 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=90000 watcher=hconnection-0x36079f06, quorum=localhost:2181, baseZNode=/hbase
>> 16/01/28 10:09:18 INFO RecoverableZooKeeper: Process identifier=hconnection-0x36079f06 connecting to ZooKeeper ensemble=localhost:2181
>> 16/01/28 10:09:18 INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
>> 16/01/28 10:09:18 INFO ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
>> 16/01/28 10:09:18 INFO ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x15254709ed3c8e1, negotiated timeout = 40000
>> 16/01/28 10:09:18 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null
>>
>> 3) After making the tunnel I'm getting an NPE.
>> Caused by: java.lang.NullPointerException
>>   at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:269)
>>   at org.apache.hadoop.hbase.zookeeper.MetaRegionTracker.blockUntilAvailable(MetaRegionTracker.java:241)
>>   at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:62)
>>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1203)
>>   at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1164)
>>   at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:294)
>>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:130)
>>   at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:55)
>>   at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:201)
>>   ... 91 more
>>
>> Do you have any ideas on how to resolve these problems?
>>
>> Regards,
>> --
>> Maciek Bryński
>
>
> --
> Maciek Bryński
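Regarding problem 1: Spark 1.6 generally cannot resolve permanent Hive UDFs that exist only in the metastore, which matches the "undefined function dwh.str_to_map_int_str" error above. A workaround that often helps is to re-register the UDF as a temporary function for the session; in the sketch below the jar path and implementing class are placeholders for whatever class the permanent function actually points to:

    // Placeholder jar path; only needed if the UDF jar is not already on the classpath.
    sqlContext.sql("ADD JAR /path/to/dwh-udfs.jar")
    // Placeholder class name; use the class registered for dwh.str_to_map_int_str.
    sqlContext.sql("CREATE TEMPORARY FUNCTION str_to_map_int_str AS 'com.example.dwh.StrToMapIntStr'")

Temporary functions are not database-qualified, so the query would then call str_to_map_int_str(...) rather than dwh.str_to_map_int_str(...).

Regarding problems 2 and 3: both symptoms point at the cause Ted named, namely that the HBase client is running with its default configuration (quorum=localhost and baseZNode=/hbase in the log) because hbase-site.xml is not on the classpath. A quick check from the Spark shell, using standard HBase property names:

    import org.apache.hadoop.hbase.HBaseConfiguration

    // HBaseConfiguration.create() loads hbase-default.xml plus hbase-site.xml
    // when the latter is on the classpath; "localhost" here means it was not found.
    val conf = HBaseConfiguration.create()
    println(conf.get("hbase.zookeeper.quorum"))
    println(conf.get("hbase.zookeeper.property.clientPort"))
    println(conf.get("zookeeper.znode.parent"))

On HDP the file usually lives under /etc/hbase/conf/; adding that directory to the classpath (or copying hbase-site.xml into /opt/spark/conf/, which the logged java.class.path already includes) should fix the localhost connection attempt and, in turn, the NullPointerException. HDP clusters also typically set zookeeper.znode.parent to /hbase-unsecure rather than the default /hbase seen in the log, which is another sign the site file is not being read.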