When a second attempt is made to cache df3, which has the same schema as the
first DataFrame, you would see the warning below:

scala> sqlContext.cacheTable("t1")

scala> sqlContext.isCached("t1")
res5: Boolean = true

scala> sqlContext.sql("select * from t1").show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  2|  2|
+---+---+

scala> df3.registerTempTable("t3")

scala> sqlContext.cacheTable("t1")
15/12/18 10:43:25 WARN CacheManager: Asked to cache already cached data.


See this in CacheManager#cacheQuery():

    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
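
If the intent is to replace the cached contents rather than re-cache the
same plan, here is a minimal sketch of the pattern asked about downthread
(assuming the Spark 1.5.x SQLContext API; the table and DataFrame names are
illustrative):

    // Release the old blocks while the name still resolves to the old plan.
    if (sqlContext.isCached("oldTableName")) {
      sqlContext.uncacheTable("oldTableName")
    }
    // Rebind the name to the new DataFrame, then cache the new plan.
    dfNew.registerTempTable("oldTableName")
    sqlContext.cacheTable("oldTableName")

This avoids both the warning above and the orphaned entry described below.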

On Fri, Dec 18, 2015 at 10:24 AM, Sahil Sareen <sareen...@gmail.com> wrote:

> From the UI I see two rows for this on a streaming application:
>
> RDD Name                      | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in ExternalBlockStore | Size on Disk
> In-memory table myColorsTable | Memory Deserialized 1x Replicated | 2                 | 100%            | 728.2 KB       | 0.0 B                      | 0.0 B
> In-memory table myColorsTable | Memory Deserialized 1x Replicated | 2                 | 100%            | 728.2 KB       | 0.0 B                      | 0.0 B
>
> This means it wasn't overwritten :(
> My question now is, if only the latest table is going to be used, why
> isn't the earlier version auto cleared?
>
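The cache is keyed by the analyzed plan, not the table name, so rebinding the
name orphans the old entry rather than evicting it. Two possible ways to
release it (an untested sketch against 1.5.2; df here is the original
DataFrame from the experiment further down the thread):

    // Option 1: unpersist through a still-held handle to the old DataFrame;
    // its analyzed plan should match the orphaned cache entry.
    df.unpersist()

    // Option 2 (blunt): drop every cached table in this SQLContext.
    sqlContext.clearCache()
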
> On Fri, Dec 18, 2015 at 11:44 PM, Sahil Sareen <sareen...@gmail.com>
> wrote:
>
>> So I looked at the function; my only worry is whether the cache is
>> cleared when I overwrite it with the same table name. I ran the
>> experiment below, and isCached now reports the table as not cached, but I
>> want to confirm: beyond the old values no longer being used, are they
>> actually removed/overwritten in memory?
>>
>> scala> df.collect
>> res54: Array[org.apache.spark.sql.Row] = Array([blue,#0033FF], 
>> [red,#FF0000], [green,#FSKA])  <=== 3 rows
>>
>> scala> df2.collect
>> res55: Array[org.apache.spark.sql.Row] = Array([blue,#0033FF], 
>> [red,#FF0000])  <=== 2 rows
>>
>> scala> df.registerTempTable("myColorsTable")
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res58: Boolean = false
>>
>> scala> sqlContext.cacheTable("myColorsTable") <=== cache the table backed by df (3 rows)
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res60: Boolean = true
>>
>> scala> sqlContext.sql("select * from myColorsTable").foreach(println) <===
>> the query runs against df (3 rows)
>> [blue,#0033FF]
>> [red,#FF0000]
>> [green,#FSKA]
>>
>> scala> df2.registerTempTable("myColorsTable") <=== register another DataFrame
>> under the same table name
>>
>> scala> sqlContext.isCached("myColorsTable")
>> res63: Boolean = false
>>
>> scala> sqlContext.sql("select * from myColorsTable").foreach(println) <===
>> the query runs against df2 (2 rows)
>> [blue,#0033FF]
>> [red,#FF0000]
>>
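One way to probe whether the old blocks survive the rebind (a hypothetical,
untested sketch; myColorsOld is a name invented here): point a fresh name at
the original DataFrame and ask the cache again.

    df.registerTempTable("myColorsOld")  // same analyzed plan as the cached entry
    sqlContext.isCached("myColorsOld")   // likely true: the old data is still cached
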
>>
>> On Fri, Dec 18, 2015 at 11:17 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> This method in CacheManager:
>>>   private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] =
>>>     readLock {
>>>       cachedData.find(cd => plan.sameResult(cd.plan))
>>>     }
>>>
>>> led me to the following in
>>> sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
>>>
>>>   def sameResult(plan: LogicalPlan): Boolean = {
>>>
>>> There is a detailed comment above this method that should give some idea.
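As a minimal illustration (a sketch, not from the thread): two DataFrames
whose analyzed plans produce the same result share one cache entry, which is
why a second cache attempt only logs the warning instead of caching twice.

    val a = sqlContext.table("t1")
    val b = sqlContext.table("t1")  // analyzed plan has the same result as a's
    a.cache()
    b.cache()  // lookupCachedData finds the existing entry and logs the warning
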
>>>
>>> Cheers
>>>
>>> On Fri, Dec 18, 2015 at 9:21 AM, Sahil Sareen <sareen...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Ted!
>>>>
>>>> Yes, The schema might be different or the same.
>>>> What would be the answer for each situation?
>>>>
>>>> On Fri, Dec 18, 2015 at 6:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> CacheManager#cacheQuery() is what gets called; from its scaladoc and body:
>>>>>   * Caches the data produced by the logical representation of the
>>>>> given [[Queryable]].
>>>>> ...
>>>>>     val planToCache = query.queryExecution.analyzed
>>>>>     if (lookupCachedData(planToCache).nonEmpty) {
>>>>>
>>>>> Is the schema for dfNew different from that of dfOld ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Dec 18, 2015 at 3:33 AM, Sahil Sareen <sareen...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Spark 1.5.2
>>>>>>
>>>>>> dfOld.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>> // ....
>>>>>> // do something
>>>>>> // ....
>>>>>> dfNew.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>>
>>>>>>
>>>>>> Now when I use the "oldTableName" table I do get the latest contents
>>>>>> from dfNew, but do the contents of dfOld get removed from memory?
>>>>>>
>>>>>> Or is the right usage to do this:
>>>>>> dfOld.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>> // ....
>>>>>> // do something
>>>>>> // ....
>>>>>> sqlContext.uncacheTable("oldTableName") <========== uncache the old
>>>>>> contents first, while the name still points at dfOld
>>>>>> dfNew.registerTempTable("oldTableName")
>>>>>> sqlContext.cacheTable("oldTableName")
>>>>>>
>>>>>> -Sahil
>>>>>>
>>>>>
>>>>
>>>
>>
>
