[GitHub] spark pull request: SPARK-897: preemptively serialize closures
GitHub user willb opened a pull request:

https://github.com/apache/spark/pull/143

SPARK-897: preemptively serialize closures

These commits cause `ClosureCleaner.clean` to attempt to serialize the cleaned closure with the default closure serializer and to throw a `SparkException` if doing so fails. This behavior is enabled by default but can be disabled at individual call sites of `SparkContext.clean`.

Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work exposed; I'm happy to put that in a separate PR if that would be more appropriate.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/willb/spark spark-897

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/143.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #143

commit bcab2f0414a956ffa89c5dd0fee16de1b33320a2
Author: William Benton
Date: 2014-03-13T02:56:32Z

    Test case for SPARK-897.

    Tests to make sure that passing an unserializable closure to a transformation fails fast.

commit f2ef54e4ec92d8f0ee3e91af4f507bcabd29a7c0
Author: William Benton
Date: 2014-03-13T19:21:45Z

    Generalized proactive closure serialization test.

commit 6cb921874c02f3f03dd66db697c6995dc9565a0f
Author: William Benton
Date: 2014-03-13T19:40:42Z

    Adds proactive closure-serializability checking.

    ClosureCleaner.clean now checks by default that its closure argument is serializable, and throws a SparkException with the underlying NotSerializableException in the detail message otherwise. As a result, transformation invocations with unserializable closures will fail at their call sites rather than when they actually execute.

    ClosureCleaner.clean now takes a second boolean argument; pass false to disable the serializability check at call sites where it isn't desired.

commit 98e01ae854dd3fce03d753d5f25a6022ae6f58d6
Author: William Benton
Date: 2014-03-14T16:40:56Z

    Ensure assertions in Graph.apply are asserted.

    The Graph.apply test in GraphSuite had some assertions in a closure inside a graph transformation. This caused two problems: (1) because assert() was called, test classes were reachable from the closures, which made them unserializable, and (2) more importantly, these assertions never actually executed, since they occurred within a lazy map(). This commit simply changes the Graph.apply test to collect the graph triplets so it can assert about each triplet from a map method.

commit 70a449d87018e7bfa8dbf7249948a7f48a891719
Author: William Benton
Date: 2014-03-14T17:33:33Z

    Make proactive serializability checking optional.

    SparkContext.clean uses ClosureCleaner's proactive serializability checking by default. This commit adds an overloaded clean method to SparkContext that allows clients to specify that serializability checking should not occur as part of closure cleaning.

commit 9eb301387644d5c14a03a0bbb96c6b007f228f3d
Author: William Benton
Date: 2014-03-14T17:34:42Z

    Don't check serializability of DStream transforms.

    Since the DStream is reachable from within these closures, they aren't checkable by the straightforward technique of passing them to the closure serializer.
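For readers skimming the thread, here is a minimal Scala sketch of the kind of proactive check the commits above describe: serialize the cleaned closure with the closure serializer and turn a NotSerializableException into a SparkException. The object and method names below are illustrative, not the PR's actual code, and the exact exception wrapping is an assumption.

    import java.io.NotSerializableException
    import org.apache.spark.{SparkEnv, SparkException}

    object ProactiveClosureCheck {
      // Illustrative sketch: attempt to serialize the cleaned closure up front so
      // that an unserializable closure fails at the transformation call site
      // instead of at job submission.
      def ensureSerializable(func: AnyRef): Unit = {
        try {
          SparkEnv.get.closureSerializer.newInstance().serialize(func)
        } catch {
          case nse: NotSerializableException =>
            throw new SparkException("Closure is not serializable: " + nse, nse)
        }
      }
    }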
[GitHub] spark pull request: SPARK-897: preemptively serialize closures
Github user willb commented on a diff in the pull request: https://github.com/apache/spark/pull/143#discussion_r10622998

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---

@@ -533,7 +533,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))

--- End diff --

As far as I can tell, the issue here is that the `DStream` object is reachable from `transformFunc(r)`, and `DStream.writeObject` generates a runtime exception if it is called from within the closure serializer.
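For context, here is a sketch of the overloaded `SparkContext.clean` that commit 70a449d describes, i.e. the method the diff above calls with `false`. The parameter name `checkSerializable` and the default value are assumptions for illustration, not quotes from the patch; the method lives inside SparkContext, so this is not a standalone snippet.

    // Sketch only: the second argument controls proactive serializability checking.
    private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
      // Clean the closure in place; optionally serialize it to verify it is serializable.
      ClosureCleaner.clean(f, checkSerializable)
      f
    }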
[GitHub] spark pull request: SPARK-897: preemptively serialize closures
Github user willb commented on the pull request: https://github.com/apache/spark/pull/143#issuecomment-37686459

Yes, my understanding of SPARK-897 is that the issue is ensuring serializability errors are reported to the user as soon as possible. Essentially, these commits replicate the closure-serializability check (which, as you note, currently occurs in the scheduler as part of job submission) in `ClosureCleaner.clean`, which is called in the driver for every closure argument to an RDD transformation method. (The test cases I added in f2ef54e verify that unserializable-closure failures happen immediately when the transformation is invoked, not merely after actions run.)
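As an illustration of the fail-fast behavior those tests exercise (a hedged sketch, not the actual test from f2ef54e): capturing an instance of a class that doesn't extend Serializable should now raise a SparkException when `map` is called, before any action runs.

    import org.apache.spark.{SparkContext, SparkException}

    object FailFastDemo {
      class Unserializable  // not Serializable, so closures capturing an instance can't be serialized

      // Hypothetical driver-side illustration of the proactive check.
      def failsFast(sc: SparkContext): Unit = {
        val captured = new Unserializable
        val thrown =
          try {
            sc.parallelize(1 to 10).map(x => (x, captured))  // should throw here, not at an action
            false
          } catch {
            case _: SparkException => true  // error surfaced at the transformation call site
          }
        assert(thrown, "expected a SparkException when the transformation was invoked")
      }
    }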
[GitHub] spark pull request: SPARK-897: preemptively serialize closures
Github user willb commented on the pull request: https://github.com/apache/spark/pull/143#issuecomment-37690238

Here's my thinking on that: I left the check in `DAGScheduler` in place because preemptive checking is optional (and indeed not done everywhere), and the scheduler-side check is a fairly inexpensive way to make sure the error is reported wherever reporting it is sensible at all (as far as I can tell, `clean` may be called multiple times per closure in any case). I absolutely agree, though, that it would be ideal to do serializability checking (as well as closure cleaning) only once per closure.
[GitHub] spark pull request: SPARK-897: preemptively serialize closures
Github user willb commented on the pull request: https://github.com/apache/spark/pull/143#issuecomment-37693355

A configuration option makes sense to me and I'm happy to add it. Let me know if you have strong feelings about what it should be called.
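For illustration, one possible shape of such an option; the property name here is purely hypothetical, since nothing in this thread settles on a name.

    // Hypothetical key and default; the real property name was still under discussion here.
    val checkClosures = sc.getConf.getBoolean("spark.closureCleaner.checkSerializable", true)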