GitHub user kellrott opened a pull request:
https://github.com/apache/spark/pull/50
Patch for SPARK-942
This is a port of a pull request originally targeted at incubator-spark:
https://github.com/apache/incubator-spark/pull/180
Essentially, if a user returns a generative iterator (for example, from a
flatMap operation), then when persisting the data Spark would first unroll the
iterator into an ArrayBuffer and only then try to figure out whether it could
store the data. If the iterator generated more data than fits in available
memory, this would cause a crash. With this patch, if the user requests a
persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed
into the serializer, so the whole partition never has to be held in memory.
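The difference between the two persist paths can be sketched with plain JDK streams. This is a minimal illustration, not Spark's BlockStore code: the object and method names (`StreamToDiskSketch`, `putUnrolled`, `putStreamed`) are hypothetical, and a `DataOutputStream` of `Int`s stands in for Spark's serializer and the on-disk block file.

```scala
import java.io.DataOutputStream
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a DataOutputStream of Ints stands in for the serializer
// and the block file; neither method is Spark's actual API.
object StreamToDiskSketch {
  // Pre-patch shape: unroll the whole iterator before writing anything,
  // so every element is held in memory at the same time.
  def putUnrolled(values: Iterator[Int], out: DataOutputStream): Int = {
    val buffer = new ArrayBuffer[Int]
    buffer ++= values
    buffer.foreach(v => out.writeInt(v))
    buffer.size
  }

  // Patched shape for DISK_ONLY: pull one element at a time and hand it
  // straight to the writer, so memory use does not grow with the iterator.
  def putStreamed(values: Iterator[Int], out: DataOutputStream): Int = {
    var count = 0
    while (values.hasNext) {
      out.writeInt(values.next())
      count += 1
    }
    count
  }
}
```

With a generative iterator such as `Iterator.range(0, 1000).flatMap(i => Iterator(i, -i))`, both methods write the same 8000 bytes, but only `putStreamed` avoids materializing all 2000 elements first.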
To do this, two changes were made:
1) The type of the 'values' argument in the putValues method of the
BlockStore interface was changed from ArrayBuffer to Iterator (and all code
interfacing with this method was modified accordingly).
2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every
1000 objects. This is needed because ObjectOutputStream caches every object it
writes (which prevents those objects from being GC'd) so that repeated
references can be serialized more compactly. If reset is never called, memory
eventually fills up; if it is called too often, the serialization streams
become much larger because class descriptions are written redundantly.
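The periodic reset in change 2 can be sketched on top of `java.io.ObjectOutputStream` directly. The `ResettingObjectWriter` class and its `resetInterval` parameter are hypothetical names for this illustration; the patch performs the reset inside Spark's JavaSerializer, and later commits in this pull request make the interval configurable through `spark.serializer.objectStreamReset`. Treating a non-positive interval as "never reset" is an assumption of this sketch.

```scala
import java.io.{ObjectOutputStream, OutputStream}

// Hypothetical helper illustrating a periodic ObjectOutputStream.reset().
// resetInterval <= 0 disables resets (an assumption of this sketch).
class ResettingObjectWriter(out: OutputStream, resetInterval: Int) {
  private val objOut = new ObjectOutputStream(out)
  private var counter = 0

  def write(obj: AnyRef): Unit = {
    objOut.writeObject(obj)
    counter += 1
    if (resetInterval > 0 && counter >= resetInterval) {
      // Forget the back-reference table so already-written objects can be
      // garbage-collected; class descriptors will be written again afterwards.
      objOut.reset()
      counter = 0
    }
  }

  def close(): Unit = objOut.close()
}
```

A plain `ObjectInputStream` can read such a stream back, because each reset is recorded as a marker in the stream itself. The trade-off described above shows up directly: resetting after every object produces a noticeably larger stream than never resetting.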
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kellrott/spark iterator-to-disk
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/50.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #50
----
commit efe1102c8a7436b2fe112d3bece9f35fedea0dc8
Author: Kyle Ellrott
Date: 2013-11-13T00:32:54Z
Changing CacheManager and BlockManager to pass iterators directly to the
serializer when a 'DISK_ONLY' persist is called.
This is in response to SPARK-942.
commit cac1fadeec964cfc254ee1f02b82665aac9a5690
Author: Kyle Ellrott
Date: 2013-11-13T21:49:50Z
Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer
objects. This was previously done higher up the stack.
commit d32992fd55726d3aa26530136b9a711856e42bd5
Author: Kyle Ellrott
Date: 2013-11-13T22:10:28Z
Merge remote-tracking branch 'origin/master' into iterator-to-disk
commit 81d670cb9ad9d2e2635a0eb6ecc74f117554a708
Author: Kyle Ellrott
Date: 2013-11-14T01:20:17Z
Adding unit test for straight to disk iterator methods.
commit f40382630bceed95b2e56e3f76fbc924fdb9f2c8
Author: Kyle Ellrott
Date: 2013-11-15T06:36:59Z
Merge branch 'master' into iterator-to-disk
commit 5eb2b7e53d5290fdf71a7addd672c7f4ffbf6ec7
Author: Kyle Ellrott
Date: 2013-11-17T06:19:19Z
Changing the JavaSerializer reset to occur every 1000 objects.
commit 44ec35a3733a25df6038827f480e8cf6991f9344
Author: Kyle Ellrott
Date: 2013-11-17T06:35:51Z
Adding some comments.
commit 56f71cd10782b3c65df04ff9b083d9fc4f5b2503
Author: Kyle Ellrott
Date: 2014-02-04T15:49:27Z
Merge branch 'master' into iterator-to-disk
Conflicts:
core/src/main/scala/org/apache/spark/CacheManager.scala
commit 95c7f67b131496de51587afa373eee9da1a5d46b
Author: Kyle Ellrott
Date: 2014-02-24T21:35:05Z
Simplifying StorageLevel checks
commit 0e6f8084fe2e7cfb5129a016fcd65d62e4005031
Author: Kyle Ellrott
Date: 2014-02-24T23:56:09Z
Deleting temp output directory when done
commit 2eeda75621eb1d60f10d1f4ab805acae75edd7c5
Author: Kyle Ellrott
Date: 2014-02-25T00:20:20Z
Fixing dumb mistake ("||" instead of "&&")
commit a6424ba6b2551d4366a57cd1d5d32ffe5a4a3fd0
Author: Kyle Ellrott
Date: 2014-02-25T00:24:03Z
Wrapping long line
commit 9df02765528d57935a9aed8daf754a065f5d0ef5
Author: Kyle Ellrott
Date: 2014-02-25T01:21:48Z
Added check to make sure that the streamed-to-disk RDD actually returns correct
data in the LargeIteratorSuite
commit 31fe08ed356c5fb37a985ea72a10d6e3e165c80b
Author: Kyle Ellrott
Date: 2014-02-25T01:34:27Z
Removing un-needed semi-colons
commit 40fe1d7cf83ce2fe29a061ec2d6e6e54bd18a6ff
Author: Kyle Ellrott
Date: 2014-02-25T03:34:43Z
Removing rogue space
commit 00c98e07334dac20085f51977d015cab6e2242bb
Author: Kyle Ellrott
Date: 2014-02-25T06:22:12Z
Making the Java ObjectStreamSerializer reset rate configurable by the
system property 'spark.serializer.objectStreamReset'; the default is now 10000.
commit 8644ee83ffbc6f02f93abaef8c56906c4683e8db
Author: Kyle Ellrott
Date: 2014-02-25T07:30:42Z
Merge branch 'master' into iterator-to-disk
commit 656c33e800a0f3c7926dd0857105e12e0cf5fb25
Author: Kyle Ellrott
Date: 2014-02-25T18:58:20Z
Fixing the JavaSerializer to read from the SparkConf rather than the System
property.
commit 0f28ec70853a6a5ab198bc113f6af77b78d34d51
Author: Kyle Ellrott
Date: 2014-02-25T19:03:23Z
Adding a second putValues method to the BlockStore interface that accepts an
ArrayBuffer (rather than an Iterator).
This will allow BlockStores to behave slightly differently depending on
whether they get an Iterator or an ArrayBuffer. The MemoryStore, for example,
needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed an
ArrayBuffer, it can skip the duplication.
commit 627a8b79d760103674f3c5b108900e911a6a7eeb
Author: Kyle Ellrott
Date: 2014-02-25T21:20:30Z
Wrapping a few long lines
commit c2fb43056c836ebb520bd076da2b576c32e794cf
Author: Kyle Ellrott
Date: 2014-02-26T01:09:29Z
Removing more un-needed array-buffer to iterator conversions
commit 16a4ceae706c3458e5a2721f8c27eebbf2cf4c89
Author: Kyle Ellrott
Date: 2014-02-26T19:05:27Z
Streamlined the LargeIteratorSuite unit test. It should now run in ~25
seconds. Confirmed that it still crashes an unpatched copy of Spark.
commit 7ccc74b7f7a2c58739cde2e4e83950e07e7fd3eb
Author: Kyle Ellrott
Date: 2014-02-27T21:34:56Z
Moving the 'LargeIteratorSuite' to simply test persistence of iterators. It
no longer tries to trigger an OOM error.
commit f70d06939bb9c164a0a6c9af42f663bc882c3211
Author: Kyle Ellrott
Date: 2014-02-27T21:36:06Z
Adding docs for spark.serializer.objectStreamReset configuration
commit 2f684ea15053d1ad934d60c58e082c1edf57b3a0
Author: Kyle Ellrott
Date: 2014-02-27T21:36:42Z
Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now
using trait 'Values'. Also modified BlockStore.putBytes call to return
PutResult, so that it behaves like putValues.
commit 33ac3900b0b11c0646b85e41e1129734adf6ce5c
Author: Kyle Ellrott
Date: 2014-02-27T21:39:50Z
Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into
iterator-to-disk
Conflicts:
core/src/test/scala/org/apache/spark/storage/LargeIteratorSuite.scala
commit 8aa31cdf94981887fcbc5c7db79a4f2d310dcb59
Author: Kyle Ellrott
Date: 2014-02-28T23:25:02Z
Merge ../incubator-spark into iterator-to-disk
----
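The later commits (a second putValues overload that accepts an ArrayBuffer instead of an Iterator, and a 'Values' trait replacing the nested Either) can be sketched as follows. All names here (`Values`, `IteratorValues`, `ArrayBufferValues`, `ByteBufferValues`, `BlockStoreSketch`, `MemoryStoreSketch`, `PutDispatch`) are hypothetical stand-ins modeled on the commit messages, not copies of Spark's classes, and keying blocks by `String` is a simplification.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-ins for the shapes described in the commit messages.
sealed trait Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: mutable.ArrayBuffer[Any]) extends Values
case class ByteBufferValues(buffer: ByteBuffer) extends Values

trait BlockStoreSketch {
  def putValues(blockId: String, values: Iterator[Any]): Unit
  def putValues(blockId: String, values: mutable.ArrayBuffer[Any]): Unit
  def putBytes(blockId: String, bytes: ByteBuffer): Unit
}

class MemoryStoreSketch extends BlockStoreSketch {
  val deserialized = mutable.Map.empty[String, mutable.ArrayBuffer[Any]]
  val serialized = mutable.Map.empty[String, ByteBuffer]

  // An iterator can be consumed only once, so caching it means copying it.
  def putValues(blockId: String, values: Iterator[Any]): Unit = {
    val buffer = new mutable.ArrayBuffer[Any]
    buffer ++= values
    deserialized(blockId) = buffer
  }

  // An ArrayBuffer is already materialized: keep the reference, skip the copy.
  def putValues(blockId: String, values: mutable.ArrayBuffer[Any]): Unit = {
    deserialized(blockId) = values
  }

  def putBytes(blockId: String, bytes: ByteBuffer): Unit = {
    serialized(blockId) = bytes.duplicate()
  }
}

object PutDispatch {
  // One flat match on a sealed trait instead of unpacking Left/Right twice.
  def put(store: BlockStoreSketch, blockId: String, data: Values): Unit =
    data match {
      case IteratorValues(it)      => store.putValues(blockId, it)
      case ArrayBufferValues(buf)  => store.putValues(blockId, buf)
      case ByteBufferValues(bytes) => store.putBytes(blockId, bytes)
    }
}
```

Naming each case explicitly keeps the dispatch readable, and because `Values` is sealed the compiler can flag a match that forgets one of the three kinds of input.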