When a second attempt is made to cache df3, which has the same schema as the first DataFrame, you would see the warning below:
scala> sqlContext.cacheTable("t1")

scala> sqlContext.isCached("t1")
res5: Boolean = true

scala> sqlContext.sql("select * from t1").show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  2|  2|
+---+---+

scala> df3.registerTempTable("t3")

scala> sqlContext.cacheTable("t1")
15/12/18 10:43:25 WARN CacheManager: Asked to cache already cached data.

See this from CacheManager#cacheQuery():

    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
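Putting this together with the lookupCachedData / sameResult excerpts quoted further down: cached entries are keyed by logical-plan equivalence, not by table name. Here is a rough, self-contained paraphrase of that control flow (the types are simplified stand-ins I made up for illustration, not Spark's actual private classes):

    import scala.collection.mutable.ArrayBuffer

    // Stand-in for Spark's LogicalPlan; the real sameResult() compares
    // canonicalised plans, here we only stub the contract.
    trait LogicalPlan {
      def sameResult(other: LogicalPlan): Boolean
    }

    final case class CachedData(plan: LogicalPlan, inMemoryRelation: AnyRef)

    final class CacheManagerSketch {
      private val cachedData = ArrayBuffer.empty[CachedData]

      def cacheQuery(planToCache: LogicalPlan)(materialise: LogicalPlan => AnyRef): Unit = {
        // The lookup is keyed by plan equivalence (sameResult), not by the
        // temp table name, so re-registering a name neither replaces nor
        // evicts the entry built from the old DataFrame's plan.
        if (cachedData.exists(cd => planToCache.sameResult(cd.plan))) {
          println("WARN CacheManager: Asked to cache already cached data.")
        } else {
          cachedData += CachedData(planToCache, materialise(planToCache))
        }
      }
    }

So registering a new DataFrame under an existing temp table name rebinds the name to a new plan, but the entry cached for the old plan stays in CacheManager until it is explicitly uncached or evicted, which is consistent with the two identical rows seen in the Storage UI below.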
On Fri, Dec 18, 2015 at 10:24 AM, Sahil Sareen <sareen...@gmail.com> wrote:

> From the UI I see two rows for this on a streaming application:
>
> RDD Name                      | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk
> In-memory table myColorsTable | Memory Deserialized 1x Replicated | 2                 | 100%            | 728.2 KB       | 0.0 B                      | 0.0 B
> In-memory table myColorsTable | Memory Deserialized 1x Replicated | 2                 | 100%            | 728.2 KB       | 0.0 B                      | 0.0 B
>
> This means it wasn't overwritten :(
>
> My question now is: if only the latest table is going to be used, why
> isn't the earlier version auto-cleared?
>
> On Fri, Dec 18, 2015 at 11:44 PM, Sahil Sareen <sareen...@gmail.com> wrote:
>
>> So I looked at the function; my only worry is that the cache should be
>> cleared if I'm overwriting the cache with the same table name. I did this
>> experiment, and isCached reports the table as not cached, but I want to
>> confirm: besides the old table's values no longer being used, are they
>> actually removed/overwritten in memory?
>>
>> scala> df.collect
>> res54: Array[org.apache.spark.sql.Row] = Array([blue,#0033FF],
>> [red,#FF0000], [green,#FSKA])                 <=== 3 rows
>>
>> scala> df2.collect
>> res55: Array[org.apache.spark.sql.Row] = Array([blue,#0033FF],
>> [red,#FF0000])                                <=== 2 rows
>>
>> scala> df.registerTempTable("myColorsTable")
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res58: Boolean = false
>>
>> scala> sqlContext.cacheTable("myColorsTable") <=== cache the table in df (3 rows)
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res60: Boolean = true
>>
>> scala> sqlContext.sql("select * from myColorsTable").foreach(println) <=== sql is running on df (3 rows)
>> [blue,#0033FF]
>> [red,#FF0000]
>> [green,#FSKA]
>>
>> scala> df2.registerTempTable("myColorsTable") <=== register another table with the same table name
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res63: Boolean = false
>>
>> scala> sqlContext.sql("select * from myColorsTable").foreach(println) <=== sql is running on df2 (2 rows)
>> [blue,#0033FF]
>> [red,#FF0000]
>>
>> On Fri, Dec 18, 2015 at 11:17 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> This method in CacheManager:
>>>
>>>   private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
>>>     cachedData.find(cd => plan.sameResult(cd.plan))
>>>
>>> led me to the following in
>>> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
>>>
>>>   def sameResult(plan: LogicalPlan): Boolean = {
>>>
>>> There is a detailed comment above this method which should give some idea.
>>>
>>> Cheers
>>>
>>> On Fri, Dec 18, 2015 at 9:21 AM, Sahil Sareen <sareen...@gmail.com> wrote:
>>>
>>>> Thanks Ted!
>>>>
>>>> Yes, the schema might be different or the same.
>>>> What would be the answer in each situation?
>>>>
>>>> On Fri, Dec 18, 2015 at 6:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> CacheManager#cacheQuery() is called where:
>>>>>
>>>>>   * Caches the data produced by the logical representation of the
>>>>>     given [[Queryable]].
>>>>>   ...
>>>>>   val planToCache = query.queryExecution.analyzed
>>>>>   if (lookupCachedData(planToCache).nonEmpty) {
>>>>>
>>>>> Is the schema for dfNew different from that of dfOld?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Dec 18, 2015 at 3:33 AM, Sahil Sareen <sareen...@gmail.com> wrote:
>>>>>
>>>>>> Spark 1.5.2
>>>>>>
>>>>>> dfOld.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>> // ....
>>>>>> // do something
>>>>>> // ....
>>>>>> dfNew.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>>
>>>>>> Now when I use the "oldTableName" table I do get the latest contents
>>>>>> from dfNew, but do the contents of dfOld get removed from memory?
>>>>>>
>>>>>> Or is the right usage to do this:
>>>>>>
>>>>>> dfOld.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>> // ....
>>>>>> // do something
>>>>>> // ....
>>>>>> dfNew.registerTempTable("oldTableName")
>>>>>> sqlContext.uncacheTable("oldTableName")  <========== uncache the old contents first
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>>
>>>>>> -Sahil
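For reference, the uncache-first variant asked about above, as a minimal sketch to paste into spark-shell. The helper name is made up for illustration; the SQLContext and DataFrame calls are the stock 1.5.x API (note the method is uncacheTable, not unCacheTable):

    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Illustrative helper: swap a new DataFrame in under an existing temp
    // table name without leaving the old plan's cache entry behind.
    def replaceCachedTempTable(sqlContext: SQLContext, name: String, df: DataFrame): Unit = {
      // Drop the entry keyed on the currently registered (old) plan first;
      // uncacheTable resolves the name, so do this before rebinding it.
      if (sqlContext.isCached(name)) {
        sqlContext.uncacheTable(name)
      }
      df.registerTempTable(name)   // rebind the name to the new DataFrame's plan
      sqlContext.cacheTable(name)  // cache keyed on the new plan
    }

    // e.g. replaceCachedTempTable(sqlContext, "oldTableName", dfNew)

Note the ordering: uncacheTable resolves the name to whatever plan is currently registered, so dropping the old cache entry before rebinding the name seems safer than the order in the message above, where the uncache comes after dfNew has already been registered.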