You are hitting a weird optimization in withColumn.  Specifically, to avoid
building up huge trees with chained calls to this method, we collapse
projections eagerly (instead of waiting for the optimizer).

Typically we look for cached data in between analysis and optimization, so
that optimizations won't change out ability to recognized cached query
plans.  However, in this case the eager optimization is thwarting that.

On Sat, Nov 19, 2016 at 12:19 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> There might be a bug in how analyzing Datasets or looking up cached
> Datasets works. I'm on master.
>
> ➜  spark git:(master) ✗ ./bin/spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
>       /_/
>
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_112
> Branch master
> Compiled by user jacek on 2016-11-19T08:39:43Z
> Revision 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
> Url https://github.com/apache/spark.git
> Type --help for more information.
>
> After reviewing CacheManager and how caching works for Datasets I
> thought the following query would use the cached Dataset but it does
> not.
>
> // Cache Dataset -- it is lazy
> scala> val df = spark.range(1).cache
> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>
> // Trigger caching
> scala> df.show
> +---+
> | id|
> +---+
> |  0|
> +---+
>
> // Visit http://localhost:4040/storage to see the Dataset cached. And it
> is.
>
> // Use the cached Dataset in another query
> // Notice InMemoryRelation in use for cached queries
> // It works as expected.
> scala> df.withColumn("newId", 'id).explain(extended = true)
> == Parsed Logical Plan ==
> 'Project [*, 'id AS newId#16]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Analyzed Logical Plan ==
> id: bigint, newId: bigint
> Project [id#0L, id#0L AS newId#16L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Optimized Logical Plan ==
> Project [id#0L, id#0L AS newId#16L]
> +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory,
> deserialized, 1 replicas)
>       +- *Range (0, 1, step=1, splits=Some(8))
>
> == Physical Plan ==
> *Project [id#0L, id#0L AS newId#16L]
> +- InMemoryTableScan [id#0L]
>       +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk,
> memory, deserialized, 1 replicas)
>             +- *Range (0, 1, step=1, splits=Some(8))
>
> I hoped that the following query would use the cached one but it does
> not. Should it? I thought that QueryExecution.withCachedData [1] would
> do the trick.
>
> [1] https://github.com/apache/spark/blob/master/sql/core/
> src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L70
>
> // The following snippet uses spark.range(1) which is the same as the
> one cached above
> // Why does the physical plan not use InMemoryTableScan and
> InMemoryRelation?
> scala> spark.range(1).withColumn("new", 'id).explain(extended = true)
> == Parsed Logical Plan ==
> 'Project [*, 'id AS new#29]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Analyzed Logical Plan ==
> id: bigint, new: bigint
> Project [id#26L, id#26L AS new#29L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Optimized Logical Plan ==
> Project [id#26L, id#26L AS new#29L]
> +- Range (0, 1, step=1, splits=Some(8))
>
> == Physical Plan ==
> *Project [id#26L, id#26L AS new#29L]
> +- *Range (0, 1, step=1, splits=Some(8))
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

Reply via email to