Cool, thanks for your help on this.  Any chance of adding it to the 1.1.1
point release, assuming there ends up being one?

On Wed, Sep 10, 2014 at 11:39 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Hey Cody,
>
> Thanks for doing this!  Will look at your PR later today.
>
> Michael
>
> On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Tested the patch against a cluster with some real data.  Initial results
>> suggest that going from one table to a union of 2 tables is now roughly a
>> doubling of query time, as expected, instead of 5 to 10x.
>>
>> Let me know if you see any issues with that PR.
>>
>> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> So the obvious thing I was missing is that the analyzer has already
>>> resolved attributes by the time the optimizer runs, which means the
>>> references in the filter / projection need to be fixed up to match the
>>> children.
>>>
>>> Created a PR; let me know if there's a better way to do it.  I'll see
>>> about testing performance against some actual data sets.
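>>>
>>> For anyone following along, here is roughly what "fixing up the
>>> references" means -- an illustrative sketch with a made-up helper name,
>>> not the exact code in the PR, assuming the usual catalyst imports
>>> (Expression, Attribute, Union):
>>>
>>>   // Attributes in the pushed-down filter/projection were resolved against
>>>   // the union's output (the left child's output); remap them by position
>>>   // to the corresponding attribute of the right child.
>>>   def pushToRight[A <: Expression](e: A, union: Union): A = {
>>>     val rewrites: Map[Attribute, Attribute] =
>>>       union.left.output.zip(union.right.output).toMap
>>>     e.transform {
>>>       case a: Attribute if rewrites.contains(a) => rewrites(a)
>>>     }.asInstanceOf[A]
>>>   }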
>>>
>>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Ok, so looking at the optimizer code for the first time and trying the
>>>> simplest rule that could possibly work,
>>>>
>>>> object UnionPushdown extends Rule[LogicalPlan] {
>>>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>>>     // Push down filter into union
>>>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>>>
>>>>     // Push down projection into union
>>>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>>>   }
>>>> }
>>>>
>>>>
>>>> If I manually apply that rule to a logical plan in the repl, it produces
>>>> the query shape I'd expect, and executing that plan results in parquet
>>>> pushdowns as expected.
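>>>>
>>>> Concretely, "manually applying" means something along these lines in the
>>>> repl (a sketch -- table/column names are placeholders, and the
>>>> queryExecution field names are from memory):
>>>>
>>>>   val unioned = sqlContext.parquetFile("/foo/d1")
>>>>     .unionAll(sqlContext.parquetFile("/foo/d2"))
>>>>   unioned.registerTempTable("d")
>>>>   val q = sqlContext.sql("SELECT name FROM d WHERE age > 21")
>>>>
>>>>   // apply the rule directly to the analyzed logical plan
>>>>   val pushed = UnionPushdown(q.queryExecution.analyzed)
>>>>   println(pushed)  // the Filter now sits under each child of the Union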
>>>>
>>>> But adding those cases to ColumnPruning results in a runtime exception
>>>> (below):
>>>>
>>>> I can keep digging, but it seems like I'm missing some obvious context
>>>> around how attributes are named.  If you can provide any pointers to
>>>> speed me on my way, I'd appreciate it.
>>>>
>>>>
>>>> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
>>>>         at scala.Predef$.assert(Predef.scala:179)
>>>>         at org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>>>         at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>>>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>>>         at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>         at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>>>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>>>         at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>>>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>>>>         at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>>>>         at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> What Patrick said is correct.  Two other points:
>>>>>  - In the 1.2 release we are hoping to beef up the support for working
>>>>> with partitioned parquet independent of the metastore.
>>>>>  - You can actually do operations like INSERT INTO for parquet tables to
>>>>> add data.  This creates new parquet files for each insertion, but it will
>>>>> break if there are multiple concurrent writers to the same table.
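>>>>>
>>>>> For example, something along these lines -- a sketch only; "d1" and
>>>>> "moreData" are made-up names, and this assumes the SchemaRDD insertInto
>>>>> API (the SQL INSERT INTO syntax works similarly):
>>>>>
>>>>>   // register an existing parquet directory as a table, then append to it
>>>>>   sqlContext.parquetFile("/foo/d1").registerTempTable("d1")
>>>>>   moreData.insertInto("d1")  // each call adds new parquet part files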
>>>>>
>>>>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think what Michael means is that people often use this to read
>>>>>> existing partitioned Parquet tables that are defined in a Hive
>>>>>> metastore, rather than generating data directly within Spark and then
>>>>>> reading it back as a table.  I'd expect the latter case to become more
>>>>>> common, but for now most users connect to an existing metastore.
>>>>>>
>>>>>> I think you could go this route by creating a partitioned external
>>>>>> table based on the on-disk layout you create.  The downside is that
>>>>>> you'd have to go through a Hive metastore, whereas what you are doing
>>>>>> now doesn't need Hive at all.
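>>>>>>
>>>>>> A rough sketch of that route, reusing the /foo/d1, /foo/d2 layout from
>>>>>> below (schema and partition column are illustrative, and the exact
>>>>>> parquet DDL depends on your Hive version):
>>>>>>
>>>>>>   hiveContext.sql("""CREATE EXTERNAL TABLE foo (name STRING, age INT)
>>>>>>     PARTITIONED BY (batch STRING) STORED AS PARQUET LOCATION '/foo'""")
>>>>>>   hiveContext.sql("ALTER TABLE foo ADD PARTITION (batch='d1') LOCATION '/foo/d1'")
>>>>>>   hiveContext.sql("ALTER TABLE foo ADD PARTITION (batch='d2') LOCATION '/foo/d2'")
>>>>>>   // a predicate on `batch` then lets Spark skip whole directories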
>>>>>>
>>>>>> We should also just fix the case you are mentioning, where a union is
>>>>>> used directly from within Spark.  But that's the context.
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>> > Maybe I'm missing something, but I thought parquet was generally a
>>>>>> > write-once format, and the sqlContext interface to it seems that way
>>>>>> > as well.
>>>>>> >
>>>>>> > d1.saveAsParquetFile("/foo/d1")
>>>>>> >
>>>>>> > // another day, another table, with same schema
>>>>>> > d2.saveAsParquetFile("/foo/d2")
>>>>>> >
>>>>>> > Will give a directory structure like
>>>>>> >
>>>>>> > /foo/d1/_metadata
>>>>>> > /foo/d1/part-r-1.parquet
>>>>>> > /foo/d1/part-r-2.parquet
>>>>>> > /foo/d1/_SUCCESS
>>>>>> >
>>>>>> > /foo/d2/_metadata
>>>>>> > /foo/d2/part-r-1.parquet
>>>>>> > /foo/d2/part-r-2.parquet
>>>>>> > /foo/d2/_SUCCESS
>>>>>> >
>>>>>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a
>>>>>> > // parquet partition
>>>>>> > sqlContext.parquetFile("/foo")
>>>>>> >
>>>>>> > // works, but has the noted lack of pushdown
>>>>>> > sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>>>>>> >
>>>>>> >
>>>>>> > Is there another alternative?
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust <
>>>>>> mich...@databricks.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> >> I think usually people add these directories as multiple partitions
>>>>>> >> of the same table instead of a union.  This actually allows us to
>>>>>> >> efficiently prune directories when reading, in addition to standard
>>>>>> >> column pruning.
>>>>>> >>
>>>>>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <
>>>>>> malouf.g...@gmail.com>
>>>>>> >> wrote:
>>>>>> >>
>>>>>> >>> I'm kind of surprised this was not run into before.  Do people not
>>>>>> >>> segregate their data by day/week in the HDFS directory structure?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <
>>>>>> mich...@databricks.com>
>>>>>> >>> wrote:
>>>>>> >>>
>>>>>> >>>> Thanks!
>>>>>> >>>>
>>>>>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <
>>>>>> c...@koeninger.org>
>>>>>> >>>> wrote:
>>>>>> >>>>
>>>>>> >>>> > Opened
>>>>>> >>>> >
>>>>>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462
>>>>>> >>>> >
>>>>>> >>>> > I'll take a look at ColumnPruning and see what I can do.
>>>>>> >>>> >
>>>>>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <
>>>>>> >>>> mich...@databricks.com>
>>>>>> >>>> > wrote:
>>>>>> >>>> >
>>>>>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <
>>>>>> c...@koeninger.org>
>>>>>> >>>> >> wrote:
>>>>>> >>>> >>>
>>>>>> >>>> >>> Is there a reason in general not to push projections and
>>>>>> >>>> >>> predicates down into the individual ParquetTableScans in a
>>>>>> >>>> >>> union?
>>>>>> >>>> >>>
>>>>>> >>>> >>
>>>>>> >>>> >> This would be a great case to add to ColumnPruning.  Would be
>>>>>> >>>> >> awesome if you could open a JIRA or even a PR :)
>>>>>> >>>> >>
>>>>>> >>>> >
>>>>>> >>>> >
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
