Hi Eric,

Thanks for your feedback. I'm rebasing my code for the first approach on a more 
recent Spark master and am resolving some conflicts. I'll have a better 
understanding of the relationship to your PR once my rebase is complete.

Cheers,

Michael

> On Aug 8, 2016, at 12:51 PM, Eric Liang <e...@databricks.com> wrote:
> 
> I like the former approach -- it seems more generally applicable to other 
> catalogs and IIUC would let you defer pruning until execution time. Pruning 
> is work that should be done by the catalog anyways, as is the case when 
> querying over an (unconverted) hive table.
> 
> You might also want to look at https://github.com/apache/spark/pull/14241 
> <https://github.com/apache/spark/pull/14241> , which refactors some of the 
> file scan execution to defer pruning.
> 
> 
> On Mon, Aug 8, 2016, 11:53 AM Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Hello,
> 
> I'd like to propose a modification in the way Hive table partition metadata 
> are loaded and cached. Currently, when a user reads from a partitioned Hive 
> table whose metadata are not cached (and for which Hive table conversion is 
> enabled and supported), all partition metadata is fetched from the metastore:
> 
> https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
>  
> <https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260>
> 
> This is highly inefficient in some scenarios. In the most extreme case, a 
> user starts a new Spark app, runs a query which reads from a single partition 
> in a table with a large number of partitions and terminates their app. All 
> partition metadata are loaded and their files' schema are merged, but only a 
> single partition is read. Instead, I propose we load and cache partition 
> metadata on-demand, as needed to build query plans.
> 
> We've long encountered this performance problem at VideoAmp and have taken 
> different approaches to address it. In addition to the load time, we've found 
> that loading all of a table's partition metadata can require a significant 
> amount of JVM heap space. Our largest tables OOM our Spark drivers unless we 
> allocate several GB of heap space.
> 
> Certainly one could argue that our situation is pathological and rare, and 
> that the problem in our scenario is with the design of our tables—not Spark. 
> However, even in tables with more modest numbers of partitions, loading only 
> the necessary partition metadata and file schema can significantly reduce the 
> query planning time, and is definitely more memory efficient.
> 
> I've written POCs for a couple of different implementation approaches. Though 
> incomplete, both have been successful in their basic goal. The first extends 
> `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more 
> general. It requires some new abstractions and refactoring of 
> `HadoopFsRelation` and `FileCatalog`, among others. It places a greater 
> burden on other implementations of `ExternalCatalog`. Currently the only 
> other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code 
> throws an `UnsupportedOperationException` on that implementation.
> 
> The other approach is simpler and only touches code in the codebase's `hive` 
> project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` 
> is deferred to physical planning when the metastore relation is partitioned. 
> During physical planning, the partition pruning filters in a logical query 
> plan are used to select the required partition metadata and a 
> `HadoopFsRelation` is built from those. The new logical plan is then 
> re-injected into the planner.
> 
> I'd like to get the community's thoughts on my proposal and implementation 
> approaches.
> 
> Thanks!
> 
> Michael

Reply via email to