I found a workaround: when I create the Hive table using Spark “saveAsTable”, I see filters being pushed down.
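
For the second question below (stripe.size, bloom filters), here is a minimal sketch of passing ORC writer properties through the DataFrameWriter. This is an assumption, not confirmed behavior: whether .option() values actually reach the ORC writer varies by Spark version (on 1.6 they may be silently ignored, in which case setting them as Hive table properties is the fallback). The property names are the standard Hive ORC ones, and df stands for any DataFrame with the schema shown later in this thread:

  // Sketch: write a DataFrame as an ORC-backed Hive table while requesting
  // specific ORC writer settings. Assumption: the ORC data source forwards
  // these options to the underlying writer on this Spark version.
  df.write
    .format("orc")
    .option("orc.stripe.size", "67108864")      // 64 MB stripes
    .option("orc.row.index.stride", "50000")    // row-group index every 50k rows
    .option("orc.bloom.filter.columns", "cat")  // bloom filter on 'cat'
    .option("orc.bloom.filter.fpp", "0.05")     // bloom filter false-positive rate
    .partitionBy("cdt", "catpartkey", "usrpartkey")
    .saveAsTable("hhhhhlogsv5")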
The other approaches I tried, where filters are NOT pushed down, are:
1) creating the Hive table upfront and loading the ORC data into it using Spark SQL
2) creating the ORC files using Spark SQL and then creating a Hive external table over them

If my understanding is correct, when I use saveAsTable, Spark registers the table in the Hive metastore with its own custom SerDe and is therefore able to push down filters. Please correct me.

Another question: when I am writing ORC to Hive using “saveAsTable”, is there any way I can provide details about the ORC files? For instance stripe.size, and can I create bloom filters, etc.?

Regards
Shiv

> On Oct 25, 2017, at 1:37 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Well, the meta information is in the file, so I am not surprised that it reads the file, but it should not read all the content, which is probably also not happening.
>
> On 24. Oct 2017, at 18:16, Siva Gudavalli <gudavalli.s...@yahoo.com.INVALID> wrote:
>
>> Hello,
>>
>> I have an update here.
>>
>> Spark SQL is pushing predicates down if I load the ORC files in the Spark context, but not when I read the Hive table directly. Please let me know if I am missing something here.
>>
>> Is this supported in Spark?
>>
>> When I load the files in the Spark context:
>>
>> scala> val hhhhhlogsv5 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
>> hhhhhlogsv5: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]
>>
>> scala> hhhhhlogsv5.registerTempTable("tempo")
>>
>> scala> sqlContext.sql("select id from tempo where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10").explain
>> 17/10/24 16:11:22 INFO ParseDriver: Parsing command: select id from tempo where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
>> 17/10/24 16:11:22 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:22 INFO DataSourceStrategy: Selected 1 partitions out of 1, pruned 0.0% partitions.
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 164.5 KB, free 468.0 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 18.3 KB, free 486.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.21.158.61:43493 (size: 18.3 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 6 from explain at <console>:33
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 170.2 KB, free 656.6 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.8 KB, free 675.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 172.21.158.61:43493 (size: 18.8 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 7 from explain at <console>:33
>>
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#145 DESC], output=[id#145])
>> +- ConvertToSafe
>>    +- Project [id#145]
>>       +- Filter (usr#152 = AA0YP)
>>          +- Scan OrcRelation[id#145,usr#152] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(usr,AA0YP)]
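
A quick way to confirm the pushdown programmatically, rather than eyeballing the INFO logs, is to inspect the executed plan string. A sketch against the tempo temp table registered above:

  // Sketch: the plan string for the path-based OrcRelation read contains a
  // PushedFilters entry; the HiveTableScan variant below does not.
  val plan = sqlContext
    .sql("select id from tempo where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP'")
    .queryExecution.executedPlan.toString
  assert(plan.contains("PushedFilters"))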
>>
>> When I read this as a Hive table:
>>
>> scala> sqlContext.sql("select id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10").explain
>> 17/10/24 16:11:32 INFO ParseDriver: Parsing command: select id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
>> 17/10/24 16:11:32 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 399.1 KB, free 1074.6 KB)
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 42.7 KB, free 1117.2 KB)
>> 17/10/24 16:11:32 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 172.21.158.61:43493 (size: 42.7 KB, free: 511.4 MB)
>> 17/10/24 16:11:32 INFO SparkContext: Created broadcast 8 from explain at <console>:33
>>
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#192 DESC], output=[id#192])
>> +- ConvertToSafe
>>    +- Project [id#192]
>>       +- Filter (usr#199 = AA0YP)
>>          +- HiveTableScan [id#192,usr#199], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#189 = 20171003),(usrpartkey#191 = hhhUsers)]
>>
>> Please let me know if I am missing anything here. Thank you.
>>
>>
>> On Monday, October 23, 2017 1:56 PM, Siva Gudavalli <gss.su...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am working with Spark SQL to query a Hive managed table (in ORC format).
>>
>> My data is organized by partitions, and indexes are set for every 50,000 rows by setting ('orc.row.index.stride'='50000').
>>
>> Let's say that, after partition pruning, there are around 50 files in which the data is organized. Each file contains data specific to one given "cat", and I have set up a bloom filter on cat.
>>
>> My Spark SQL query looks like this:
>>
>> select * from logs where cdt = 20171002 and catpartkey = others and usrpartkey = logUsers and cat = 24;
>>
>> I have set the following property in my Spark SQL context, assuming this will push down the filters:
>>
>> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
>>
>> But my filters are never pushed down, and it seems like partition pruning is happening on all files. I don't understand why, no matter what my query is, it triggers 50 tasks and reads all files.
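
For reference, the stride and bloom filter setup described above is typically declared as Hive ORC table properties at table creation time. A hypothetical DDL sketch (not the poster's exact table; only two columns are shown for brevity, and it assumes sqlContext is a HiveContext):

  // Sketch: declare the 50k row index stride and a bloom filter on 'cat'
  // as ORC table properties. The fpp value is illustrative.
  sqlContext.sql("""
    CREATE TABLE auditlogsv5_example (id STRING, cat INT)
    PARTITIONED BY (cdt INT, catpartkey STRING, usrpartkey STRING)
    STORED AS ORC
    TBLPROPERTIES (
      'orc.row.index.stride' = '50000',
      'orc.bloom.filter.columns' = 'cat',
      'orc.bloom.filter.fpp' = '0.05'
    )
  """)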
>>
>> Here are my debug logs:
>>
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
>> 17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
>> 17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0 with {include: [true, true, false, false, false, false, true, false, false, false, false, false, false, false, false, false, false, false], offset: 0, length: 9223372036854775807}
>> 17/10/23 17:26:43 DEBUG MapRClient: Open: path = /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 15790993, range start: 21131541 end: 21146035]
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 15790993), size: 15723309 type: array-backed, data range [21131541, 21146035), size: 14494 type: array-backed]
>> 17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not initiated, use thread based class loader instead
>> 17/10/23 17:26:43 DEBUG HadoopTableReader: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
>> 17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, IntegerType] = 27)':
>>
>> And here is my execution plan:
>>
>> == Parsed Logical Plan ==
>> 'Limit 1000
>> +- 'Sort ['id DESC], true
>>    +- 'Project [unresolvedalias('id)]
>>       +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey = logUsers)) && ('cat = 27))
>>          +- 'UnresolvedRelation `auditlogsv5`, None
>>
>> == Analyzed Logical Plan ==
>> id: string
>> Limit 1000
>> +- Sort [id#165 DESC], true
>>    +- Project [id#165]
>>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>>          +- MetastoreRelation default, auditlogsv5, None
>>
>> == Optimized Logical Plan ==
>> Limit 1000
>> +- Sort [id#165 DESC], true
>>    +- Project [id#165]
>>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>>          +- MetastoreRelation default, auditlogsv5, None
>>
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
>> +- ConvertToSafe
>>    +- Project [id#165]
>>       +- Filter (cat#170 = 27)
>>          +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162 = 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]
>>
>> Am I missing something here? I am on Spark 1.6.1 and Hive 1.2.0.
>>
>> Please correct me. Thank you.
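
The plans above show the cat = 27 predicate staying inside a Spark-side Filter over a HiveTableScan, which matches the "DEBUG OrcInputFormat: No ORC pushdown predicate" line: the predicate never reaches the ORC reader, so the stride index and bloom filter cannot help. The Oct 24 update in this thread shows the pattern that does push filters down, namely reading the ORC files by path instead of through the metastore table. A sketch of that pattern applied to this table (path and column names taken from the thread):

  import org.apache.spark.sql.functions.desc

  sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

  // Read the ORC files by path; partition columns (cdt, catpartkey,
  // usrpartkey) are discovered from the directory layout, so partition
  // pruning still applies on top of the ORC-level pushdown.
  val auditLogs = sqlContext.read.format("orc").load("/apps/spark/auditlogsv5")

  auditLogs
    .filter("cdt = 20171002 and catpartkey = 'others' and usrpartkey = 'logUsers' and cat = 27")
    .select("id")
    .orderBy(desc("id"))
    .limit(1000)
    .explain(true) // expect: Scan OrcRelation ... PushedFilters: [..., EqualTo(cat,27)]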