Cool, thanks for your help on this. Any chance of adding it to the 1.1.1 point release, assuming there ends up being one?
On Wed, Sep 10, 2014 at 11:39 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Hey Cody,
>
> Thanks for doing this! Will look at your PR later today.
>
> Michael
>
> On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Tested the patch against a cluster with some real data. Initial results
>> suggest that going from one table to a union of 2 tables is now closer to
>> the expected doubling of query time, instead of 5 to 10x.
>>
>> Let me know if you see any issues with that PR.
>>
>> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> So the obvious thing I was missing is that the analyzer has already
>>> resolved attributes by the time the optimizer runs, so the references in
>>> the filter / projection need to be fixed up to match the children.
>>>
>>> Created a PR; let me know if there's a better way to do it. I'll see
>>> about testing performance against some actual data sets.
>>>
>>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Ok, so looking at the optimizer code for the first time and trying the
>>>> simplest rule that could possibly work:
>>>>
>>>> object UnionPushdown extends Rule[LogicalPlan] {
>>>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>>>     // Push down filter into union
>>>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>>>
>>>>     // Push down projection into union
>>>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>>>   }
>>>> }
>>>>
>>>> If I try manually applying that rule to a logical plan in the repl, it
>>>> produces the query shape I'd expect, and executing that plan results in
>>>> parquet pushdowns as I'd expect.
>>>>
>>>> But adding those cases to ColumnPruning results in a runtime exception
>>>> (below).
>>>>
>>>> I can keep digging, but it seems like I'm missing some obvious initial
>>>> context around the naming of attributes. If you can provide any pointers
>>>> to speed me on my way, I'd appreciate it.
>>>> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
>>>>   at scala.Predef$.assert(Predef.scala:179)
>>>>   at org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>>   at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>   at scala.collection.immutable.List.foreach(List.scala:318)
>>>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>>   at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>   at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>>   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>>>   at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>>>   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>>>>   at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>>>>   at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>>>>
>>>> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>
>>>>> What Patrick said is correct. Two other points:
>>>>> - In the 1.2 release we are hoping to beef up the support for working
>>>>>   with partitioned parquet independent of the metastore.
>>>>> - You can actually do operations like INSERT INTO for parquet tables
>>>>>   to add data. This creates new parquet files for each insertion. This
>>>>>   will break if there are multiple concurrent writers to the same table.
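The assertion above is the attribute mismatch Cody describes: the analyzer has already given each child of the Union its own expression IDs, so a Filter or Project copied verbatim onto the right child no longer resolves against that child's output. Below is a minimal sketch of one way to remap those references when pushing below a binary Union; it is not taken from the PR, and it assumes the 1.1-era Catalyst APIs (ExprId, TreeNode.transform, and a two-child Union).

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, ExprId, Expression}
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
    import org.apache.spark.sql.catalyst.rules.Rule

    object UnionPushdown extends Rule[LogicalPlan] {

      // The analyzer gives each child of the Union its own expression IDs, so map
      // the left child's output attributes to the right child's, positionally.
      private def buildRewrites(u: Union): Map[ExprId, Attribute] =
        u.left.output.zip(u.right.output).map { case (l, r) => l.exprId -> r }.toMap

      // Rewrite an expression so its references resolve against the right child.
      private def pushToRight[A <: Expression](e: A, rewrites: Map[ExprId, Attribute]): A =
        e.transform {
          case a: AttributeReference => rewrites.getOrElse(a.exprId, a)
        }.asInstanceOf[A]

      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Push a filter below the union, fixing up references on the right side.
        case f @ Filter(condition, u @ Union(left, right)) =>
          val rewrites = buildRewrites(u)
          u.copy(left = f.copy(child = left),
                 right = Filter(pushToRight(condition, rewrites), right))

        // Push a projection below the union, fixing up references on the right side.
        case p @ Project(projectList, u @ Union(left, right)) =>
          val rewrites = buildRewrites(u)
          u.copy(left = p.copy(child = left),
                 right = Project(projectList.map(pushToRight(_, rewrites)), right))
      }
    }

The left branch keeps the original expressions because the Union's output is derived from its left child; only the right branch needs its references rewritten.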
>>>>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>>>
>>>>>> I think what Michael means is that people often use this to read existing
>>>>>> partitioned Parquet tables that are defined in a Hive metastore, rather
>>>>>> than data generated directly from within Spark and then read back as a
>>>>>> table. I'd expect the latter case to become more common, but for now most
>>>>>> users connect to an existing metastore.
>>>>>>
>>>>>> I think you could go this route by creating a partitioned external
>>>>>> table based on the on-disk layout you create. The downside is that
>>>>>> you'd have to go through a Hive metastore, whereas what you are doing
>>>>>> now doesn't need Hive at all.
>>>>>>
>>>>>> We should also just fix the case you are mentioning, where a union is
>>>>>> used directly from within Spark. But that's the context.
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> > Maybe I'm missing something; I thought Parquet was generally a write-once
>>>>>> > format, and the sqlContext interface to it seems that way as well.
>>>>>> >
>>>>>> > d1.saveAsParquetFile("/foo/d1")
>>>>>> >
>>>>>> > // another day, another table, with the same schema
>>>>>> > d2.saveAsParquetFile("/foo/d2")
>>>>>> >
>>>>>> > will give a directory structure like
>>>>>> >
>>>>>> > /foo/d1/_metadata
>>>>>> > /foo/d1/part-r-1.parquet
>>>>>> > /foo/d1/part-r-2.parquet
>>>>>> > /foo/d1/_SUCCESS
>>>>>> >
>>>>>> > /foo/d2/_metadata
>>>>>> > /foo/d2/part-r-1.parquet
>>>>>> > /foo/d2/part-r-2.parquet
>>>>>> > /foo/d2/_SUCCESS
>>>>>> >
>>>>>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a
>>>>>> > // parquet partition
>>>>>> > sqlContext.parquetFile("/foo")
>>>>>> >
>>>>>> > // works, but has the noted lack of pushdown
>>>>>> > sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>>>>>> >
>>>>>> > Is there another alternative?
>>>>>> >
>>>>>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>> >
>>>>>> >> I think usually people add these directories as multiple partitions of
>>>>>> >> the same table instead of using a union. This actually allows us to
>>>>>> >> efficiently prune directories when reading, in addition to standard
>>>>>> >> column pruning.
>>>>>> >>
>>>>>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <malouf.g...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> I'm kind of surprised this was not run into before. Do people not
>>>>>> >>> segregate their data by day/week in the HDFS directory structure?
>>>>>> >>>
>>>>>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>> >>>
>>>>>> >>>> Thanks!
>>>>>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> >>>>
>>>>>> >>>> > Opened
>>>>>> >>>> >
>>>>>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462
>>>>>> >>>> >
>>>>>> >>>> > I'll take a look at ColumnPruning and see what I can do.
>>>>>> >>>> >
>>>>>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>> >>>> >
>>>>>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> >>>> >>>
>>>>>> >>>> >>> Is there a reason in general not to push projections and predicates
>>>>>> >>>> >>> down into the individual ParquetTableScans in a union?
>>>>>> >>>> >>
>>>>>> >>>> >> This would be a great case to add to ColumnPruning. Would be awesome
>>>>>> >>>> >> if you could open a JIRA or even a PR :)
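For anyone reproducing this from the archive, one way to see whether the pushdown took effect is to build the union Cody describes and inspect the plans. A sketch with hypothetical paths, schema, and predicate; it assumes SchemaRDD's developer-facing queryExecution field, which is what the stack trace above is printing.

    // Build the unioned SchemaRDD from two separately-saved parquet directories.
    val d1 = sqlContext.parquetFile("/foo/d1")
    val d2 = sqlContext.parquetFile("/foo/d2")
    d1.unionAll(d2).registerTempTable("unioned")

    val query = sqlContext.sql("SELECT name FROM unioned WHERE age > 21")

    // Without pushdown: a single Filter/Project sitting above the Union.
    // With pushdown: each ParquetTableScan carries the column list and predicate.
    println(query.queryExecution.optimizedPlan)
    println(query.queryExecution.executedPlan)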