Hi Eric,

Thanks for your feedback. I'm rebasing my code for the first approach onto a more recent Spark master and am resolving some conflicts. I'll have a better understanding of the relationship to your PR once my rebase is complete.
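Before I sign off, here is a minimal, self-contained Scala sketch of the on-demand idea at the heart of both approaches (full proposal quoted below). The `PartitionMeta`, `PartitionedCatalog`, and `ToyCatalog` names are purely illustrative and are not the actual Spark or Hive APIs my POCs touch; in the real code the pruning predicate would be derived from the partition filters in the logical plan rather than passed in as a function.

    // Illustrative sketch only -- these are not real Spark APIs.
    // The point: ask the catalog for just the partitions a query needs,
    // instead of loading every partition's metadata up front.
    case class PartitionMeta(values: Map[String, String], location: String)

    trait PartitionedCatalog {
      // Return only the partitions whose partition values satisfy the
      // pruning predicate pushed down from the query plan.
      def listPartitionsByFilter(
          table: String,
          predicate: Map[String, String] => Boolean): Seq[PartitionMeta]
    }

    // Toy in-memory stand-in for the Hive metastore.
    class ToyCatalog(all: Map[String, Seq[PartitionMeta]]) extends PartitionedCatalog {
      override def listPartitionsByFilter(
          table: String,
          predicate: Map[String, String] => Boolean): Seq[PartitionMeta] =
        all.getOrElse(table, Nil).filter(p => predicate(p.values))
    }

    object OnDemandPruningSketch {
      def main(args: Array[String]): Unit = {
        val catalog = new ToyCatalog(Map(
          "events" -> Seq(
            PartitionMeta(Map("ds" -> "2016-08-07"), "hdfs://warehouse/events/ds=2016-08-07"),
            PartitionMeta(Map("ds" -> "2016-08-08"), "hdfs://warehouse/events/ds=2016-08-08"))))

        // Only the partition surviving the pruning filter is fetched; the
        // rest of the table's metadata is never loaded or schema-merged.
        val needed = catalog.listPartitionsByFilter("events", _.get("ds").contains("2016-08-08"))
        needed.map(_.location).foreach(println)
      }
    }

Running it prints only the location of the ds=2016-08-08 partition; the other partition's metadata never enters the plan, which is the behavior I'd like Spark to exhibit when planning a query over a pruned Hive table.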
Cheers,

Michael

> On Aug 8, 2016, at 12:51 PM, Eric Liang <e...@databricks.com> wrote:
>
> I like the former approach -- it seems more generally applicable to other catalogs and IIUC would let you defer pruning until execution time. Pruning is work that should be done by the catalog anyways, as is the case when querying over an (unconverted) Hive table.
>
> You might also want to look at https://github.com/apache/spark/pull/14241, which refactors some of the file scan execution to defer pruning.
>
> On Mon, Aug 8, 2016, 11:53 AM Michael Allman <mich...@videoamp.com> wrote:
> Hello,
>
> I'd like to propose a modification to the way Hive table partition metadata are loaded and cached. Currently, when a user reads from a partitioned Hive table whose metadata are not cached (and for which Hive table conversion is enabled and supported), all partition metadata are fetched from the metastore:
>
> https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
>
> This is highly inefficient in some scenarios. In the most extreme case, a user starts a new Spark app, runs a query that reads from a single partition of a table with a large number of partitions, and terminates the app. All partition metadata are loaded and their files' schemas are merged, yet only a single partition is read. Instead, I propose we load and cache partition metadata on demand, as needed to build query plans.
>
> We've long encountered this performance problem at VideoAmp and have taken different approaches to address it. Beyond the load time, we've found that loading all of a table's partition metadata can require a significant amount of JVM heap space. Our largest tables OOM our Spark drivers unless we allocate several GB of heap.
>
> Certainly one could argue that our situation is pathological and rare, and that the problem in our scenario lies with the design of our tables, not with Spark. However, even for tables with more modest numbers of partitions, loading only the necessary partition metadata and file schemas can significantly reduce query planning time, and it is definitely more memory efficient.
>
> I've written POCs for a couple of different implementation approaches. Though incomplete, both have been successful in their basic goal. The first extends `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more general. It requires some new abstractions and refactoring of `HadoopFsRelation` and `FileCatalog`, among others, and it places a greater burden on other implementations of `ExternalCatalog`. Currently the only other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code throws an `UnsupportedOperationException` for that implementation.
>
> The other approach is simpler and only touches code in the codebase's `hive` project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` is deferred to physical planning when the metastore relation is partitioned. During physical planning, the partition pruning filters in the logical query plan are used to select the required partition metadata, and a `HadoopFsRelation` is built from those.
> The new logical plan is then re-injected into the planner.
>
> I'd like to get the community's thoughts on my proposal and implementation approaches.
>
> Thanks!
>
> Michael