[GitHub] spark pull request: SPARK-897: preemptively serialize closures

2014-03-14 Thread willb
GitHub user willb opened a pull request:

https://github.com/apache/spark/pull/143

SPARK-897:  preemptively serialize closures

These commits cause `ClosureCleaner.clean` to attempt to serialize the 
cleaned closure with the default closure serializer and to throw a 
`SparkException` if doing so fails.  This behavior is enabled by default but 
can be disabled at individual call sites of `SparkContext.clean`.
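
Concretely, the check behaves roughly like the sketch below, assuming the
era's `SparkEnv.get.closureSerializer` API; the helper name and the message
text are illustrative rather than the exact PR code:

```scala
import java.io.NotSerializableException
import org.apache.spark.{SparkEnv, SparkException}

object ProactiveCheck {
  // Attempt to serialize the cleaned closure with the closure serializer
  // and surface any failure immediately as a SparkException.
  def ensureSerializable(closure: AnyRef): Unit = {
    try {
      SparkEnv.get.closureSerializer.newInstance().serialize(closure)
    } catch {
      case ex: NotSerializableException =>
        throw new SparkException("Task not serializable: " + ex.toString, ex)
    }
  }
}
```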

Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work 
exposed; I'm happy to put that in a separate PR if that would be more 
appropriate.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/willb/spark spark-897

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #143


commit bcab2f0414a956ffa89c5dd0fee16de1b33320a2
Author: William Benton 
Date:   2014-03-13T02:56:32Z

Test case for SPARK-897.

Tests to make sure that passing an unserializable closure
to a transformation fails fast.
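
A hedged sketch of what such a fail-fast test can look like; the suite and
class names are illustrative, not the PR's exact test code:

```scala
import org.apache.spark.{SparkContext, SparkException}
import org.scalatest.FunSuite

// Deliberately not Serializable.
class UnserializableThing {
  def op(x: Int): Int = x + 1
}

class ProactiveSerializationSuite extends FunSuite {
  test("unserializable closures fail at transformation time") {
    val sc = new SparkContext("local", "test")
    try {
      val thing = new UnserializableThing
      intercept[SparkException] {
        // With proactive checking, map() itself throws; no action
        // (count, collect, ...) is needed to trigger the failure.
        sc.parallelize(1 to 10).map(x => thing.op(x))
      }
    } finally {
      sc.stop()
    }
  }
}
```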

commit f2ef54e4ec92d8f0ee3e91af4f507bcabd29a7c0
Author: William Benton 
Date:   2014-03-13T19:21:45Z

Generalized proactive closure serialization test.

commit 6cb921874c02f3f03dd66db697c6995dc9565a0f
Author: William Benton 
Date:   2014-03-13T19:40:42Z

Adds proactive closure-serializability checking

ClosureCleaner.clean now checks to ensure that its closure argument
is serializable by default and throws a SparkException with the
underlying NotSerializableException in the detail message otherwise.
As a result, transformation invocations with unserializable closures
will fail at their call sites rather than when they actually execute.

ClosureCleaner.clean now takes a second boolean argument; pass false
to disable serializability-checking behavior at call sites where this
behavior isn't desired.
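
Sketched shape of the updated entry point, assuming the flag described
above (the parameter name is illustrative):

```scala
object ClosureCleanerSketch {
  def clean(closure: AnyRef, checkSerializable: Boolean = true): Unit = {
    // ... existing cleaning of unneeded enclosing-scope references ...
    if (checkSerializable) {
      ProactiveCheck.ensureSerializable(closure)  // sketch shown earlier
    }
  }
}
```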

commit 98e01ae854dd3fce03d753d5f25a6022ae6f58d6
Author: William Benton 
Date:   2014-03-14T16:40:56Z

Ensure assertions in Graph.apply are asserted.

The Graph.apply test in GraphSuite had some assertions in a closure in
a graph transformation. This caused two problems:

1. because assert() was called, test classes were reachable from the
closures, which made the closures unserializable, and

2. (more importantly) these assertions never actually executed, since
they occurred within a lazy map()

This commit changes the Graph.apply test to collect the graph triplets
so it can assert about each triplet directly, rather than from within a
lazy map.
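
The underlying pitfall is easy to reproduce outside GraphX; a small
illustration (not the GraphSuite code itself):

```scala
import org.apache.spark.SparkContext

object LazyAssertionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "lazy-assertion-demo")
    val rdd = sc.parallelize(1 to 5)

    // Broken pattern: map() is lazy and its result is discarded, so this
    // assertion never executes (the analogue of the GraphSuite problem).
    rdd.map { x => assert(x > 0); x }

    // Fixed pattern: collect() forces evaluation, and the assertions run
    // on the driver, outside any shipped closure.
    rdd.collect().foreach(x => assert(x > 0))

    sc.stop()
  }
}
```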

commit 70a449d87018e7bfa8dbf7249948a7f48a891719
Author: William Benton 
Date:   2014-03-14T17:33:33Z

Make proactive serializability checking optional.

SparkContext.clean uses ClosureCleaner's proactive serializability
checking by default.  This commit adds an overloaded clean method
to SparkContext that allows clients to specify that serializability
checking should not occur as part of closure cleaning.

commit 9eb301387644d5c14a03a0bbb96c6b007f228f3d
Author: William Benton 
Date:   2014-03-14T17:34:42Z

Don't check serializability of DStream transforms.

Since the DStream is reachable from within these closures, they aren't
checkable by the straightforward technique of passing them to the
closure serializer.






[GitHub] spark pull request: SPARK-897: preemptively serialize closures

2014-03-14 Thread willb
Github user willb commented on a diff in the pull request:

https://github.com/apache/spark/pull/143#discussion_r10622998
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
@@ -533,7 +533,7 @@ abstract class DStream[T: ClassTag] (
    * on each RDD of 'this' DStream.
    */
   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
--- End diff --

As far as I can tell, the issue here is that the `DStream` object is 
reachable from `transformFunc(r)`, and `DStream.writeObject` throws a 
runtime exception if it is called from within the closure serializer.
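
A toy reproduction of that failure mode, assuming nothing about `DStream`
internals beyond what's described above:

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

// Serializable on paper, but its custom writeObject refuses to run in
// this context, much like DStream.writeObject as described above.
class Guarded extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    throw new IOException("refusing to serialize in this context")
  }
}

object WriteObjectDemo {
  def main(args: Array[String]): Unit = {
    val g = new Guarded
    val closure = (x: Int) => { g.hashCode; x }  // captures g

    // Any attempt to serialize the closure reaches Guarded.writeObject
    // and fails, so a proactive serialization check cannot succeed.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(closure)  // throws IOException
  }
}
```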




[GitHub] spark pull request: SPARK-897: preemptively serialize closures

2014-03-14 Thread willb
Github user willb commented on the pull request:

https://github.com/apache/spark/pull/143#issuecomment-37686459
  
Yes, my understanding of SPARK-897 is that the issue is ensuring that 
serializability errors are reported to the user as soon as possible.  
Essentially, these commits replicate the closure-serializability check 
(which, as you note, currently occurs in the scheduler as part of job 
submission) in `ClosureCleaner.clean`, which is called in the driver for 
every closure argument to an RDD transformation method.  (The test cases I 
added in f2ef54e verify that unserializable-closure failures happen 
immediately upon transformation invocation, not only once an action runs.)
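
For reference, this is roughly where `clean` sits in a transformation
(simplified from the `RDD.scala` of that era, not the literal source):

```scala
// Every RDD transformation routes its closure through sc.clean() in the
// driver, so the proactive check fires at the call site.
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
```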




[GitHub] spark pull request: SPARK-897: preemptively serialize closures

2014-03-14 Thread willb
Github user willb commented on the pull request:

https://github.com/apache/spark/pull/143#issuecomment-37690238
  
Here's what I was thinking about that:  I left the check in `DAGScheduler` 
in place because preemptive checking is optional (and indeed not done 
everywhere), and the check is generally inexpensive, so it seems worth 
making sure the error is reported wherever it is sensible to do so (as far 
as I could tell, `clean` may be called multiple times per closure in any 
case).  I absolutely agree that it would be ideal to do serializability 
checking (as well as closure cleaning) only once per closure, though.




[GitHub] spark pull request: SPARK-897: preemptively serialize closures

2014-03-14 Thread willb
Github user willb commented on the pull request:

https://github.com/apache/spark/pull/143#issuecomment-37693355
  
A configuration option makes sense to me, and I'm happy to add it.  Let me 
know if you have strong feelings about what it should be called.
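
For example, reading such a flag could look like this; the property name
is purely hypothetical, since we haven't settled on one:

```scala
import org.apache.spark.SparkConf

object ConfigSketch {
  // Hypothetical property name; the default keeps proactive checking on.
  def checkSerializable(conf: SparkConf): Boolean =
    conf.getBoolean("spark.closureCleaner.checkSerializable", defaultValue = true)
}
```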

