Hey Yana,
Sorry for the late reply, missed this important thread somehow. And many
thanks for reporting this. It turned out to be a bug — filter pushdown
is only enabled when using client side metadata, which is not expected,
because task side metadata code path is more performant. And I guess
that the reason why setting |parquet.task.side.metadata| to |false|
didn’t reduce input size for you is because you set the configuration
with Spark API, or put it into |spark-defaults.conf|. This configuration
goes to Hadoop |Configuration|, and Spark only merge properties whose
names start with |spark.hadoop| into Hadoop |Configuration| instances.
You may try to put |parquet.task.side.metadata| config into Hadoop
|core-site.xml|, and then re-run the query. I can see significant
differences by doing so.
I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
reporting all the details!
Cheng
On 1/13/15 12:56 PM, Yana Kadiyska wrote:
Attempting to bump this up in case someone can help out after all. I
spent a few good hours stepping through the code today, so I'll
summarize my observations both in hope I get some help and to help
others that might be looking into this:
1. I am setting *spark.sql.parquet.**filterPushdown=true*
2. I can see by stepping through the driver debugger that
PaquetTableOperations.execute sets the filters via
ParquetInputFormat.setFilterPredicate (I checked the conf object,
things appear OK there)
3. In FilteringParquetRowInputFormat, I get through the codepath for
getTaskSideSplits. It seems that the codepath for getClientSideSplits
would try to drop rowGroups but I don't see similar in getTaskSideSplit.
Does anyone have pointers on where to look after this? Where is
rowgroup filtering happening in the case of getTaskSideSplits? I can
attach to the executor but am not quite sure what code related to
Parquet gets called executor side...also don't see any messages in the
executor logs related to Filtering predicates.
For comparison, I went through the getClientSideSplits and can see
that predicate pushdown works OK:
|sc.hadoopConfiguration.set("parquet.task.side.metadata","false")
15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side
Metadata Split Strategy
15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch,
1417384800)
15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups
that do not pass filter predicate (28 %) !
|
Is it possible that this is just a UI bug? I can see Input=4G when
using ("parquet.task.side.metadata","false") and Input=140G when using
("parquet.task.side.metadata","true") but the runtimes are very
comparable?
Inline image 1
JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska <yana.kadiy...@gmail.com
<mailto:yana.kadiy...@gmail.com>> wrote:
I am running the following (connecting to an external Hive Metastore)
/a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf
*spark.sql.parquet.filterPushdown=true*
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
and then ran two queries:
|sqlContext.sql("select count(*) from table where partition='blah' ")
and
sqlContext.sql("select count(*) from table where partition='blah' and
epoch=1415561604")
|
According to the Input tab in the UI both scan about 140G of data
which is the size of my whole partition. So I have two questions --
1. is there a way to tell from the plan if a predicate pushdown is
supposed to happen?
I see this for the second query
|res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
Exchange SinglePartition
Aggregate true, [], [COUNT(1) AS PartialCount#49L]
OutputFaker []
Project []
ParquetTableScan [epoch#139L], (ParquetRelation <list of hdfs files>
|
2. am I doing something obviously wrong that this is not working?
(Im guessing it's not woring because the input size for the second
query shows unchanged and the execution time is almost 2x as long)
thanks in advance for any insights