GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/126

    [SPARK-1103] [WIP] Automatic garbage collection of RDD, shuffle and 
broadcast data

    This PR allows Spark to automatically clean up the metadata and data related to persisted RDDs, shuffles and broadcast variables when the corresponding RDDs, shuffles and broadcast variables fall out of scope in the driver program. This is still a work in progress, as broadcast cleanup has not been implemented yet.
    
    **Implementation Details:** 
    A new class, `ContextCleaner`, is responsible for cleaning up all of this state. It is instantiated as part of a `SparkContext`. The RDD and ShuffleDependency classes override the `finalize()` method, which gets called whenever their instances go out of scope. The `finalize()` method enqueues the object's identifier (i.e. RDD ID, shuffle ID, etc.) with the `ContextCleaner`; this is a very short and cheap operation and should not significantly affect the garbage collection mechanism. The `ContextCleaner` then performs the actual cleanup on a separate thread, whose details are given below.
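
    For illustration, here is a minimal, self-contained sketch of the enqueue-and-drain pattern described above (the names `CleanupTask`, `CleanRDD`, `CleanShuffle`, `ContextCleanerSketch` and `doCleanup` are illustrative, not the actual internals of this PR):

```scala
import java.util.concurrent.LinkedBlockingQueue

sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask

class ContextCleanerSketch {
  private val queue = new LinkedBlockingQueue[CleanupTask]()

  // Called from finalize(): a single enqueue, so it is cheap and does not
  // stall the garbage collector.
  def enqueue(task: CleanupTask): Unit = queue.put(task)

  // A daemon thread drains the queue and performs the actual (expensive) cleanup.
  private val cleanerThread = new Thread("context-cleaner") {
    override def run(): Unit = while (true) doCleanup(queue.take())
  }
  cleanerThread.setDaemon(true)
  cleanerThread.start()

  private def doCleanup(task: CleanupTask): Unit = task match {
    case CleanRDD(id)     => println(s"unpersisting RDD $id")   // i.e. RDD.unpersist()
    case CleanShuffle(id) => println(s"removing shuffle $id")   // i.e. RemoveShuffle message
  }
}
```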
    
    *RDD cleanup:*
    `ContextCleaner` calls `RDD.unpersist()` to clean up persisted RDDs. Regarding metadata, the DAGScheduler automatically cleans up all metadata related to an RDD after all its jobs have completed. Only `SparkContext.persistentRDDs` keeps strong references to persisted RDDs; the `TimeStampedHashMap` used for it has been replaced by a `TimeStampedWeakValueHashMap`, which keeps only weak references to the RDDs, allowing them to be garbage collected.
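
    A hedged sketch of the weak-value idea, built only on `java.lang.ref.WeakReference` (the real `TimeStampedWeakValueHashMap` also tracks insertion timestamps; `WeakValueMap` is an illustrative name):

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

class WeakValueMap[K, V <: AnyRef] {
  private val map = new TrieMap[K, WeakReference[V]]()

  // The map holds the value only weakly, so an otherwise-unreachable RDD
  // can still be garbage collected while registered here.
  def put(k: K, v: V): Unit = map.update(k, new WeakReference(v))

  // Returns None if the value has already been collected.
  def get(k: K): Option[V] = map.get(k).flatMap(ref => Option(ref.get()))
}
```

    Note that keys whose values have been collected still linger in such a map until explicitly removed; cleaning those up automatically is one of the remaining TODO items listed below.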
    
    *Shuffle cleanup:*
    A new `BlockManager` message, `RemoveShuffle(<shuffle ID>)`, asks the `BlockManagerMaster` and the currently active `BlockManager`s to delete all the disk blocks related to the given shuffle ID. `ContextCleaner` cleans up shuffle data using this message and also cleans up the corresponding metadata in the driver's `MapOutputTracker`. The `MapOutputTracker` at the workers, which caches the shuffle metadata, now maintains a `BoundedHashMap` to limit the amount of shuffle information it caches; refetching the shuffle information from the driver is not too costly.
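
    A rough sketch of the message shape and a worker-side dispatch, assuming shuffle blocks can be enumerated by their shuffle ID (the handler body is illustrative, not the PR's actual `BlockManager` code):

```scala
// Driver-side request: remove everything belonging to one shuffle.
case class RemoveShuffle(shuffleId: Int)

// Worker-side dispatch, e.g. inside a BlockManager message loop.
def handleMessage(msg: Any): Unit = msg match {
  case RemoveShuffle(shuffleId) =>
    // Delete every on-disk block of this shuffle; shuffle block names
    // follow the pattern "shuffle_<shuffleId>_<mapId>_<reduceId>".
    println(s"removing all disk blocks of shuffle $shuffleId")
  case other =>
    println(s"ignoring message: $other")
}
```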
    
    *Broadcast cleanup:*
    To be done. [This PR](https://github.com/apache/incubator-spark/pull/543/) adds a mechanism for explicit cleanup of broadcast variables. `Broadcast.finalize()` will enqueue its own ID with the `ContextCleaner`, and that PR's mechanism will then be used to unpersist the broadcast data.
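
    The intended hook might look roughly like this (nothing here is implemented yet; `CleanBroadcast`, `BroadcastSketch` and the `enqueue` callback are hypothetical names):

```scala
case class CleanBroadcast(broadcastId: Long)

class BroadcastSketch[T](val id: Long, enqueue: CleanBroadcast => Unit) {
  // When the driver-side broadcast object becomes unreachable, register its
  // ID for asynchronous unpersisting, mirroring the RDD/shuffle path above.
  override protected def finalize(): Unit = enqueue(CleanBroadcast(id))
}
```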
    
    *Other cleanup:*
    `ShuffleMapTask` and `ResultTask` cache tasks and previously used TTL-based cleanup (via `TimeStampedHashMap`), so nothing was cleaned up if the TTL was not set. They now use a `BoundedHashMap` instead, which keeps a limited amount of map output information; the cost of repopulating the cache when necessary is very small.
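
    A minimal sketch of a bounded map using `java.util.LinkedHashMap`'s `removeEldestEntry` hook, which evicts the oldest entry once the size limit is exceeded (`maxEntries` and `BoundedMapSketch` are illustrative; Spark's `BoundedHashMap` is its own utility class):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

class BoundedMapSketch[K, V](maxEntries: Int)
    extends JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
  // Invoked by LinkedHashMap after each insertion; returning true evicts
  // the least-recently-accessed entry.
  override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > maxEntries
}
```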
    
    *Current state of implementation:*
    Implemented RDD and shuffle cleanup. Things left to be done:
    - Cleanup of broadcast variables.
    - Automatic cleanup of keys whose weak references have been cleared in `TimeStampedWeakValueHashMap`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark state-cleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #126
    
----
commit e427a9eeb8d6b5def3a5ff1b766458588d8b05a9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2014-02-14T03:14:31Z

    Added ContextCleaner to automatically clean RDDs and shuffles when they 
fall out of scope. Also replaced TimeStampedHashMap to BoundedHashMaps and 
TimeStampedWeakValueHashMap for the necessary hashmap behavior.

commit 8512612036011b5cf688a1643d0d46f144a0f15e
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2014-02-14T08:01:04Z

    Changed TimeStampedHashMap to use WrappedJavaHashMap.

commit a24fefccbc93675c939621ea03476b7f993abe4e
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2014-03-11T03:46:12Z

    Merge remote-tracking branch 'apache/master' into state-cleanup
    
    Conflicts:
        core/src/main/scala/org/apache/spark/MapOutputTracker.scala
        core/src/main/scala/org/apache/spark/SparkContext.scala
        core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
        core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala
        core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
        core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

commit cb0a5a66ce7dbc2ded209d8bdd0cd88953f70b5f
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2014-03-11T18:33:43Z

    Fixed docs and styles.

----

