[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/148

SPARK-1252. On YARN, use container-log4j.properties for executors

container-log4j.properties is a file that YARN provides so that containers 
can have log4j.properties distinct from that of the NodeManagers.

Logs now go to syslog, and stderr and stdout just contain the process's 
standard error and standard output.

I tested this on pseudo-distributed clusters for both yarn (Hadoop 2.2) and 
yarn-alpha (Hadoop 0.23.7).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-1252

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #148


commit fa5bdf12eeb7ee4866985a8663b70aa3605e6d6c
Author: Sandy Ryza 
Date:   2014-03-14T23:11:03Z

SPARK-1252. On YARN, use container-log4j.properties for executors




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/149

SPARK-1255: Allow user to pass Serializer object instead of class name for 
shuffle.

This is more general than simply passing a string name and leaves more room 
for performance optimizations.

Note that this is technically an API breaking change - but I suspect nobody 
else in this world has used this API other than me in GraphX and Shark.
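
For readers skimming the thread, a rough sketch of how the new API compares to
the old one. The `pairs` RDD, the Kryo choice, and the partition count below are
illustrative assumptions, not part of the PR itself:

    import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoSerializer

    // Before this change the serializer was named by class name string:
    //   new ShuffleDependency[Int, String](pairs, new HashPartitioner(4),
    //     classOf[KryoSerializer].getName)
    // With this change a Serializer instance is passed directly, leaving room
    // to pre-configure (and later optimize) the serializer itself:
    def shuffleDep(pairs: RDD[(Int, String)], conf: SparkConf) =
      new ShuffleDependency[Int, String](pairs, new HashPartitioner(4), new KryoSerializer(conf))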



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark serializer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/149.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #149


commit 7420185373ca90950af0f24831ad3ef10c097acc
Author: Reynold Xin 
Date:   2014-03-15T08:09:15Z

Allow user to pass Serializer object instead of class name for shuffle.

This is more general than simply passing a string name and leaves more room 
for performance optimizations.






[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37720234
  
@marmbrus this is for you!
 




[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37720722
  
 Merged build triggered.




[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37720723
  
Merged build started.




[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37720725
  
Merged build started.




[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37720724
  
 Merged build triggered.




[GitHub] spark pull request: SPARK-1246, added min max API to Double RDDs i...

2014-03-15 Thread ScrapCodes
Github user ScrapCodes closed the pull request at:

https://github.com/apache/spark/pull/140




[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37722638
  
One or more automated tests failed
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13193/




[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37722637
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13194/




[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37722634
  
Merged build finished.




[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37722635
  
Merged build finished.




[GitHub] spark pull request: Fix SPARK-1256: Master web UI and Worker web U...

2014-03-15 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/150

Fix SPARK-1256: Master web UI and Worker web UI returns a 404 error



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-1256

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/150.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #150


commit c99b0302b8690abc18b9c98f99fc5864d706fad3
Author: witgo 
Date:   2014-03-15T11:11:20Z

Fix SPARK-1256






[GitHub] spark pull request: Fix SPARK-1256: Master web UI and Worker web U...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/150#issuecomment-37723748
  
Can one of the admins verify this patch?




Re: [re-cont] map and flatMap

2014-03-15 Thread andy petrella
Yep,
Regarding flatMap, an implicit parameter might work, like in Scala's
Future for instance:
https://github.com/scala/scala/blob/master/src/library/scala/concurrent/Future.scala#L246

Dunno, still waiting for some insights from the team ^^

andy

On Wed, Mar 12, 2014 at 3:23 PM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:

> On Wed, Mar 12, 2014 at 3:06 PM, andy petrella  >wrote:
>
> > Folks,
> >
> > I just want to point something out...
> > I didn't have time yet to sort it out and to think it through enough to give a
> > valuable, strict explanation -- even though, intuitively, I feel there is a lot
> > to it ===> need spark people or time to move forward.
> > But here is the thing regarding *flatMap*.
> >
> > Actually, it looks like (and again intuitively makes sense) that RDD (and
> > of course DStream) aren't monadic and it is reflected in the
> implementation
> > (and signature) of flatMap.
> >
> > >
> > > *  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = **
> > > new FlatMappedRDD(this, sc.clean(f))*
> >
> >
> > There!? flatMap (or bind, >>=) should take a function that uses the same
> > higher-level abstraction in order to be considered as such, right?
> >
> >
> I had remarked exactly the same thing and asked myself the same question...
>
> In this case, it takes a function that returns a TraversableOnce which is
> > the type of the content of the RDD, and what represents the output is more
> > the content of the RDD than the RDD itself (still right?).
> >
> > This actually breaks the understanding of map and flatMap
> >
> > > *def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this,
> > > sc.clean(f))*
> >
> >
> > Indeed, RDD is a functor and the underlying reason for flatMap to not
> take
> > A => RDD[B] doesn't show up in map.
> >
> > This has a lot of consequences actually, because at first one might want to
> > create for-comprehensions over RDDs, or even Traversable[F[_]] functions
> > like sequence -- and he will get stuck since the signatures aren't
> > compliant.
> > More importantly, Scala uses conventions on the structure of a type to allow
> > for-comp... so where Traversable[F[_]] will fail on type, for-comp will
> > fail weirdly.
> >
>
> +1
>
>
> >
> > Again this signature sounds normal, because my intuitive feeling about RDDs
> > is that they *only can* be monadic but the composition would depend on the
> > use case and might have heavy consequences (unioning the RDDs for instance
> > => this happening behind the scenes can be a big pain, since it wouldn't be
> > efficient at all).
> >
> > So Yes, RDD could be monadic but with care.
> >
>
> At least we can say, it is a Functor...
> Actually, I had imagined studying the monadic aspect of RDDs but as you
> said, it's not so easy...
> So for now, I consider them as pseudo-monadic ;)
>
>
>
> > So what exposes this signature is a way to flatMap over the inner value,
> > like it is almost the case for Map (flatMapValues)
> >
> > So, wouldn't it be better to rename flatMap as flatMapData (or whatever better
> > name)? Or to have flatMap require a Monad instance of RDD?
> >
> >
> renaming it to flatMapData or flatTraversableMap sounds good to me (even if
> lots of people will hate it...)
> flatMap requiring a Monad would make it impossible to use with
> for-comprehensions, certainly, no?
>
>
> > Sorry for the prose, just dropped my thoughts and feelings at once :-/
> >
> >
> I agree with you in case it can help not to feel alone ;)
>
> Pascal
>
> Cheers,
> > andy
> >
> > PS: and my English maybe, although my name's Andy I'm a native Belgian
> ^^.
> >
>
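
To make the signatures under discussion easy to compare, here is a minimal
side-by-side sketch (simplified; `RddLike` is a stand-in for illustration, not
Spark's actual trait):

    import scala.reflect.ClassTag

    // RddLike only exists to contrast the two shapes of flatMap.
    trait RddLike[T] {
      // What RDD actually exposes today: the function returns a plain collection.
      def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RddLike[U]

      // What a monadic bind (>>=) would look like: the function returns the same
      // higher-level abstraction, which is what for-comprehensions mixing RDDs
      // would need.
      def flatMapM[U: ClassTag](f: T => RddLike[U]): RddLike[U]

      def map[U: ClassTag](f: T => U): RddLike[U]
    }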


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10635398
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

do you know roughly how large the serialized serializer ends up being? 
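
One hedged way to answer that empirically (spark-shell sketch; assumes the
post-PR state where a Serializer instance is itself java-serializable, and
constructor signatures here follow later Spark releases, so they may differ
slightly from the exact code under review):

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

    val conf = new SparkConf()
    // Serialize a KryoSerializer instance the same way task state is shipped
    // and look at the resulting byte count.
    val javaSer = new JavaSerializer(conf).newInstance()
    val bytes = javaSer.serialize(new KryoSerializer(conf))
    println(s"serialized KryoSerializer: ${bytes.limit()} bytes")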




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635413
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
--- End diff --

I'm not sure "Slided" is a word in English (Slide is an irregular verb). I 
think the past participle of "Slide" is "Slid" or "Slidden". But in any case, 
maybe we could just call this `SlidingRDD`.




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635444
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
+  extends RDD[Array[T]](parent) {
--- End diff --

Could this type be more general - e.g. `RDD[Seq[T]]`? 




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635447
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
+  extends RDD[Array[T]](parent) {
+
+  require(windowSize > 1, "Window size must be greater than 1.")
--- End diff --

It would be nice to also print the provided window size. E.g. if it's 
negative or something due to an overflow people might be confused to get this 
error.
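
Concretely, the suggested tweak to the require call in the diff above might look
like this (sketch only):

    require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")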




Re: [re-cont] map and flatMap

2014-03-15 Thread Koert Kuipers
MappedRDD does:
firstParent[T].iterator(split, context).map(f)

and FlatMappedRDD:
firstParent[T].iterator(split, context).flatMap(f)

so yeah, seems like it's a map or flatMap over the iterator inside, not the
RDD itself, sort of...
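
For context, the surrounding classes are tiny; roughly (condensed from the
0.9-era source, shown here as a sketch):

    package org.apache.spark.rdd

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, TaskContext}

    private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
      extends RDD[U](prev) {
      override def getPartitions: Array[Partition] = firstParent[T].partitions
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        firstParent[T].iterator(split, context).map(f)
    }

    private[spark] class FlatMappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => TraversableOnce[U])
      extends RDD[U](prev) {
      override def getPartitions: Array[Partition] = firstParent[T].partitions
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        firstParent[T].iterator(split, context).flatMap(f)
    }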


On Sat, Mar 15, 2014 at 9:08 AM, andy petrella wrote:

> Yep,
> Regarding flatMap and an implicit parameter might work like in scala's
> future for instance:
>
> https://github.com/scala/scala/blob/master/src/library/scala/concurrent/Future.scala#L246
>
> Dunno, still waiting for some insights from the team ^^
>
> andy
>
> On Wed, Mar 12, 2014 at 3:23 PM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
> > On Wed, Mar 12, 2014 at 3:06 PM, andy petrella  > >wrote:
> >
> > > Folks,
> > >
> > > I want just to pint something out...
> > > I didn't had time yet to sort it out and to think enough to give
> valuable
> > > strict explanation of -- event though, intuitively I feel they are a
> lot
> > > ===> need spark people or time to move forward.
> > > But here is the thing regarding *flatMap*.
> > >
> > > Actually, it looks like (and again intuitively makes sense) that RDD
> (and
> > > of course DStream) aren't monadic and it is reflected in the
> > implementation
> > > (and signature) of flatMap.
> > >
> > > >
> > > > *  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = **
> > > > new FlatMappedRDD(this, sc.clean(f))*
> > >
> > >
> > > There!? flatMap (or bind, >>=) should take a function that use the same
> > > Higher level abstraction in order to be considered as such right?
> > >
> > >
> > I had remarked exactly the same thing and asked myself the same
> question...
> >
> > In this case, it takes a function that returns a TraversableOnce which is
> > > the type of the content of the RDD, and what represent the output is
> more
> > > the content of the RDD than the RDD itself (still right?).
> > >
> > > This actually breaks the understand of map and flatMap
> > >
> > > > *def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this,
> > > > sc.clean(f))*
> > >
> > >
> > > Indeed, RDD is a functor and the underlying reason for flatMap to not
> > take
> > > A => RDD[B] doesn't show up in map.
> > >
> > > This has a lot of consequence actually, because at first one might want
> > to
> > > create for-comprehension over RDDs, of even Traversable[F[_]] functions
> > > like sequence -- and he will get stuck since the signature aren't
> > > compliant.
> > > More importantly, Scala uses convention on the structure of a type to
> > allow
> > > for-comp... so where Traversable[F[_]] will fail on type, for-comp will
> > > failed weirdly.
> > >
> >
> > +1
> >
> >
> > >
> > > Again this signature sounds normal, because my intuitive feeling about
> > RDDs
> > > is that they *only can* be monadic but the composition would depend on
> > the
> > > use case and might have heavy consequences (unioning the RDDs for
> > instance
> > > => this happening behind the sea can be a big pain, since it wouldn't
> be
> > > efficient at all).
> > >
> > > So Yes, RDD could be monadic but with care.
> > >
> >
> > At least we can say, it is a Functor...
> > Actually, I had imagined studying the monadic aspect of RDDs but as you
> > said, it's not so easy...
> > So for now, I consider them as pseudo-monadic ;)
> >
> >
> >
> > > So what exposes this signature is a way to flatMap over the inner
> value,
> > > like it is almost the case for Map (flatMapValues)
> > >
> > > So, wouldn't be better to rename flatMap as flatMapData (or whatever
> > better
> > > name)? Or to have flatMap requiring a Monad instance of RDD?
> > >
> > >
> > renaming is to flatMapData or flatTraversableMap sounds good to me (even
> if
> > lots of people will hate it...)
> > flatMap requiring a Monad would make it impossible to use with
> > for-comprehension certainly no?
> >
> >
> > > Sorry for the prose, just dropped my thoughts and feelings at once :-/
> > >
> > >
> > I agree with you in case it can help not to feel alone ;)
> >
> > Pascal
> >
> > Cheers,
> > > andy
> > >
> > > PS: and my English maybe, although my name's Andy I'm a native Belgian
> > ^^.
> > >
> >
>


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635557
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
--- End diff --

Will change to SlidingRDD. Thanks for catching this!




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37732233
  
I don't think we typically run jobs inside of getPartitions - so this 
changes some semantics of calling that function. For instance a lot of the 
other RDD constructors immediately access the partitions of their parents when 
constructed. This would change the lazy evaluation model of Spark.

The difficulties here are coming from the fact that sliding really _isn't_ 
a parallelizable problem. In the limit case where the partition size is small 
and/or the window size is close to the number of partitions, this effectively 
requires all-to-all communication.

I wonder if it's better implemented using a shuffle. This is more expensive 
but it might be the only way. Curious what @mateiz, @rxin, @aarondav and others 
think on this.
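
A quick spark-shell illustration of the laziness concern (sketch; `sc` is the
shell's SparkContext and `sliding` is the method added by this PR):

    val data = sc.parallelize(1 to 100, 4)
    val windows = data.sliding(3)   // no job expected yet under Spark's lazy model
    // Normally this is cheap metadata, but with work done inside getPartitions
    // it already launches a Spark job, before any action has been called:
    val n = windows.partitions.length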




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37732586
  
 Merged build triggered.




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37732587
  
Merged build started.




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635644
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
+  extends RDD[Array[T]](parent) {
--- End diff --

This is to be consistent with RDD.glom() and RDD.collect(), where Array[T] 
is used. In Scala Array.sliding returns Iterator[Array[T]]. I can change it to 
Seq to be more general. Please see the updated PR.
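
For reference, the Scala-collections behaviour the RDD version mirrors (plain
Scala, window size 2); the commented RDD line is a sketch assuming the `sliding`
method added in this PR:

    val local = (1 to 5).toArray.sliding(2).toList
    // conceptually: List(Array(1, 2), Array(2, 3), Array(3, 4), Array(4, 5))

    // sc.parallelize(1 to 5, 2).sliding(2).collect()
    // => Array(Array(1, 2), Array(2, 3), Array(3, 4), Array(4, 5)),
    //    with windows that span the partition boundary included.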




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/136#discussion_r10635646
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SlidedRDD.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.spark.{TaskContext, Partition}
+
+private[spark]
+class SlidedRDDPartition[T](val idx: Int, val prev: Partition, val tail: 
Array[T])
+  extends Partition with Serializable {
+  override val index: Int = idx
+}
+
+/**
+ * Represents a RDD from grouping items of its parent RDD in fixed size 
blocks by passing a sliding
+ * window over them. The ordering is first based on the partition index 
and then the ordering of
+ * items within each partition. This is similar to sliding in Scala 
collections, except that it
+ * becomes an empty RDD if the window size is greater than the total 
number of items. It needs to
+ * trigger a Spark job if the parent RDD has more than one partitions.
+ *
+ * @param parent the parent RDD
+ * @param windowSize the window size, must be greater than 1
+ *
+ * @see [[org.apache.spark.rdd.RDD#sliding]]
+ */
+private[spark]
+class SlidedRDD[T: ClassTag](@transient val parent: RDD[T], val 
windowSize: Int)
+  extends RDD[Array[T]](parent) {
+
+  require(windowSize > 1, "Window size must be greater than 1.")
--- End diff --

Done.




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37732906
  
@pwendell , the limit case is not a practical example. In that case, we 
would need to re-partition for most operations to be efficient. Also, this is really for 
small window sizes like 2 to 5 in my mind. We tell users that this would 
trigger a Spark job. I put the work in getPartitions because operations such as 
aggregate and foreach do not need a second job, and we can put in the optimized 
version later. 
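
As a concrete small-window example (sketch; assumes the `sliding` method from
this PR, the Array[T] element type shown in the diff, and a spark-shell `sc`):

    // Window of 2: consecutive differences of a numeric RDD.
    val deltas = sc.parallelize(Seq(1.0, 4.0, 9.0, 16.0), 2)
      .sliding(2)
      .map { case Array(prev, cur) => cur - prev }
    // deltas.collect() => Array(3.0, 5.0, 7.0)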




Re: [re-cont] map and flatMap

2014-03-15 Thread Koert Kuipers
just going head first without any thinking, i changed flatMap to
flatMapData and added a flatMap. for FlatMappedRDD my compute is:

firstParent[T].iterator(split, context).flatMap(f andThen (_.compute(split,
context)))


scala> val x = sc.parallelize(1 to 100)
scala> x.flatMap _
res0: (Int => org.apache.spark.rdd.RDD[Nothing]) =>
org.apache.spark.rdd.RDD[Nothing] = 

my f for flatMap is now f: T => RDD[U], however, i am not sure how to write
a useful function for this :)
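
Spelled out as a class, the experiment described above would look roughly like
this (sketch only, not part of Spark; whether computing the returned RDD against
the parent's split is meaningful is exactly the open question in this thread):

    package org.apache.spark.rdd

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, TaskContext}

    // Hypothetical variant where f returns an RDD instead of a TraversableOnce.
    private[spark] class RddFlatMappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => RDD[U])
      extends RDD[U](prev) {
      override def getPartitions: Array[Partition] = firstParent[T].partitions
      override def compute(split: Partition, context: TaskContext): Iterator[U] =
        firstParent[T].iterator(split, context).flatMap(f andThen (_.compute(split, context)))
    }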



On Sat, Mar 15, 2014 at 1:17 PM, Koert Kuipers  wrote:

> MappedRDD does:
> firstParent[T].iterator(split, context).map(f)
>
> and FlatMappedRDD:
> firstParent[T].iterator(split, context).flatMap(f)
>
> do yeah seems like its a map or flatMap over the iterator inside, not the
> RDD itself, sort of...
>
>
> On Sat, Mar 15, 2014 at 9:08 AM, andy petrella wrote:
>
>> Yep,
>> Regarding flatMap and an implicit parameter might work like in scala's
>> future for instance:
>>
>> https://github.com/scala/scala/blob/master/src/library/scala/concurrent/Future.scala#L246
>>
>> Dunno, still waiting for some insights from the team ^^
>>
>> andy
>>
>> On Wed, Mar 12, 2014 at 3:23 PM, Pascal Voitot Dev <
>> pascal.voitot@gmail.com> wrote:
>>
>> > On Wed, Mar 12, 2014 at 3:06 PM, andy petrella > > >wrote:
>> >
>> > > Folks,
>> > >
>> > > I want just to pint something out...
>> > > I didn't had time yet to sort it out and to think enough to give
>> valuable
>> > > strict explanation of -- event though, intuitively I feel they are a
>> lot
>> > > ===> need spark people or time to move forward.
>> > > But here is the thing regarding *flatMap*.
>> > >
>> > > Actually, it looks like (and again intuitively makes sense) that RDD
>> (and
>> > > of course DStream) aren't monadic and it is reflected in the
>> > implementation
>> > > (and signature) of flatMap.
>> > >
>> > > >
>> > > > *  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = **
>> > > > new FlatMappedRDD(this, sc.clean(f))*
>> > >
>> > >
>> > > There!? flatMap (or bind, >>=) should take a function that use the
>> same
>> > > Higher level abstraction in order to be considered as such right?
>> > >
>> > >
>> > I had remarked exactly the same thing and asked myself the same
>> question...
>> >
>> > In this case, it takes a function that returns a TraversableOnce which
>> is
>> > > the type of the content of the RDD, and what represent the output is
>> more
>> > > the content of the RDD than the RDD itself (still right?).
>> > >
>> > > This actually breaks the understand of map and flatMap
>> > >
>> > > > *def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this,
>> > > > sc.clean(f))*
>> > >
>> > >
>> > > Indeed, RDD is a functor and the underlying reason for flatMap to not
>> > take
>> > > A => RDD[B] doesn't show up in map.
>> > >
>> > > This has a lot of consequence actually, because at first one might
>> want
>> > to
>> > > create for-comprehension over RDDs, of even Traversable[F[_]]
>> functions
>> > > like sequence -- and he will get stuck since the signature aren't
>> > > compliant.
>> > > More importantly, Scala uses convention on the structure of a type to
>> > allow
>> > > for-comp... so where Traversable[F[_]] will fail on type, for-comp
>> will
>> > > failed weirdly.
>> > >
>> >
>> > +1
>> >
>> >
>> > >
>> > > Again this signature sounds normal, because my intuitive feeling about
>> > RDDs
>> > > is that they *only can* be monadic but the composition would depend on
>> > the
>> > > use case and might have heavy consequences (unioning the RDDs for
>> > instance
>> > > => this happening behind the sea can be a big pain, since it wouldn't
>> be
>> > > efficient at all).
>> > >
>> > > So Yes, RDD could be monadic but with care.
>> > >
>> >
>> > At least we can say, it is a Functor...
>> > Actually, I had imagined studying the monadic aspect of RDDs but as you
>> > said, it's not so easy...
>> > So for now, I consider them as pseudo-monadic ;)
>> >
>> >
>> >
>> > > So what exposes this signature is a way to flatMap over the inner
>> value,
>> > > like it is almost the case for Map (flatMapValues)
>> > >
>> > > So, wouldn't be better to rename flatMap as flatMapData (or whatever
>> > better
>> > > name)? Or to have flatMap requiring a Monad instance of RDD?
>> > >
>> > >
>> > renaming is to flatMapData or flatTraversableMap sounds good to me
>> (even if
>> > lots of people will hate it...)
>> > flatMap requiring a Monad would make it impossible to use with
>> > for-comprehension certainly no?
>> >
>> >
>> > > Sorry for the prose, just dropped my thoughts and feelings at once :-/
>> > >
>> > >
>> > I agree with you in case it can help not to feel alone ;)
>> >
>> > Pascal
>> >
>> > Cheers,
>> > > andy
>> > >
>> > > PS: and my English maybe, although my name's Andy I'm a native Belgian
>> > ^^.
>> > >
>> >
>>
>
>


Re: [re-cont] map and flatMap

2014-03-15 Thread andy petrella
[Thanks a *lot* for your answers!]

That's CoOl, a possible example would be to simply write a
for-comprehension that would do this:
>
> val allEvents = for {
>   deviceId  <- rddFromHdfsOfDeviceId
>   deviceEvent <- rddFromHdfsOfDeviceEvent(deviceId)
> } yield deviceEvent
> val hist = computeHistOf(allEvents)


It's just an example huh ( :-D )

Also here is the answer I was preparing to your previous answer

yes, that's what it does
> And it makes a lot of sense because RDD is just an abstraction of
> the whole sequence of data and distribute it in a resilient fashion.
> So RDD#flatMap would mean: distribute in a resilient fashion a sequence of
> RDDs before aggregating (or consolidating, or whatever verb ^^) them all.
> An example to explain myself maybe ;-) (and maybe also to confirm my own
> understanding)
> * data (logical representation)
>  > [A1, A2, A3, A4, A5, A6, A7, A8, A9, A10]
> * rdd-ized
> * > rdd1*[A1, A2, A3] *rdd2*[A4, A5] *rdd3*[A6, A7, A8, A9, A10]
> * map with f:A=>B
> * > rdd1'*[B1, B2, B3] *rdd2'*[B4, B5] *rdd3'*[B6, B7, B8, B9, B10]
> * current flatMap with f: A => Traversable[B]  (*not redistributed I
> suppose*)
>  > (intermediate step) *rdd1'*[[B11, B12], [B21] , []] *rdd2'*[[],
> [B41,B42,B43]] *rdd3'*[[B61], [B71, B72], [B81, B82, B83, B84], [], []]
>  > (final result)   *rdd1"*[B11, B12, B21] *rdd2**"*[B41,B42,B43]
> *rdd3**"*[B61, B71, B72, B81, B82, B83, B84]
> * for expression compliant version of flatMap with f: A => RDD[B]  (*could
> involve some redistributions I presume*)
>  > (intermediate step) *rdd1'*[*rdd11*[B11] *rdd11*[B12] *rdd12*[B21]]
> *rdd2'*[*rdd21*[B41] *rdd22*[B42, B43]] *rdd3'*[*rdd31*[B61] *rdd32*[B71,
> B72], *rdd33*[B81] *rdd34*[B82, B83] *rdd35*[B84]]
>  > (final result)   *rdd1**"*[B11, B12, B21] *rdd2**"*
> [B41,B42,B43] *rdd3**"*[B61, B71, B72, B81, B82, B83, B84]
>
> As you can see in the example I tried to depict here, the final result
> that would be seen is the same but in between there might be some
> redistribution (have a quick look at B81, B82, B83, B84).
> That's why, imho, it'd be interesting to clear this up by either renaming
> the function -- or, even better, making flatMap able to
> redistribute in-between (which can have a big impact on the reconciliation
> => my first mail :-D).
> Tell me if I'm completely wrong ;-) -- or if I forget something in my
> actual understanding -- or if there is something similar already existing
> and I'm not aware of
> Cheers,
> andy
>
>
>

Andy Petrella
Belgium (Liège)

 Data Engineer in NextLab sprl (owner)
 Engaged Citizen Coder for WAJUG (co-founder)
 Author of Learning Play! Framework 2
 Bio: on visify
Mobile: +32 495 99 11 04
Mails:

   - andy.petre...@nextlab.be
   - andy.petre...@gmail.com

Socials:

   - Twitter: https://twitter.com/#!/noootsab
   - LinkedIn: http://be.linkedin.com/in/andypetrella
   - Blogger: http://ska-la.blogspot.com/
   - GitHub:  https://github.com/andypetrella
   - Masterbranch: https://masterbranch.com/andy.petrella



On Sat, Mar 15, 2014 at 7:06 PM, Koert Kuipers  wrote:

> just going head first without any thinking, it changed flatMap to
> flatMapData and added a flatMap. for FlatMappedRDD my compute is:
>
> firstParent[T].iterator(split, context).flatMap(f andThen (_.compute(split,
> context)))
>
>
> scala> val x = sc.parallelize(1 to 100)
> scala> x.flatMap _
> res0: (Int => org.apache.spark.rdd.RDD[Nothing]) =>
> org.apache.spark.rdd.RDD[Nothing] = 
>
> my f for flatMap is now f: T => RDD[U], however, i am not sure how to write
> a useful function for this :)
>
>
>
> On Sat, Mar 15, 2014 at 1:17 PM, Koert Kuipers  wrote:
>
> > MappedRDD does:
> > firstParent[T].iterator(split, context).map(f)
> >
> > and FlatMappedRDD:
> > firstParent[T].iterator(split, context).flatMap(f)
> >
> > do yeah seems like its a map or flatMap over the iterator inside, not the
> > RDD itself, sort of...
> >
> >
> > On Sat, Mar 15, 2014 at 9:08 AM, andy petrella  >wrote:
> >
> >> Yep,
> >> Regarding flatMap and an implicit parameter might work like in scala's
> >> future for instance:
> >>
> >>
> https://github.com/scala/scala/blob/master/src/library/scala/concurrent/Future.scala#L246
> >>
> >> Dunno, still waiting for some insights from the team ^^
> >>
> >> andy
> >>
> >> On Wed, Mar 12, 2014 at 3:23 PM, Pascal Voitot Dev <
> >> pascal.voitot@gmail.com> wrote:
> >>
> >> > On Wed, Mar 12, 2014 at 3:06 PM, andy petrella <
> andy.petre...@gmail.com
> >> > >wrote:
> >> >
> >> > > Folks,
> >> > >
> >> > > I want just to pint something out...
> >> > > I didn't had time yet to sort it out and to think enough to give
> >> valuable
> >> > > strict explanation of -- event though, intuitively I feel they are a
> >> lot
> >> > > ===> need spark people or time to move 

[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37733755
  
Seems reasonable to me. You still working on this or is it good to go?




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37733845
  
Ah I see - so this isn't going to be an externally user-visible class (I 
didn't notice it was `private[spark]`)? Would it make sense to throw an 
assertion error if the slide interval isn't small?




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37733908
  
Even if it's private we can end up with cases where users have, e.g., a 
10,000-partition RDD with only a few items in each partition. Do we know a 
priori when calling this that that won't be the case? 




[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10635962
  
--- Diff: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
---
@@ -136,4 +123,47 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
 // failure should be cached
 intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 
0) }
   }
+
+  test("remote fetch exceeding akka frame size") {
+val newConf = new SparkConf
+newConf.set("spark.akka.frameSize", "1")
+newConf.set("spark.akka.askTimeout", "1") // Fail fast
+val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf)
+
+// Frame size should be ~123B, and no exception should be thrown
+masterTracker.registerShuffle(10, 1)
+masterTracker.registerMapOutput(10, 0, new MapStatus(
+  BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+slaveTracker.getServerStatuses(10, 0)
+
+// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should 
throw exception
+masterTracker.registerShuffle(20, 100)
+(0 until 100).foreach { i =>
+  masterTracker.registerMapOutput(20, i, new MapStatus(
+BlockManagerId("999", "mps", 1000, 0), 
Array.fill[Byte](400)(0)))
+}
+intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) }
+  }
+
+  private def setUpMasterSlaveSystem(conf: SparkConf) = {
--- End diff --

Note that the test before this one also sets up an entire actor system but 
finishes in under a second. This test is ~4 seconds long because we have to 
wait for the akka timeouts before an exception is thrown.

In any case, I used the akka test framework you suggested and that 
considerably simplified the test. (Note that the duration is still the same, 
however)




[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10635964
  
--- Diff: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
---
@@ -136,4 +142,30 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
 // failure should be cached
 intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 
0) }
   }
+
+  test("remote fetch exceeding akka frame size") {
+val newConf = new SparkConf
+newConf.set("spark.akka.frameSize", "1")
+newConf.set("spark.akka.askTimeout", "1") // Fail fast
--- End diff --

is this conf needed anymore? is it relevant with the local actor ref?




[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10635967
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -35,13 +35,21 @@ private[spark] case class 
GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 
1024 // MB
--- End diff --

Not sure what you mean exactly about 1)?
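
To make the intent of the `maxAkkaFrameSize` line in the diff above concrete,
here is a hedged sketch of the kind of check being added (the object name,
method, and wiring are illustrative, not the actual actor code):

    import org.apache.spark.{SparkConf, SparkException}

    object FrameSizeCheck {
      // spark.akka.frameSize is configured in MB; compare the serialized map
      // output statuses against it before replying over Akka.
      def ensureFits(conf: SparkConf, serializedStatuses: Array[Byte]): Array[Byte] = {
        val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
        if (serializedStatuses.length > maxAkkaFrameSize) {
          throw new SparkException(
            s"Map output statuses were ${serializedStatuses.length} bytes which exceeds " +
            s"spark.akka.frameSize ($maxAkkaFrameSize bytes)")
        }
        serializedStatuses
      }
    }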




[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10635968
  
--- Diff: core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
---
@@ -136,4 +142,30 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
 // failure should be cached
 intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 
0) }
   }
+
+  test("remote fetch exceeding akka frame size") {
+val newConf = new SparkConf
+newConf.set("spark.akka.frameSize", "1")
+newConf.set("spark.akka.askTimeout", "1") // Fail fast
--- End diff --

Yeah, we need it to lower the frame size so we can actually do this within 
a short amount of time.




[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37734195
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13195/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37734193
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37734238
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37734239
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37734242
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37734241
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mengxr
Github user mengxr commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37734634
  
It is hard to say what threshold to use. I couldn't think of a use case 
that requires a large window size, but I cannot say there is none.

Another possible approach is to pass all parent partitions to 
SlidingRDDPartition and then retrieve the tail to append in compute(). If we 
find we need to scan many partitions to assemble the tail, we could log a 
warning. I'm not sure whether this would be more efficient than the current 
implementation.
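
A rough sketch of this alternative, for illustration only: each partition of 
the sliding RDD keeps references to the parent partitions that follow it, and 
compute() lazily pulls at most windowSize - 1 extra elements to finish the 
last windows. All names here (SlidingRDDPartition, SlidingRDDSketch, 
windowSize) are assumptions for the sketch, not the code in the PR.

```
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that also carries the parent partitions after it,
// so the window tail can be assembled lazily in compute().
private class SlidingRDDPartition[T](val idx: Int, val prev: Partition,
    val tailPartitions: Seq[Partition]) extends Partition {
  override val index: Int = idx
}

// Hypothetical sketch, not the PR's implementation.
private class SlidingRDDSketch[T: ClassTag](parent: RDD[T], windowSize: Int)
  extends RDD[Seq[T]](parent) {

  override protected def getPartitions: Array[Partition] = {
    val parts = parent.partitions
    parts.map { p =>
      new SlidingRDDPartition[T](p.index, p, parts.drop(p.index + 1).toSeq): Partition
    }
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
    val part = split.asInstanceOf[SlidingRDDPartition[T]]
    // Pull at most (windowSize - 1) extra elements from the following
    // partitions; having to scan many of them is the case worth warning about.
    val tail = part.tailPartitions.iterator
      .flatMap(p => parent.iterator(p, context))
      .take(windowSize - 1)
    (parent.iterator(part.prev, context) ++ tail)
      .sliding(windowSize)
      .withPartial(false)
      .map(_.toSeq)
  }
}
```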


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread gzm55
GitHub user gzm55 opened a pull request:

https://github.com/apache/spark/pull/151

fix compile error for hadoop CDH 4.4+

Fix the compilation error when SPARK_HADOOP_VERSION is set to 2.0.0-cdh4.4.0. 
That is, the yarn-alpha project should work with Hadoop CDH 4.4.0 and later.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gzm55/spark work/yarn-beta-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #151


commit 02083d64b8fab01fb2b98d5894da5bf90e9f85f4
Author: James Z.M. Gao 
Date:   2014-03-14T10:50:08Z

fix compile error for hadoop CDH 4.4+

Using a macro, we work around the difference between the Hadoop 2.0-alpha and
2.1-beta APIs, and fix the compilation error when SPARK_HADOOP_VERSION is set
to 2.0.0-cdh4.4.0. That is, the yarn-alpha project should work with Hadoop
CDH 4.4 and later.

commit 5eefaa92a764b54ed3d452a8a9dcee930d80e163
Author: James Z.M. Gao 
Date:   2014-03-06T07:28:18Z

fix compile error of streaming project

explicit return type for implicit function




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37735835
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13197/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37735834
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37735832
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13196/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37735830
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/151#issuecomment-37735857
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1144 Added license and RAT to check lice...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/125#issuecomment-37736072
  
@ScrapCodes this is a good start, but right now it doesn't actually fail the 
build if RAT doesn't succeed. Also, RAT reports a bunch of failures for Python 
files that I think we want to ignore here.

I'd actually pipe the output of RAT to a file called `rat-results.txt`, then 
grep that file for license errors; if there are any, we should exit nonzero 
from the test runner.

```
java -jar lib/apache-rat-0.10.jar -E .rat-excludes -d . > rat-results.txt
cat rat-results.txt  | grep "??" 
```

Then, if the errors are non-empty, we should echo back to the user which 
files are missing license headers and fail the test script.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1144 Added license and RAT to check lice...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/125#discussion_r10636342
  
--- Diff: dev/rat.bash ---
@@ -0,0 +1,49 @@
+#!/usr/bin/env bash
--- End diff --

could you remove the `.bash` extension here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10636356
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

Compared with passing a string, this adds about 58B to the task closure.
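
For illustration, the API shape under discussion might be used roughly like 
this. This is a hedged sketch: `pairRdd`, the HashPartitioner argument, and 
constructing KryoSerializer from a SparkConf are assumptions for the example, 
not code from the PR.

```
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

// Assuming pairRdd: RDD[(Int, String)] already exists, pass a Serializer
// instance instead of a class name.
def exampleDependency(pairRdd: RDD[(Int, String)], conf: SparkConf) =
  new ShuffleDependency[Int, String](pairRdd, new HashPartitioner(8),
    new KryoSerializer(conf))
```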




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37736391
  
I am not sure what the intent of this PR is.
Log config for workers should pretty much mirror what is in the master.

Also, the hardcoding of the config file, root logger levels, etc. when we 
already ship a logging property file seems counterintuitive - I am probably 
missing something here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37736435
  
To take a step back, given how niche this seems to be and how it violates the
"usual" expectations of how our users use Spark (lazy execution, etc. as
mentioned above) - do we want it to be part of the core API?


On Sat, Mar 15, 2014 at 12:41 PM, UCB AMPLab wrote:

> All automated tests passed.
> Refer to this link for build results:
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13197/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/151#discussion_r10636404
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 ---
@@ -736,7 +736,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, 
V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: 
DStream[(K, V)]) = {
+  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: 
DStream[(K, V)]): JavaPairDStream[K, V] = {
 new JavaPairDStream[K, V](dstream)
--- End diff --

Unrelated change ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/151#discussion_r10636411
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.language.experimental.macros
+import scala.reflect.macros.Context
+
+private[yarn] object YarnAllocationHandlerMacro {
+  def getAMResp(resp: Any): Any = macro getAMRespImpl
+
+  /**
+   * From Hadoop CDH 4.4.0+ (2.1.0-beta),
+   * AMResponse is merged into AllocateResponse,
+   * so we don't need to call getAMResponse(), just use AllocateResponse 
directly.
+   * This macro will test the existence of AMResponse,
+   * and generate different expressions.
+   *
+   * This macro now is only used in spark's alpha version of yarn api.
+   * It stays in the core project, for the two-stage compiling of
+   * the scala macro system.
+   */
+  def getAMRespImpl(c: Context)(resp: c.Expr[Any]) = {
+try {
+  import c.universe._
+  c.mirror.staticClass("org.apache.hadoop.yarn.api.records.AMResponse")
+  c.Expr[Any](Apply(Select(resp.tree, newTermName("getAMResponse")), 
List()))
+} catch {
+  case _: Throwable => resp
+}
--- End diff --

Instead of doing this for each invocation - find out which method is 
exposed once, set a flag, and return the appropriate value via reflection 
based on that flag.
I will defer to Tom on whether there is a better way to do this.
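
A hedged sketch of that reflection-based alternative (hypothetical - not the 
approach this PR takes): probe once for the alpha-API AMResponse class named 
in the macro above, cache the result, and branch on it.

```
// Hypothetical helper object, not part of the PR.
private object AMRespCompat {
  // The alpha API has a separate AMResponse record; its presence tells us
  // whether AllocateResponse still exposes getAMResponse().
  private val hasAMResponse: Boolean =
    try {
      Class.forName("org.apache.hadoop.yarn.api.records.AMResponse")
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  // Resolve the method reflectively only when the old API is present.
  def getAMResp(resp: AnyRef): AnyRef =
    if (hasAMResponse) resp.getClass.getMethod("getAMResponse").invoke(resp)
    else resp
}
```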


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10636424
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

Is it 'cos the SparkConf is getting shipped around ?
If yes, using the broadcast variable should alleviate it ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10636463
  
--- Diff: core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---
@@ -121,4 +121,9 @@ private[spark] object AkkaUtils extends Logging {
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
 Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, 
"seconds")
   }
+
+  /** Returns the default max frame size for Akka messages in MB. */
+  def maxFrameSize(conf: SparkConf): Int = {
--- End diff --

Great - I like this even more than the suggested approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10636474
  
--- Diff: core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---
@@ -121,4 +121,9 @@ private[spark] object AkkaUtils extends Logging {
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
 Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, 
"seconds")
   }
+
+  /** Returns the default max frame size for Akka messages in MB. */
+  def maxFrameSize(conf: SparkConf): Int = {
--- End diff --

So one idea - why not make this `maxFrameSizeBytes` (i.e. do the byte 
conversion inside of here) and also use the same code inside of 
`Executor.scala` in place of the thing that's there now. Then we have a single 
code path for doing this every time.
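
A minimal sketch of that helper, for illustration - assuming it sits 
alongside the existing lookupTimeout in AkkaUtils and keeps the same default 
of 10 MB:

```
/** Returns the configured max frame size for Akka messages, in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
  conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
}
```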


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37736922
  
LGTM pending a minor comment about unifying the code path with the Executor 
code that reads the frame size.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37737398
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37737399
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10636602
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

No it wasn't. SparkConf was not serializable.

On Saturday, March 15, 2014, Mridul Muralidharan wrote:

> In core/src/main/scala/org/apache/spark/Dependency.scala:
>
> >   */
> >  class ShuffleDependency[K, V](
> >  @transient rdd: RDD[_ <: Product2[K, V]],
> >  val partitioner: Partitioner,
> > -val serializerClass: String = null)
> > +val serializer: Serializer = null)
>
> Is it 'cos the SparkConf is getting shipped around ?
> If yes, using the broadcast variable should alleviate it ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Spark 615 map partitions with index callable f...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/16#issuecomment-37740255
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37738873
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Spark 615 map partitions with index callable f...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/16#issuecomment-37740256
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/149#issuecomment-37738874
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13198/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10637181
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

Good point - wondering what is adding this overhead then.
For JavaSerializer, it is just a single int (and some header for the class).
Is this 58k for Kryo?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Spark 615 map partitions with index callable f...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/16#issuecomment-37742137
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13199/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Spark 615 map partitions with index callable f...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/16#issuecomment-37742136
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1254. Consolidate, order, and harmonize ...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/145#issuecomment-37742365
  
Thanks, I've merged this. One small change I added is to use the 
`Resolver.mavenLocal` that sbt provides instead of hard coding it.
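
For illustration, the sbt setting being described looks roughly like this (a 
sketch; Resolver.mavenLocal resolves to the local ~/.m2/repository, so the 
path does not have to be hard coded):

```
// In the sbt build definition:
resolvers += Resolver.mavenLocal
```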


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1254. Consolidate, order, and harmonize ...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/145#issuecomment-37742370
  

https://github.com/sbt/sbt/blob/0.13/ivy/src/main/scala/sbt/Resolver.scala?source=c#L289


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37742731
  
@mridulm I think the RDD definition is actually `private[spark]` and it's 
just intended to be used internally for higher level algorithms.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37742890
  
@mridulm I think in YARN environments cluster operators can set a logging 
file on all of the machines to be shared across applications (e.g. Spark, 
MapReduce, etc). So the idea here is to point to that file. 

That said, @sryza, one thing here is that even if a user bundles a log4j file, 
it will get ignored and replaced with the cluster default. That seems less than 
ideal, and I'm not sure there is a good way to detect whether the user has 
included their own file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Akka frame

2014-03-15 Thread pwendell
GitHub user pwendell opened a pull request:

https://github.com/apache/spark/pull/152

Akka frame

This is a very small change on top of @andrewor14's patch in #147.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pwendell/spark akka-frame

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #152


commit 281d7c9f83344cf107ed946635eda7be922e0267
Author: Andrew Or 
Date:   2014-03-15T03:18:35Z

Throw exception on spark.akka.frameSize exceeded + Unit tests

commit c9b610958da9fa01b34936ee4283bf266aec66bc
Author: Andrew Or 
Date:   2014-03-15T18:31:48Z

Simplify test + make access to akka frame size more modular

commit 9fef4c7755d78403347d9d0fa035c50a1ec98bd7
Author: Patrick Wendell 
Date:   2014-03-16T00:05:32Z

Consolidate Executor use of akka frame size




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/147#discussion_r10637306
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -35,13 +35,21 @@ private[spark] case class 
GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
+
   def receive = {
 case GetMapOutputStatuses(shuffleId: Int) =>
   val hostPort = sender.path.address.hostPort
   logInfo("Asked to send map output locations for shuffle " + 
shuffleId + " to " + hostPort)
-  sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+  val mapOutputStatuses = 
tracker.getSerializedMapOutputStatuses(shuffleId)
+  val serializedSize = mapOutputStatuses.size
+  if (serializedSize > maxAkkaFrameSize) {
+throw new SparkException(
+  "spark.akka.frameSize exceeded! Map output statuses were %d 
bytes".format(serializedSize))
--- End diff --

It would be helpful to also include the configured frame size in this output 
message, since otherwise these problems can be extremely annoying to debug.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37743968
  
This should be ready to merge unless other people have more to add.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37744049
  
Hey @andrewor14 I submitted some small changes on top of this while you 
were working on it over at #152.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1254. Consolidate, order, and harmonize ...

2014-03-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/145


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/152#issuecomment-37744154
  
Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 closed the pull request at:

https://github.com/apache/spark/pull/147


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1244] Throw exception if map output sta...

2014-03-15 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/147#issuecomment-37744167
  
Continued at #152. Closing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/152#issuecomment-37744244
  
 Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/152#issuecomment-37744245
  
Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/152#discussion_r10637484
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -35,13 +35,21 @@ private[spark] case class 
GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+
   def receive = {
 case GetMapOutputStatuses(shuffleId: Int) =>
   val hostPort = sender.path.address.hostPort
   logInfo("Asked to send map output locations for shuffle " + 
shuffleId + " to " + hostPort)
-  sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+  val mapOutputStatuses = 
tracker.getSerializedMapOutputStatuses(shuffleId)
+  val serializedSize = mapOutputStatuses.size
+  if (serializedSize > maxAkkaFrameSize) {
+throw new SparkException(s"Map output statuses were 
$serializedSize bytes which " +
+  s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
--- End diff --

What are you expecting to happen to this thrown exception?  From what I'm 
seeing, nothing will happen except that this actor gets killed and a new one 
restarted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/152#discussion_r10637519
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -35,13 +35,21 @@ private[spark] case class 
GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+
   def receive = {
 case GetMapOutputStatuses(shuffleId: Int) =>
   val hostPort = sender.path.address.hostPort
   logInfo("Asked to send map output locations for shuffle " + 
shuffleId + " to " + hostPort)
-  sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+  val mapOutputStatuses = 
tracker.getSerializedMapOutputStatuses(shuffleId)
+  val serializedSize = mapOutputStatuses.size
+  if (serializedSize > maxAkkaFrameSize) {
+throw new SparkException(s"Map output statuses were 
$serializedSize bytes which " +
+  s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
--- End diff --

Hm, great point @markhamstra - I forgot the actor will swallow it. I think our 
only option here is to just log an error, because we can't change the message 
format between the master and worker (e.g. to allow the master to signal a 
failure in its response) in a maintenance release, and this is something we are 
trying to fix for 0.9.1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/152#discussion_r10637523
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -35,13 +35,21 @@ private[spark] case class 
GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends 
MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+
   def receive = {
 case GetMapOutputStatuses(shuffleId: Int) =>
   val hostPort = sender.path.address.hostPort
   logInfo("Asked to send map output locations for shuffle " + 
shuffleId + " to " + hostPort)
-  sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+  val mapOutputStatuses = 
tracker.getSerializedMapOutputStatuses(shuffleId)
+  val serializedSize = mapOutputStatuses.size
+  if (serializedSize > maxAkkaFrameSize) {
+throw new SparkException(s"Map output statuses were 
$serializedSize bytes which " +
+  s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes).")
--- End diff --

We could also throw the exception anyway (e.g. to allow us to test this), but 
log an error message first. Then at least there is something the user can see, 
rather than the silent failure we have now.
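
A hedged sketch of that "log, then throw" idea inside the actor's receive, 
based on the diff quoted above - the error reaches the master's logs even if 
the actor framework swallows the exception, and the throw keeps the behavior 
testable:

```
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.size
if (serializedSize > maxAkkaFrameSize) {
  val msg = s"Map output statuses were $serializedSize bytes which exceeds " +
    s"spark.akka.frameSize ($maxAkkaFrameSize bytes)."
  logError(msg)                  // visible to the user in the master's logs
  throw new SparkException(msg)  // still fails loudly, and can be tested
}
sender ! mapOutputStatuses
```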


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Code documentation

2014-03-15 Thread David Thomas
Is there any documentation available that explains the code architecture
that can help a new Spark framework developer?


Re: Code documentation

2014-03-15 Thread Reynold Xin
Take a look at
https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals


On Sat, Mar 15, 2014 at 6:19 PM, David Thomas  wrote:

> Is there any documentation available that explains the code architecture
> that can help a new Spark framework developer?
>


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10637576
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

It's only 58 bytes, not 58KB, which is less than a 5% increase in the 
serialized size of ShuffleDependency, which is part of any RDD task.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/152#issuecomment-37745299
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13200/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1244: Throw exception if map output stat...

2014-03-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/152#issuecomment-37745298
  
Merged build finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10637809
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends 
Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default 
serializer, as specified
+ *  by `spark.serializer` config option, will be used.
  */
 class ShuffleDependency[K, V](
 @transient rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
-val serializerClass: String = null)
+val serializer: Serializer = null)
--- End diff --

@rxin seems fine to me... thanks for digging into it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread gzm55
Github user gzm55 commented on a diff in the pull request:

https://github.com/apache/spark/pull/151#discussion_r10637844
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 ---
@@ -736,7 +736,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, 
V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: 
DStream[(K, V)]) = {
+  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: 
DStream[(K, V)]): JavaPairDStream[K, V] = {
 new JavaPairDStream[K, V](dstream)
--- End diff --

Yeah, I will move it to another PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1241] Add sliding to RDD

2014-03-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/136#issuecomment-37748284
  
@pwendell I was referring not to the actual implementation, but to the 
expectation when using the exposed API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: fix compile error for hadoop CDH 4.4+

2014-03-15 Thread gzm55
Github user gzm55 commented on a diff in the pull request:

https://github.com/apache/spark/pull/151#discussion_r10637918
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.language.experimental.macros
+import scala.reflect.macros.Context
+
+private[yarn] object YarnAllocationHandlerMacro {
+  def getAMResp(resp: Any): Any = macro getAMRespImpl
+
+  /**
+   * From Hadoop CDH 4.4.0+ (2.1.0-beta),
+   * AMResponse is merged into AllocateResponse,
+   * so we don't need to call getAMResponse(), just use AllocateResponse 
directly.
+   * This macro will test the existence of AMResponse,
+   * and generate different expressions.
+   *
+   * This macro now is only used in spark's alpha version of yarn api.
+   * It stays in the core project, for the two-stage compiling of
+   * the scala macro system.
+   */
+  def getAMRespImpl(c: Context)(resp: c.Expr[Any]) = {
+try {
+  import c.universe._
+  c.mirror.staticClass("org.apache.hadoop.yarn.api.records.AMResponse")
+  c.Expr[Any](Apply(Select(resp.tree, newTermName("getAMResponse")), 
List()))
+} catch {
+  case _: Throwable => resp
+}
--- End diff --

The getAMRespImpl() macro is only expanded once, at compile time, when the 
allocateContainers() method is compiled, so invocations of 
allocateContainers() do not do any reflection at run-time.
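
For illustration, the call site would look roughly like this (a sketch; 
`allocateResponse` is an assumed local name) - the macro expands at compile 
time to either `allocateResponse.getAMResponse` on the alpha API or to 
`allocateResponse` unchanged on 2.1-beta and later:

```
// Hypothetical call site inside the allocation handler:
val amResp = YarnAllocationHandlerMacro.getAMResp(allocateResponse)
```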


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37748592
  
But that would primarily be for debugging the YARN/Hadoop APIs - and there is 
no easy way to inject Spark-specific logging levels. I am curious why this was 
actually required.

Currently, we have fairly fine-grained control over logging from various 
packages/classes by redirecting logging output to stdout/stderr - and this is 
quite heavily used in user applications (mute most of Spark, enable user code, 
enable specific parts of Spark for debugging, etc.).

Having said that, @tgravescs did the initial logging integration in YARN, 
so I will defer to him.
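
For context, a hedged sketch of the kind of fine-grained control being described; the 
package names are illustrative, and in practice these levels would normally come 
from a user-supplied log4j.properties rather than from code:

    import org.apache.log4j.{Level, Logger}

    object LogLevelSketch {
      def main(args: Array[String]): Unit = {
        // Mute most of Spark, enable debug output for one subsystem of interest,
        // and keep the user application's own packages at INFO.
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark.deploy.yarn").setLevel(Level.DEBUG)
        Logger.getLogger("com.example.myapp").setLevel(Level.INFO)
      }
    }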




[GitHub] spark pull request: SPARK-1252. On YARN, use container-log4j.prope...

2014-03-15 Thread mridulm
Github user mridulm commented on the pull request:

https://github.com/apache/spark/pull/148#issuecomment-37748627
  
To clarify, I am not saying we should not be configuring what is in 
container-log4j.properties - but we should try to do that while preserving the 
ability to configure (and override container-log4j.properties) via a 
user-specified logging config.
Since log4j appears to initialize only once per classloader, making this happen 
is going to be interesting.




[GitHub] spark pull request: SPARK-1255: Allow user to pass Serializer obje...

2014-03-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/149#discussion_r10637947
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -43,12 +44,13 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  * Represents a dependency on the output of a shuffle stage.
  * @param rdd the parent RDD
  * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
+ * @param serializer [[Serializer]] to use. If set to null, the default serializer, as specified
+ *  by `spark.serializer` config option, will be used.
 class ShuffleDependency[K, V](
     @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializerClass: String = null)
+    val serializer: Serializer = null)
--- End diff --

Now I am wondering why I read it as 58KB and not 58B ... apologies!
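
For reference, a minimal sketch of the new calling convention described in the 
@param doc above; the RDD, the partitioner, and the choice of Kryo here are 
illustrative assumptions, not code from the pull request:

    import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.serializer.KryoSerializer

    // Build the Serializer instance and pass it directly; omitting the argument
    // (it defaults to null) falls back to whatever `spark.serializer` is set to.
    def kryoShuffleDep(pairs: RDD[(Int, String)], conf: SparkConf): ShuffleDependency[Int, String] =
      new ShuffleDependency[Int, String](pairs, new HashPartitioner(8), new KryoSerializer(conf))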




[GitHub] spark pull request: fix compile error of streaming project

2014-03-15 Thread gzm55
GitHub user gzm55 opened a pull request:

https://github.com/apache/spark/pull/153

fix compile error of streaming project

explicit return type for implicit function

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gzm55/spark work/streaming-compile

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #153


commit 166ff10a8b3cf6d683e1e02ecda1106d2ba762f6
Author: James Z.M. Gao 
Date:   2014-03-06T07:28:18Z

fix compile error of streaming project

explicit return type for implicit function
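
A self-contained sketch of the pattern the title describes (the types below are 
illustrative placeholders, not the actual streaming code): giving the implicit 
conversion an explicit return type instead of letting it be inferred, which is 
what some compiler/build-profile combinations choke on.

    import scala.language.implicitConversions

    object ExplicitImplicitReturnType {
      final case class Wrapper(n: Int) { def doubled: Int = n * 2 }

      // Declaring the return type explicitly is the fix the pull request title refers to.
      implicit def intToWrapper(n: Int): Wrapper = Wrapper(n)

      def main(args: Array[String]): Unit = println(3.doubled) // prints 6
    }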





