GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/126
[SPARK-1103] [WIP] Automatic garbage collection of RDD, shuffle and broadcast data

This PR allows Spark to automatically clean up metadata and data related to persisted RDDs, shuffles and broadcast variables once the corresponding RDDs, shuffles and broadcast variables fall out of scope in the driver program. This is still a work in progress, as broadcast cleanup has not been implemented yet.

**Implementation Details:**

A new class, `ContextCleaner`, is responsible for cleaning up all of this state. It is instantiated as part of a `SparkContext`. The `RDD` and `ShuffleDependency` classes override the `finalize()` method, which the JVM invokes when their instances are garbage collected (i.e. after they fall out of scope in the driver). The `finalize()` method enqueues the object's identifier (i.e. RDD ID, shuffle ID, etc.) with the `ContextCleaner`; this is a very short and cheap operation that should not significantly affect the garbage collection mechanism. The `ContextCleaner`, on a separate thread, performs the actual cleanup, whose details are given below (a minimal sketch of this enqueue-then-clean pattern follows this description).

*RDD cleanup:* `ContextCleaner` calls `RDD.unpersist()` to clean up persisted RDDs. Regarding metadata, the `DAGScheduler` automatically cleans up all metadata related to an RDD after all of its jobs have completed, so only `SparkContext.persistentRDDs` keeps strong references to persisted RDDs. The `TimeStampedHashMap` used for that has been replaced by a `TimeStampedWeakValueHashMap`, which keeps only weak references to the RDDs, allowing them to be garbage collected (see the weak-value map sketch below).

*Shuffle cleanup:* A new `BlockManager` message, `RemoveShuffle(<shuffle ID>)`, asks the `BlockManagerMaster` and the currently active `BlockManager`s to delete all disk blocks related to that shuffle ID (see the message sketch below). `ContextCleaner` cleans up shuffle data using this message and also cleans up the corresponding metadata in the driver's `MapOutputTracker`. The `MapOutputTracker` at the workers, which caches shuffle metadata, now maintains a `BoundedHashMap` to limit how much shuffle information it caches; re-fetching the shuffle information from the driver is not too costly.

*Broadcast cleanup:* To be done. [This PR](https://github.com/apache/incubator-spark/pull/543/) adds a mechanism for explicit cleanup of broadcast variables. `Broadcast.finalize()` will enqueue its own ID with the `ContextCleaner`, and that PR's mechanism will be used to unpersist the broadcast data.

*Other cleanup:* `ShuffleMapTask` and `ResultTask` cached tasks and used TTL-based cleanup (via `TimeStampedHashMap`), so nothing got cleaned up if no TTL was set. Instead, they now use a `BoundedHashMap` to keep a limited amount of map output information (see the bounded-map sketch below); the cost of repopulating the cache when necessary is very small.

*Current state of implementation:* RDD and shuffle cleanup are implemented. Things left to be done:
- Cleanup of broadcast variables.
- Automatic cleanup of keys in `TimeStampedWeakValueHashMap` whose weakly-referenced values have been garbage collected.
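A minimal, self-contained sketch of the enqueue-then-clean pattern described above. The `CleanupTask` hierarchy, the method names, and the `println` placeholders are illustrative assumptions, not the PR's actual code:

```scala
import java.util.concurrent.LinkedBlockingQueue

sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask

class ContextCleaner {
  private val queue = new LinkedBlockingQueue[CleanupTask]()

  // Called from finalize(): just an enqueue, so it is cheap and does not
  // slow down the garbage collector.
  def registerRDDForCleanup(rddId: Int): Unit = queue.put(CleanRDD(rddId))
  def registerShuffleForCleanup(shuffleId: Int): Unit = queue.put(CleanShuffle(shuffleId))

  // The actual cleanup happens on a separate daemon thread.
  private val cleanerThread = new Thread("context-cleaner") {
    override def run(): Unit = {
      while (true) {
        queue.take() match {
          case CleanRDD(id)     => println(s"would call unpersist() on RDD $id")
          case CleanShuffle(id) => println(s"would remove data of shuffle $id")
        }
      }
    }
  }
  cleanerThread.setDaemon(true)
  cleanerThread.start()
}

// On the RDD side, finalize() then only needs to do something like:
//   override protected def finalize(): Unit = {
//     sc.cleaner.registerRDDForCleanup(id)
//     super.finalize()
//   }
```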
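The weak-reference behavior of `TimeStampedWeakValueHashMap` can be pictured with a simplified weak-value map. This is a sketch of the idea only (no timestamps, no thread safety), and the class and method names are illustrative:

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

class WeakValueHashMap[K, V <: AnyRef] {
  private val map = mutable.HashMap[K, WeakReference[V]]()

  // The map holds only a weak reference, so the value stays collectible.
  def put(key: K, value: V): Unit = map.put(key, new WeakReference(value))

  // Returns None if the value has already been garbage collected.
  def get(key: K): Option[V] = map.get(key).flatMap(ref => Option(ref.get))

  // The still-open to-do item in this PR: drop keys whose values are gone.
  def clearStaleEntries(): Unit = {
    val staleKeys = map.collect { case (k, ref) if ref.get == null => k }
    staleKeys.foreach(map.remove)
  }
}
```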
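A sketch of what the new message and a receiving handler might look like. The handler shape and object name are assumptions, though the `shuffle_<shuffleId>_<mapId>_<reduceId>` naming matches Spark's shuffle block IDs:

```scala
// The new message type, modeled on Spark's existing BlockManager messages.
case class RemoveShuffle(shuffleId: Int)

object BlockManagerSketch {
  // How a receiving BlockManager might react (hypothetical handler).
  def receive(message: Any): Unit = message match {
    case RemoveShuffle(shuffleId) =>
      // Delete every disk block belonging to this shuffle; shuffle blocks
      // are named "shuffle_<shuffleId>_<mapId>_<reduceId>".
      println(s"removing all blocks of shuffle $shuffleId")
    case other =>
      println(s"unhandled message: $other")
  }
}
```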
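One standard JVM way to get the size-bounded behavior that `BoundedHashMap` needs is `java.util.LinkedHashMap` with its `removeEldestEntry` hook; the following is a sketch of that idea, not necessarily the PR's implementation:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// LRU-style bound: evict the eldest entry once the map exceeds maxEntries.
class BoundedHashMap[K, V](maxEntries: Int)
    extends JLinkedHashMap[K, V](16, 0.75f, true) { // true = access order
  override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > maxEntries
}
```

Entries evicted this way are simply re-fetched or recomputed on the next miss, which matches the PR's note that repopulating the cache is cheap.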
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark state-cleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/126.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #126

----

commit e427a9eeb8d6b5def3a5ff1b766458588d8b05a9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2014-02-14T03:14:31Z

    Added ContextCleaner to automatically clean RDDs and shuffles when they fall out of scope. Also replaced TimeStampedHashMap to BoundedHashMaps and TimeStampedWeakValueHashMap for the necessary hashmap behavior.

commit 8512612036011b5cf688a1643d0d46f144a0f15e
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2014-02-14T08:01:04Z

    Changed TimeStampedHashMap to use WrappedJavaHashMap.

commit a24fefccbc93675c939621ea03476b7f993abe4e
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2014-03-11T03:46:12Z

    Merge remote-tracking branch 'apache/master' into state-cleanup

    Conflicts:
        core/src/main/scala/org/apache/spark/MapOutputTracker.scala
        core/src/main/scala/org/apache/spark/SparkContext.scala
        core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
        core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala
        core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
        core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

commit cb0a5a66ce7dbc2ded209d8bdd0cd88953f70b5f
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2014-03-11T18:33:43Z

    Fixed docs and styles.