[VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Patrick Wendell
Please vote on releasing the following candidate as Apache Spark version 1.3.0!

The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2

The release files, including signatures, digests, etc. can be found at:
http://people.apache.org/~pwendell/spark-1.3.0-rc1/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1069/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/

Please vote on releasing this package as Apache Spark 1.3.0!

The vote is open until Saturday, February 21, at 08:03 UTC and passes
if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.3.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see
http://spark.apache.org/

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by
taking a Spark 1.2 workload and running on this release candidate,
then reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.3 QA period,
so -1 votes should only occur for significant regressions from 1.2.1.
Bugs already present in 1.2.X, minor regressions, or bugs related
to new features will not block this release.

- Patrick




Merging code into branch 1.3

2015-02-18 Thread Patrick Wendell
Hey Committers,

Now that Spark 1.3 rc1 is cut, please restrict branch-1.3 merges to
the following:

1. Fixes for issues blocking the 1.3 release (i.e. 1.2.X regressions)
2. Documentation and tests.
3. Fixes for non-blocker issues that are surgical, low-risk, and/or
outside of the core.

If there is a lower priority bug fix (a non-blocker) that requires
nontrivial code changes, do not merge it into 1.3. If something seems
borderline, feel free to reach out to me and we can work through it
together.

This is what we've done for the last few releases to make sure RCs
become progressively more stable, and it is important for helping us
cut timely releases.

Thanks!

- Patrick




Re: Replacing Jetty with TomCat

2015-02-18 Thread Sean Owen
I do not think it makes sense to make the web server configurable.
Mostly because there's no real problem in running an HTTP service
internally based on Netty while you run your own HTTP service based on
something else like Tomcat. What's the problem?

On Wed, Feb 18, 2015 at 3:14 AM, Niranda Perera
 wrote:
> Hi Sean,
> The main issue we have is running two web servers in a single product. We
> think it would not be an elegant solution.
>
> Could you please point me to the main areas where jetty server is tightly
> coupled or extension points where I could plug tomcat instead of jetty?
> If successful I could contribute it to the spark project. :-)
>
> cheers
>
>
>
> On Mon, Feb 16, 2015 at 4:51 PM, Sean Owen  wrote:
>>
>> There's no particular reason you have to remove the embedded Jetty
>> server, right? it doesn't prevent you from using it inside another app
>> that happens to run in Tomcat. You won't be able to switch it out
>> without rewriting a fair bit of code, no, but you don't need to.
>>
>> On Mon, Feb 16, 2015 at 5:08 AM, Niranda Perera
>>  wrote:
>> > Hi,
>> >
>> > We are thinking of integrating Spark server inside a product. Our
>> > current
>> > product uses Tomcat as its webserver.
>> >
>> > Is it possible to switch the Jetty webserver in Spark to Tomcat
>> > off-the-shelf?
>> >
>> > Cheers
>> >
>> > --
>> > Niranda
>
>
>
>
> --
> Niranda




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
The serializer is created with

val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)

Which is definitely not the closure serializer and so should respect
what you are setting with spark.serializer.

Maybe you can do a quick bit of debugging to see where that assumption
breaks down? For example, are you sure spark.serializer is set everywhere?
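
For reference, a minimal sketch of the failure mode described in the quoted
report below, assuming the Spark 1.2 Scala API; the class and application names
here are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// A zero value that is deliberately not java.io.Serializable, so only Kryo can handle it.
class KryoOnlyZero(val buf: Array[Int])

val conf = new SparkConf()
  .setAppName("aggregate-zero-repro")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100)
// Per the report below, this throws a serialization exception, because the zero
// value is cloned with the closure (Java) serializer rather than the Kryo serializer.
rdd.aggregate(new KryoOnlyZero(new Array[Int](1)))(
  (z, x) => { z.buf(0) += x; z },
  (a, b) => { a.buf(0) += b.buf(0); a })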

On Wed, Feb 18, 2015 at 4:31 AM, Matt Cheah  wrote:
> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
> requires that a “createCombiner” function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the
> aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
> What is the rationale for making the zero value be serialized using the
> closure serializer? This isn’t part of the closure, but is an initial data
> item.
> Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah




Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Sean Owen
On OS X and Ubuntu I see the following test failure in the source
release for 1.3.0-RC1:

UISeleniumSuite:
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal
...


Patrick this link gives a 404:
https://people.apache.org/keys/committer/pwendell.asc


Finally, I already realized I failed to get the fix for
https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has
to be correct for the release. I'll patch that up straight away,
sorry. I believe the result of the intended fix is still as I
described in SPARK-5669, so there is no bad news there. A local test
seems to confirm it and I'm waiting on Jenkins. If it's all good I'll
merge that fix. So, that much will need a new release, I apologize.


Please keep testing anyway!


Otherwise, I verified that the signatures and licenses are correct and
that it compiles on OS X and Ubuntu 14.


On Wed, Feb 18, 2015 at 8:12 AM, Patrick Wendell  wrote:
> Please vote on releasing the following candidate as Apache Spark version 
> 1.3.0!
>
> The tag to be voted on is v1.3.0-rc1 (commit f97b0d4a):
> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f97b0d4a6b26504916816d7aefcf3132cd1da6c2
>
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-1.3.0-rc1/
>
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1069/
>
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-1.3.0-rc1-docs/
>
> Please vote on releasing this package as Apache Spark 1.3.0!
>
> The vote is open until Saturday, February 21, at 08:03 UTC and passes
> if a majority of at least 3 +1 PMC votes are cast.
>
> [ ] +1 Release this package as Apache Spark 1.3.0
> [ ] -1 Do not release this package because ...
>
> To learn more about Apache Spark, please see
> http://spark.apache.org/
>
> == How can I help test this release? ==
> If you are a Spark user, you can help us test this release by
> taking a Spark 1.2 workload and running on this release candidate,
> then reporting any regressions.
>
> == What justifies a -1 vote for this release? ==
> This vote is happening towards the end of the 1.3 QA period,
> so -1 votes should only occur for significant regressions from 1.2.1.
> Bugs already present in 1.2.X, minor regressions, or bugs related
> to new features will not block this release.
>
> - Patrick
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Josh Rosen
It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605.  Can you see whether that patch
fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:

> Hi everyone,
>
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
>
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> combineByKey requires that a “createCombiner” function is provided, and the
> return value from that function must be serializable using Kryo. When I
> switched to using rdd.aggregate I assumed that the zero value would also be
> strictly Kryo serialized, as it is a data item and not part of a closure or
> the aggregation functions. However, I got a serialization exception as the
> closure serializer (only valid serializer is the Java serializer) was used
> instead.
>
> I was wondering the following:
>
>1. What is the rationale for making the zero value be serialized using
>the closure serializer? This isn’t part of the closure, but is an initial
>data item.
>2. Would it make sense for us to perhaps write a version of
>rdd.aggregate() that takes a function as a parameter, that generates the
>zero value? This would be more intuitive to be serialized using the closure
>serializer.
>
> I believe aggregateByKey is also affected.
>
> Thanks,
>
> -Matt Cheah
>


Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Andrew Ash
Hi Spark devs,

I'm creating a streaming export functionality for RDDs and am having some
trouble with large partitions.  The RDD.toLocalIterator() call pulls over a
partition at a time to the driver, and then streams the RDD out from that
partition before pulling in the next one.  When you have large partitions
though, you can OOM the driver, especially when multiple of these exports
are happening in the same SparkContext.

One idea I had was to repartition the RDD so partitions are smaller, but
it's hard to know a priori what the partition count should be, and I'd like
to avoid paying the shuffle cost if possible -- I think repartition to a
higher partition count forces a shuffle.

Is it feasible to rework this so the executor -> driver transfer in
.toLocalIterator is a steady stream rather than a partition at a time?

Thanks!
Andrew
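
For reference, a minimal sketch of the export pattern being described (the paths
and the writer here are assumptions): even though records are streamed out one at
a time, each whole partition is still collected to the driver before its records
are iterated.

import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.SparkContext

def exportRdd(sc: SparkContext, inputPath: String, outputPath: String): Unit = {
  val out = new BufferedWriter(new FileWriter(outputPath))
  try {
    // toLocalIterator fetches one partition at a time to the driver; a single very
    // large partition can therefore still OOM the driver process.
    sc.textFile(inputPath).toLocalIterator.foreach { line =>
      out.write(line)
      out.newLine()
    }
  } finally {
    out.close()
  }
}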


Issue SPARK-5008 (persistent-hdfs broken)

2015-02-18 Thread Joe Wass
I've recently run into problems caused by ticket SPARK-5008

https://issues.apache.org/jira/browse/SPARK-5008

This seems like quite a serious regression in 1.2.0, meaning that it's not
really possible to use persistent-hdfs. The config for the persistent-hdfs
points to the wrong part of the filesystem, so it comes up on the wrong
volume (and therefore has the wrong capacity). I'm working around it with
symlinks, but it's not ideal.

It doesn't look like it's scheduled to be fixed in any particular release.
Is there any indication of whether this is on anyone's todo list?

If no-one's looking into it then I could try having a look myself, but I'm
not (yet) familiar with the internals. From the discussion on the ticket it
doesn't look like a huge fix.

Cheers

Joe


Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Patrick Wendell
> UISeleniumSuite:
> *** RUN ABORTED ***
>   java.lang.NoClassDefFoundError: org/w3c/dom/ElementTraversal
> ...

This is a newer test suite. There is something flaky about it; we
should definitely fix it, but IMO it's not a blocker.

>
> Patrick this link gives a 404:
> https://people.apache.org/keys/committer/pwendell.asc

Works for me. Maybe it's some ephemeral issue?

> Finally, I already realized I failed to get the fix for
> https://issues.apache.org/jira/browse/SPARK-5669 correct, and that has
> to be correct for the release. I'll patch that up straight away,
> sorry. I believe the result of the intended fix is still as I
> described in SPARK-5669, so there is no bad news there. A local test
> seems to confirm it and I'm waiting on Jenkins. If it's all good I'll
> merge that fix. So, that much will need a new release, I apologize.

Thanks for finding this. I'm going to leave this open for continued testing...




quick jenkins restart tomorrow morning, ~7am PST

2015-02-18 Thread shane knapp
i'll be kicking jenkins to up the open file limits on the workers.  it
should be a very short downtime, and i'll post updates on my progress
tomorrow.

shane


Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Imran Rashid
This would be pretty tricky to do -- the issue is that right now
sparkContext.runJob has you pass in a function from a partition to *one*
result object that gets serialized and sent back: Iterator[T] => U, and
that idea is baked pretty deep into a lot of the internals, DAGScheduler,
Task, Executors, etc.

Maybe another possibility worth considering: should we make it easy to go
from N partitions to 2N partitions (or any other multiple obviously)
without requiring a shuffle?  for that matter, you should also be able to
go from 2N to N without a shuffle as well.  That change is also somewhat
involved, though.

Both are in theory possible, but I imagine they'd need really compelling
use cases.

An alternative would be to write your RDD to some other data store (eg,
hdfs) which has better support for reading data in a streaming fashion,
though you would probably be unhappy with the overhead.
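
A rough sketch of that last alternative, assuming the export target is plain text
and a staging copy on HDFS is acceptable; the helper name and paths are made up:

import java.io.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def exportViaHdfs(sc: SparkContext, rdd: RDD[String], staging: String, out: Writer): Unit = {
  // Pay the write cost once, then stream the part files back line by line
  // instead of holding a whole partition in driver memory.
  rdd.saveAsTextFile(staging)
  val fs = FileSystem.get(sc.hadoopConfiguration)
  for (status <- fs.globStatus(new Path(staging + "/part-*"))) {
    val source = scala.io.Source.fromInputStream(fs.open(status.getPath))
    try source.getLines().foreach { line => out.write(line); out.write("\n") }
    finally source.close()
  }
}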



On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash  wrote:

> Hi Spark devs,
>
> I'm creating a streaming export functionality for RDDs and am having some
> trouble with large partitions.  The RDD.toLocalIterator() call pulls over a
> partition at a time to the driver, and then streams the RDD out from that
> partition before pulling in the next one.  When you have large partitions
> though, you can OOM the driver, especially when multiple of these exports
> are happening in the same SparkContext.
>
> One idea I had was to repartition the RDD so partitions are smaller, but
> it's hard to know a priori what the partition count should be, and I'd like
> to avoid paying the shuffle cost if possible -- I think repartition to a
> higher partition count forces a shuffle.
>
> Is it feasible to rework this so the executor -> driver transfer in
> .toLocalIterator is a steady stream rather than a partition at a time?
>
> Thanks!
> Andrew
>


Re: quick jenkins restart tomorrow morning, ~7am PST

2015-02-18 Thread shane knapp
i'm actually going to do this now -- it's really quiet today.

there are two spark pull request builds running, which i will kill and
retrigger once jenkins is back up:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27689/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27690/

On Wed, Feb 18, 2015 at 12:55 PM, shane knapp  wrote:

> i'll be kicking jenkins to up the open file limits on the workers.  it
> should be a very short downtime, and i'll post updates on my progress
> tomorrow.
>
> shane
>


Re: quick jenkins restart tomorrow morning, ~7am PST

2015-02-18 Thread shane knapp
this is done.

On Wed, Feb 18, 2015 at 2:00 PM, shane knapp  wrote:

> i'm actually going to do this now -- it's really quiet today.
>
> there are two spark pull request builds running, which i will kill and
> retrigger once jenkins is back up:
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27689/
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27690/
>
> On Wed, Feb 18, 2015 at 12:55 PM, shane knapp  wrote:
>
>> i'll be kicking jenkins to up the open file limits on the workers.  it
>> should be a very short downtime, and i'll post updates on my progress
>> tomorrow.
>>
>> shane
>>
>
>


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Matt Cheah
But RDD.aggregate() has this code:

// Clone the zero value since we will also be serializing it as part of
tasks
var jobResult = Utils.clone(zeroValue,
sc.env.closureSerializer.newInstance())

I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
we just missed it and need to apply the change to aggregate()? It seems
appropriate to target a fix for 1.3.0.
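
A rough sketch of what the factory-based variant (point 2 from the earlier mail)
could look like outside of Spark's internals; the name is an assumption, and the
empty-RDD case is not handled the way aggregate() handles it:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def aggregateWithZeroFactory[T, U: ClassTag](rdd: RDD[T])(zero: () => U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U =
  rdd.mapPartitions { iter =>
    // The factory runs on the executor, so the zero value itself never needs to be
    // serialized; only the (closure-serialized) function travels with the task.
    Iterator(iter.foldLeft(zero())(seqOp))
  }.reduce(combOp)  // NOTE: unlike aggregate(), this throws on an empty RDD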

-Matt Cheah
From:  Josh Rosen 
Date:  Wednesday, February 18, 2015 at 6:12 AM
To:  Matt Cheah 
Cc:  "dev@spark.apache.org" , Mingyu Kim
, Andrew Ash 
Subject:  Re: JavaRDD Aggregate initial value - Closure-serialized zero
value reasoning?

It looks like this was fixed in
https://issues.apache.org/jira/browse/SPARK-4743 /
https://github.com/apache/spark/pull/3605. Can you see
whether that patch fixes this issue for you?



On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
> Hi everyone,
> 
> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
> before, since I assumed that every aggregation required a key. However, I
> realized I could do my analysis using JavaRDD’s aggregate() instead and not
> use a key.
> 
> I have set spark.serializer to use Kryo. As a result, JavaRDD’s combineByKey
> requires that a “createCombiner” function is provided, and the return value
> from that function must be serializable using Kryo. When I switched to using
> rdd.aggregate I assumed that the zero value would also be strictly Kryo
> serialized, as it is a data item and not part of a closure or the aggregation
> functions. However, I got a serialization exception as the closure serializer
> (only valid serializer is the Java serializer) was used instead.
> 
> I was wondering the following:
> 1. What is the rationale for making the zero value be serialized using the
> closure serializer? This isn’t part of the closure, but is an initial data
> item.
> 2. Would it make sense for us to perhaps write a version of rdd.aggregate()
> that takes a function as a parameter, that generates the zero value? This
> would be more intuitive to be serialized using the closure serializer.
> I believe aggregateByKey is also affected.
> 
> Thanks,
> 
> -Matt Cheah







Re: [ml] Lost persistence for fold in crossvalidation.

2015-02-18 Thread Joseph Bradley
Now in JIRA form: https://issues.apache.org/jira/browse/SPARK-5844

On Tue, Feb 17, 2015 at 3:12 PM, Xiangrui Meng  wrote:

> There are three different regParams defined in the grid and there are
> three folds. For simplicity, we didn't split the dataset into three and
> reuse them, but do the split for each fold. Then we need to cache 3*3 = 9
> times. Note that the pipeline API is not yet optimized for
> performance. It would be nice to optimize its performance in 1.4.
> -Xiangrui
>
> On Wed, Feb 11, 2015 at 11:13 AM, Peter Rudenko 
> wrote:
> > Hi i have a problem. Using spark 1.2 with Pipeline + GridSearch +
> > LogisticRegression. I’ve reimplemented LogisticRegression.fit method and
> > comment out instances.unpersist()
> >
> > override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
> >   println(s"Fitting dataset ${dataset.take(1000).toSeq.hashCode()} with ParamMap $paramMap.")
> >   transformSchema(dataset.schema, paramMap, logging = true)
> >   import dataset.sqlContext._
> >   val map = this.paramMap ++ paramMap
> >   val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr)
> >     .map {
> >       case Row(label: Double, features: Vector) =>
> >         LabeledPoint(label, features)
> >     }
> >
> >   if (instances.getStorageLevel == StorageLevel.NONE) {
> >     println("Instances not persisted")
> >     instances.persist(StorageLevel.MEMORY_AND_DISK)
> >   }
> >
> >   val lr = (new LogisticRegressionWithLBFGS)
> >     .setValidateData(false)
> >     .setIntercept(true)
> >   lr.optimizer
> >     .setRegParam(map(regParam))
> >     .setNumIterations(map(maxIter))
> >   val lrm = new LogisticRegressionModel(this, map, lr.run(instances).weights)
> >   // instances.unpersist()
> >   // copy model params
> >   Params.inheritValues(map, this, lrm)
> >   lrm
> > }
> >
> > CrossValidator feeds the same SchemaRDD for each parameter (same hash
> > code), but somewhere the cache is being flushed. The memory is enough.
> > Here’s the output:
> >
> > Fitting dataset 2051470010 with ParamMap {
> >   DRLogisticRegression-f35ae4d3-regParam: 0.1
> > }.
> > Instances not persisted
> > Fitting dataset 2051470010 with ParamMap {
> >   DRLogisticRegression-f35ae4d3-regParam: 0.01
> > }.
> > Instances not persisted
> > Fitting dataset 2051470010 with ParamMap {
> >   DRLogisticRegression-f35ae4d3-regParam: 0.001
> > }.
> > Instances not persisted
> > Fitting dataset 802615223 with ParamMap {
> >   DRLogisticRegression-f35ae4d3-regParam: 0.1
> > }.
> > Instances not persisted
> > Fitting dataset 802615223 with ParamMap {
> >   DRLogisticRegression-f35ae4d3-regParam: 0.01
> > }.
> > Instances not persisted
> >
> > I have 3 parameters in GridSearch and 3 folds for CrossValidation:
> >
> > val paramGrid = new ParamGridBuilder()
> >   .addGrid(model.regParam, Array(0.1, 0.01, 0.001))
> >   .build()
> >
> > crossval.setEstimatorParamMaps(paramGrid)
> > crossval.setNumFolds(3)
> >
> > I assume that the data should be read and cached 3 times ((1 to
> > numFolds).combinations(2)) and be independent of the number of parameters.
> > But I see the data being read and cached 9 times.
> >
> > Thanks,
> > Peter Rudenko
> >
> >
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Sean Owen
That looks, at the least, inconsistent. As far as I know this should
be changed so that the zero value is always cloned via the non-closure
serializer. Any objection to that?

On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah  wrote:
> But RDD.aggregate() has this code:
>
> // Clone the zero value since we will also be serializing it as part of
> tasks
> var jobResult = Utils.clone(zeroValue,
> sc.env.closureSerializer.newInstance())
>
> I do see the SparkEnv.get.serializer used in aggregateByKey however. Perhaps
> we just missed it and need to apply the change to aggregate()? It seems
> appropriate to target a fix for 1.3.0.
>
> -Matt Cheah
> From: Josh Rosen 
> Date: Wednesday, February 18, 2015 at 6:12 AM
> To: Matt Cheah 
> Cc: "dev@spark.apache.org" , Mingyu Kim
> , Andrew Ash 
> Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero value
> reasoning?
>
> It looks like this was fixed in
> https://issues.apache.org/jira/browse/SPARK-4743 /
> https://github.com/apache/spark/pull/3605.  Can you see whether that patch
> fixes this issue for you?
>
>
>
> On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
>>
>> Hi everyone,
>>
>> I was using JavaPairRDD’s combineByKey() to compute all of my aggregations
>> before, since I assumed that every aggregation required a key. However, I
>> realized I could do my analysis using JavaRDD’s aggregate() instead and not
>> use a key.
>>
>> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
>> combineByKey requires that a “createCombiner” function is provided, and the
>> return value from that function must be serializable using Kryo. When I
>> switched to using rdd.aggregate I assumed that the zero value would also be
>> strictly Kryo serialized, as it is a data item and not part of a closure or
>> the aggregation functions. However, I got a serialization exception as the
>> closure serializer (only valid serializer is the Java serializer) was used
>> instead.
>>
>> I was wondering the following:
>>
>> What is the rationale for making the zero value be serialized using the
>> closure serializer? This isn’t part of the closure, but is an initial data
>> item.
>> Would it make sense for us to perhaps write a version of rdd.aggregate()
>> that takes a function as a parameter, that generates the zero value? This
>> would be more intuitive to be serialized using the closure serializer.
>>
>> I believe aggregateByKey is also affected.
>>
>> Thanks,
>>
>> -Matt Cheah
>
>




Re: JavaRDD Aggregate initial value - Closure-serialized zero value reasoning?

2015-02-18 Thread Reynold Xin
Yes, that's a bug and should be using the standard serializer.

On Wed, Feb 18, 2015 at 2:58 PM, Sean Owen  wrote:

> That looks, at the least, inconsistent. As far as I know this should
> be changed so that the zero value is always cloned via the non-closure
> serializer. Any objection to that?
>
> On Wed, Feb 18, 2015 at 10:28 PM, Matt Cheah  wrote:
> > But RDD.aggregate() has this code:
> >
> > // Clone the zero value since we will also be serializing it as part
> of
> > tasks
> > var jobResult = Utils.clone(zeroValue,
> > sc.env.closureSerializer.newInstance())
> >
> > I do see the SparkEnv.get.serializer used in aggregateByKey however.
> Perhaps
> > we just missed it and need to apply the change to aggregate()? It seems
> > appropriate to target a fix for 1.3.0.
> >
> > -Matt Cheah
> > From: Josh Rosen 
> > Date: Wednesday, February 18, 2015 at 6:12 AM
> > To: Matt Cheah 
> > Cc: "dev@spark.apache.org" , Mingyu Kim
> > , Andrew Ash 
> > Subject: Re: JavaRDD Aggregate initial value - Closure-serialized zero
> value
> > reasoning?
> >
> > It looks like this was fixed in
> > https://issues.apache.org/jira/browse/SPARK-4743 /
> > https://github.com/apache/spark/pull/3605.  Can you see whether that
> patch
> > fixes this issue for you?
> >
> >
> >
> > On Tue, Feb 17, 2015 at 8:31 PM, Matt Cheah  wrote:
> >>
> >> Hi everyone,
> >>
> >> I was using JavaPairRDD’s combineByKey() to compute all of my
> aggregations
> >> before, since I assumed that every aggregation required a key. However,
> I
> >> realized I could do my analysis using JavaRDD’s aggregate() instead and
> not
> >> use a key.
> >>
> >> I have set spark.serializer to use Kryo. As a result, JavaRDD’s
> >> combineByKey requires that a “createCombiner” function is provided, and
> the
> >> return value from that function must be serializable using Kryo. When I
> >> switched to using rdd.aggregate I assumed that the zero value would
> also be
> >> strictly Kryo serialized, as it is a data item and not part of a
> closure or
> >> the aggregation functions. However, I got a serialization exception as
> the
> >> closure serializer (only valid serializer is the Java serializer) was
> used
> >> instead.
> >>
> >> I was wondering the following:
> >>
> >> What is the rationale for making the zero value be serialized using the
> >> closure serializer? This isn’t part of the closure, but is an initial
> data
> >> item.
> >> Would it make sense for us to perhaps write a version of rdd.aggregate()
> >> that takes a function as a parameter, that generates the zero value?
> This
> >> would be more intuitive to be serialized using the closure serializer.
> >>
> >> I believe aggregateByKey is also affected.
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
> >
> >
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: Batch prediciton for ALS

2015-02-18 Thread Xiangrui Meng
Please create a JIRA for it and we should discuss the APIs first
before updating the code. -Xiangrui

On Tue, Feb 17, 2015 at 4:10 PM, Debasish Das  wrote:
> It will really help us if we merge it, but I guess it has already diverged
> from the new ALS... I will also take a look at it again and try to update it
> with the new ALS...
>
> On Tue, Feb 17, 2015 at 3:22 PM, Xiangrui Meng  wrote:
>>
>> It may be too late to merge it into 1.3. I'm going to make another
>> pass on your PR today. -Xiangrui
>>
>> On Tue, Feb 10, 2015 at 8:01 AM, Debasish Das 
>> wrote:
>> > Hi,
>> >
>> > Will it be possible to merge this PR to 1.3 ?
>> >
>> > https://github.com/apache/spark/pull/3098
>> >
>> > The batch prediction API in ALS will be useful for us who want to cross
>> > validate on prec@k and MAP...
>> >
>> > Thanks.
>> > Deb
>
>




Re: Batch prediciton for ALS

2015-02-18 Thread Debasish Das
You have a JIRA for it...

https://issues.apache.org/jira/browse/SPARK-3066

I added the PR on the JIRA...

On Wed, Feb 18, 2015 at 3:07 PM, Xiangrui Meng  wrote:

> Please create a JIRA for it and we should discuss the APIs first
> before updating the code. -Xiangrui
>
> On Tue, Feb 17, 2015 at 4:10 PM, Debasish Das 
> wrote:
> > It will be really help us if we merge it but I guess it is already
> diverged
> > from the new ALS...I will also take a look at it again and try update
> with
> > the new ALS...
> >
> > On Tue, Feb 17, 2015 at 3:22 PM, Xiangrui Meng  wrote:
> >>
> >> It may be too late to merge it into 1.3. I'm going to make another
> >> pass on your PR today. -Xiangrui
> >>
> >> On Tue, Feb 10, 2015 at 8:01 AM, Debasish Das  >
> >> wrote:
> >> > Hi,
> >> >
> >> > Will it be possible to merge this PR to 1.3 ?
> >> >
> >> > https://github.com/apache/spark/pull/3098
> >> >
> >> > The batch prediction API in ALS will be useful for us who want to
> cross
> >> > validate on prec@k and MAP...
> >> >
> >> > Thanks.
> >> > Deb
> >
> >
>


Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Mingyu Kim
Another alternative would be to compress the partition in memory in a
streaming fashion instead of calling .toArray on the iterator. Would it be
an easier mitigation to the problem? Or, is it hard to compress the rows
one by one without materializing the full partition in memory using the
compression algo Spark uses currently?

Mingyu
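
A small sketch of what compressing rows incrementally could look like, using a
plain GZIP stream purely for illustration (the codec Spark actually uses and the
serializer hook-up are left out, and the helper name is made up):

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Compress a partition's rows one at a time instead of materializing the whole
// partition with toArray; only the compressed bytes accumulate in memory.
def compressPartition[T](rows: Iterator[T], serialize: T => Array[Byte]): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(buffer)
  rows.foreach(row => gzip.write(serialize(row)))
  gzip.close()  // flushes and writes the GZIP trailer
  buffer.toByteArray
}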





On 2/18/15, 1:01 PM, "Imran Rashid"  wrote:

>This would be pretty tricky to do -- the issue is that right now
>sparkContext.runJob has you pass in a function from a partition to *one*
>result object that gets serialized and sent back: Iterator[T] => U, and
>that idea is baked pretty deep into a lot of the internals, DAGScheduler,
>Task, Executors, etc.
>
>Maybe another possibility worth considering: should we make it easy to go
>from N partitions to 2N partitions (or any other multiple obviously)
>without requiring a shuffle?  for that matter, you should also be able to
>go from 2N to N without a shuffle as well.  That change is also somewhat
>involved, though.
>
>Both are in theory possible, but I imagine they'd need really compelling
>use cases.
>
>An alternative would be to write your RDD to some other data store (eg,
>hdfs) which has better support for reading data in a streaming fashion,
>though you would probably be unhappy with the overhead.
>
>
>
>On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash  wrote:
>
>> Hi Spark devs,
>>
>> I'm creating a streaming export functionality for RDDs and am having
>>some
>> trouble with large partitions.  The RDD.toLocalIterator() call pulls
>>over a
>> partition at a time to the driver, and then streams the RDD out from
>>that
>> partition before pulling in the next one.  When you have large
>>partitions
>> though, you can OOM the driver, especially when multiple of these
>>exports
>> are happening in the same SparkContext.
>>
>> One idea I had was to repartition the RDD so partitions are smaller, but
>> it's hard to know a priori what the partition count should be, and I'd
>>like
>> to avoid paying the shuffle cost if possible -- I think repartition to a
>> higher partition count forces a shuffle.
>>
>> Is it feasible to rework this so the executor -> driver transfer in
>> .toLocalIterator is a steady stream rather than a partition at a time?
>>
>> Thanks!
>> Andrew
>>





Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Sean Owen
On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell  wrote:
>> Patrick this link gives a 404:
>> https://people.apache.org/keys/committer/pwendell.asc
>
> Works for me. Maybe it's some ephemeral issue?

Yes, it works now; I swear it didn't before! That's all set now. The
signing key is in that file.




[Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Hi everyone,

Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
consistently has a slower execution time on the later release. I was
wondering if anyone else has had similar observations.

I have two setups where this reproduces. The first is a local test. I
launched a spark cluster with 4 worker JVMs on my Mac, and launched a
Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on
it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8
files, which ends up having 128 partitions, and a total of 8000 rows.
The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all
numbers being in seconds:
1 items

Spark 1.0.2: 0.069281, 0.012261, 0.011083

Spark 1.1.1: 0.11577, 0.097636, 0.11321



4 items

Spark 1.0.2: 0.023751, 0.069365, 0.023603

Spark 1.1.1: 0.224287, 0.229651, 0.158431



10 items

Spark 1.0.2: 0.047019, 0.049056, 0.042568

Spark 1.1.1: 0.353277, 0.288965, 0.281751



40 items

Spark 1.0.2: 0.216048, 0.198049, 0.796037

Spark 1.1.1: 1.865622, 2.224424, 2.037672

This small test suite indicates a consistently reproducible performance
regression.



I also notice this on a larger scale test. The cluster used is on EC2:

ec2 instance type: m2.4xlarge
10 slaves, 1 master
ephemeral storage
70 cores, 50 GB/box
In this case, I have a 100GB dataset split into 78 files totaling 350 million
items, and I take the first 50,000 items from the RDD. I have tested this on
different formats of the raw data.

With plaintext files:

Spark 1.0.2: 0.422s, 0.363s, 0.382s

Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s



With snappy-compressed Avro files:

Spark 1.0.2: 0.73s, 0.395s, 0.426s

Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s

Again demonstrating a reproducible performance regression.

I was wondering if anyone else observed this regression, and if so, if
anyone would have any idea what could possibly have caused it between Spark
1.0.2 and Spark 1.1.1?

Thanks,

-Matt Cheah






Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Patrick Wendell
I believe the heuristic governing the way that take() decides to fetch
partitions changed between these versions. It could be that in certain
cases the new heuristic is worse, but it might be good to just look at
the source code and see, for your number of elements taken and number
of partitions, if there was any effective change in how aggressively
spark fetched partitions.

This was quite a while ago, but I think the change was made because in
many cases the newer code works more efficiently.

- Patrick

On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah  wrote:
> Hi everyone,
>
> Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
> consistently has a slower execution time on the later release. I was
> wondering if anyone else has had similar observations.
>
> I have two setups where this reproduces. The first is a local test. I
> launched a spark cluster with 4 worker JVMs on my Mac, and launched a
> Spark-Shell. I retrieved the text file and immediately called rdd.take(N) on
> it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over 8
> files, which ends up having 128 partitions, and a total of 8000 rows.
> The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with all
> numbers being in seconds:
>
> 1 items
>
> Spark 1.0.2: 0.069281, 0.012261, 0.011083
>
> Spark 1.1.1: 0.11577, 0.097636, 0.11321
>
>
> 4 items
>
> Spark 1.0.2: 0.023751, 0.069365, 0.023603
>
> Spark 1.1.1: 0.224287, 0.229651, 0.158431
>
>
> 10 items
>
> Spark 1.0.2: 0.047019, 0.049056, 0.042568
>
> Spark 1.1.1: 0.353277, 0.288965, 0.281751
>
>
> 40 items
>
> Spark 1.0.2: 0.216048, 0.198049, 0.796037
>
> Spark 1.1.1: 1.865622, 2.224424, 2.037672
>
> This small test suite indicates a consistently reproducible performance
> regression.
>
>
> I also notice this on a larger scale test. The cluster used is on EC2:
>
> ec2 instance type: m2.4xlarge
> 10 slaves, 1 master
> ephemeral storage
> 70 cores, 50 GB/box
>
> In this case, I have a 100GB dataset split into 78 files totally 350 million
> items, and I take the first 50,000 items from the RDD. In this case, I have
> tested this on different formats of the raw data.
>
> With plaintext files:
>
> Spark 1.0.2: 0.422s, 0.363s, 0.382s
>
> Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
>
>
> With snappy-compressed Avro files:
>
> Spark 1.0.2: 0.73s, 0.395s, 0.426s
>
> Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
>
> Again demonstrating a reproducible performance regression.
>
> I was wondering if anyone else observed this regression, and if so, if
> anyone would have any idea what could possibly have caused it between Spark
> 1.0.2 and Spark 1.1.1?
>
> Thanks,
>
> -Matt Cheah




Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
I actually tested Spark 1.2.0 with the code in the rdd.take() method
swapped out for what was in Spark 1.0.2. The run time was still slower,
which indicates to me something at work lower in the stack.

-Matt Cheah

On 2/18/15, 4:54 PM, "Patrick Wendell"  wrote:

>I believe the heuristic governing the way that take() decides to fetch
>partitions changed between these versions. It could be that in certain
>cases the new heuristic is worse, but it might be good to just look at
>the source code and see, for your number of elements taken and number
>of partitions, if there was any effective change in how aggressively
>spark fetched partitions.
>
>This was quite a while ago, but I think the change was made because in
>many cases the newer code works more efficiently.
>
>- Patrick
>
>On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah  wrote:
>> Hi everyone,
>>
>> Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
>> consistently has a slower execution time on the later release. I was
>> wondering if anyone else has had similar observations.
>>
>> I have two setups where this reproduces. The first is a local test. I
>> launched a spark cluster with 4 worker JVMs on my Mac, and launched a
>> Spark-Shell. I retrieved the text file and immediately called
>>rdd.take(N) on
>> it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
>>8
>> files, which ends up having 128 partitions, and a total of 8000
>>rows.
>> The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
>>all
>> numbers being in seconds:
>>
>> 1 items
>>
>> Spark 1.0.2: 0.069281, 0.012261, 0.011083
>>
>> Spark 1.1.1: 0.11577, 0.097636, 0.11321
>>
>>
>> 4 items
>>
>> Spark 1.0.2: 0.023751, 0.069365, 0.023603
>>
>> Spark 1.1.1: 0.224287, 0.229651, 0.158431
>>
>>
>> 10 items
>>
>> Spark 1.0.2: 0.047019, 0.049056, 0.042568
>>
>> Spark 1.1.1: 0.353277, 0.288965, 0.281751
>>
>>
>> 40 items
>>
>> Spark 1.0.2: 0.216048, 0.198049, 0.796037
>>
>> Spark 1.1.1: 1.865622, 2.224424, 2.037672
>>
>> This small test suite indicates a consistently reproducible performance
>> regression.
>>
>>
>> I also notice this on a larger scale test. The cluster used is on EC2:
>>
>> ec2 instance type: m2.4xlarge
>> 10 slaves, 1 master
>> ephemeral storage
>> 70 cores, 50 GB/box
>>
>> In this case, I have a 100GB dataset split into 78 files totally 350
>>million
>> items, and I take the first 50,000 items from the RDD. In this case, I
>>have
>> tested this on different formats of the raw data.
>>
>> With plaintext files:
>>
>> Spark 1.0.2: 0.422s, 0.363s, 0.382s
>>
>> Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
>>
>>
>> With snappy-compressed Avro files:
>>
>> Spark 1.0.2: 0.73s, 0.395s, 0.426s
>>
>> Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
>>
>> Again demonstrating a reproducible performance regression.
>>
>> I was wondering if anyone else observed this regression, and if so, if
>> anyone would have any idea what could possibly have caused it between
>>Spark
>> 1.0.2 and Spark 1.1.1?
>>
>> Thanks,
>>
>> -Matt Cheah




Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Aaron Davidson
You might be seeing the result of this patch:

https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797

which was introduced in 1.1.1. This patch disabled the ability for take()
to run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can
try enabling local execution and seeing if your problem goes away.
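
A minimal sketch of re-enabling that for a quick before/after comparison; the
config key (spark.localExecution.enabled) is the one named later in the thread,
while the app name and input path are assumptions:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("take-latency-check")
  // Re-enable driver-local execution so small take() calls can avoid launching a job.
  .set("spark.localExecution.enabled", "true")
val sc = new SparkContext(conf)

val rdd = sc.textFile("hdfs:///data/sample.csv")  // assumed input path
val sample = rdd.take(40)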

On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah  wrote:

> I actually tested Spark 1.2.0 with the code in the rdd.take() method
> swapped out for what was in Spark 1.0.2. The run time was still slower,
> which indicates to me something at work lower in the stack.
>
> -Matt Cheah
>
> On 2/18/15, 4:54 PM, "Patrick Wendell"  wrote:
>
> >I believe the heuristic governing the way that take() decides to fetch
> >partitions changed between these versions. It could be that in certain
> >cases the new heuristic is worse, but it might be good to just look at
> >the source code and see, for your number of elements taken and number
> >of partitions, if there was any effective change in how aggressively
> >spark fetched partitions.
> >
> >This was quite a while ago, but I think the change was made because in
> >many cases the newer code works more efficiently.
> >
> >- Patrick
> >
> >On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah  wrote:
> >> Hi everyone,
> >>
> >> Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
> >> consistently has a slower execution time on the later release. I was
> >> wondering if anyone else has had similar observations.
> >>
> >> I have two setups where this reproduces. The first is a local test. I
> >> launched a spark cluster with 4 worker JVMs on my Mac, and launched a
> >> Spark-Shell. I retrieved the text file and immediately called
> >>rdd.take(N) on
> >> it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
> >>8
> >> files, which ends up having 128 partitions, and a total of 8000
> >>rows.
> >> The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
> >>all
> >> numbers being in seconds:
> >>
> >> 1 items
> >>
> >> Spark 1.0.2: 0.069281, 0.012261, 0.011083
> >>
> >> Spark 1.1.1: 0.11577, 0.097636, 0.11321
> >>
> >>
> >> 4 items
> >>
> >> Spark 1.0.2: 0.023751, 0.069365, 0.023603
> >>
> >> Spark 1.1.1: 0.224287, 0.229651, 0.158431
> >>
> >>
> >> 10 items
> >>
> >> Spark 1.0.2: 0.047019, 0.049056, 0.042568
> >>
> >> Spark 1.1.1: 0.353277, 0.288965, 0.281751
> >>
> >>
> >> 40 items
> >>
> >> Spark 1.0.2: 0.216048, 0.198049, 0.796037
> >>
> >> Spark 1.1.1: 1.865622, 2.224424, 2.037672
> >>
> >> This small test suite indicates a consistently reproducible performance
> >> regression.
> >>
> >>
> >> I also notice this on a larger scale test. The cluster used is on EC2:
> >>
> >> ec2 instance type: m2.4xlarge
> >> 10 slaves, 1 master
> >> ephemeral storage
> >> 70 cores, 50 GB/box
> >>
> >> In this case, I have a 100GB dataset split into 78 files totally 350
> >>million
> >> items, and I take the first 50,000 items from the RDD. In this case, I
> >>have
> >> tested this on different formats of the raw data.
> >>
> >> With plaintext files:
> >>
> >> Spark 1.0.2: 0.422s, 0.363s, 0.382s
> >>
> >> Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
> >>
> >>
> >> With snappy-compressed Avro files:
> >>
> >> Spark 1.0.2: 0.73s, 0.395s, 0.426s
> >>
> >> Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
> >>
> >> Again demonstrating a reproducible performance regression.
> >>
> >> I was wondering if anyone else observed this regression, and if so, if
> >> anyone would have any idea what could possibly have caused it between
> >>Spark
> >> 1.0.2 and Spark 1.1.1?
> >>
> >> Thanks,
> >>
> >> -Matt Cheah
>


Re: [Performance] Possible regression in rdd.take()?

2015-02-18 Thread Matt Cheah
Ah okay, I turned on spark.localExecution.enabled and the performance
returned to what Spark 1.0.2 had. However, I can see how users could
inadvertently incur memory and network strain by fetching the whole
partition to the driver.

I’ll evaluate on my side if we want to turn this on or not. Thanks for the
quick and accurate response!

-Matt Cheah

From:  Aaron Davidson 
Date:  Wednesday, February 18, 2015 at 5:25 PM
To:  Matt Cheah 
Cc:  Patrick Wendell , "dev@spark.apache.org"
, Mingyu Kim , Sandor Van
Wassenhove 
Subject:  Re: [Performance] Possible regression in rdd.take()?

You might be seeing the result of this patch:

https://github.com/apache/spark/commit/d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797

which was introduced in 1.1.1. This patch disabled the ability for take() to
run without launching a Spark job, which means that the latency is
significantly increased for small jobs (but not for large ones). You can try
enabling local execution and seeing if your problem goes away.

On Wed, Feb 18, 2015 at 5:10 PM, Matt Cheah  wrote:
> I actually tested Spark 1.2.0 with the code in the rdd.take() method
> swapped out for what was in Spark 1.0.2. The run time was still slower,
> which indicates to me something at work lower in the stack.
> 
> -Matt Cheah
> 
> On 2/18/15, 4:54 PM, "Patrick Wendell"  wrote:
> 
>> >I believe the heuristic governing the way that take() decides to fetch
>> >partitions changed between these versions. It could be that in certain
>> >cases the new heuristic is worse, but it might be good to just look at
>> >the source code and see, for your number of elements taken and number
>> >of partitions, if there was any effective change in how aggressively
>> >spark fetched partitions.
>> >
>> >This was quite a while ago, but I think the change was made because in
>> >many cases the newer code works more efficiently.
>> >
>> >- Patrick
>> >
>> >On Wed, Feb 18, 2015 at 4:47 PM, Matt Cheah  wrote:
>>> >> Hi everyone,
>>> >>
>>> >> Between Spark 1.0.2 and Spark 1.1.1, I have noticed that rdd.take()
>>> >> consistently has a slower execution time on the later release. I was
>>> >> wondering if anyone else has had similar observations.
>>> >>
>>> >> I have two setups where this reproduces. The first is a local test. I
>>> >> launched a spark cluster with 4 worker JVMs on my Mac, and launched a
>>> >> Spark-Shell. I retrieved the text file and immediately called
>>> >>rdd.take(N) on
>>> >> it, where N varied. The RDD is a plaintext CSV, 4GB in size, split over
>>> >>8
>>> >> files, which ends up having 128 partitions, and a total of 8000
>>> >>rows.
>>> >> The numbers I discovered between Spark 1.0.2 and Spark 1.1.1 are, with
>>> >>all
>>> >> numbers being in seconds:
>>> >>
>>> >> 1 items
>>> >>
>>> >> Spark 1.0.2: 0.069281, 0.012261, 0.011083
>>> >>
>>> >> Spark 1.1.1: 0.11577, 0.097636, 0.11321
>>> >>
>>> >>
>>> >> 4 items
>>> >>
>>> >> Spark 1.0.2: 0.023751, 0.069365, 0.023603
>>> >>
>>> >> Spark 1.1.1: 0.224287, 0.229651, 0.158431
>>> >>
>>> >>
>>> >> 10 items
>>> >>
>>> >> Spark 1.0.2: 0.047019, 0.049056, 0.042568
>>> >>
>>> >> Spark 1.1.1: 0.353277, 0.288965, 0.281751
>>> >>
>>> >>
>>> >> 40 items
>>> >>
>>> >> Spark 1.0.2: 0.216048, 0.198049, 0.796037
>>> >>
>>> >> Spark 1.1.1: 1.865622, 2.224424, 2.037672
>>> >>
>>> >> This small test suite indicates a consistently reproducible performance
>>> >> regression.
>>> >>
>>> >>
>>> >> I also notice this on a larger scale test. The cluster used is on EC2:
>>> >>
>>> >> ec2 instance type: m2.4xlarge
>>> >> 10 slaves, 1 master
>>> >> ephemeral storage
>>> >> 70 cores, 50 GB/box
>>> >>
>>> >> In this case, I have a 100GB dataset split into 78 files totally 350
>>> >>million
>>> >> items, and I take the first 50,000 items from the RDD. In this case, I
>>> >>have
>>> >> tested this on different formats of the raw data.
>>> >>
>>> >> With plaintext files:
>>> >>
>>> >> Spark 1.0.2: 0.422s, 0.363s, 0.382s
>>> >>
>>> >> Spark 1.1.1: 4.54s, 1.28s, 1.221s, 1.13s
>>> >>
>>> >>
>>> >> With snappy-compressed Avro files:
>>> >>
>>> >> Spark 1.0.2: 0.73s, 0.395s, 0.426s
>>> >>
>>> >> Spark 1.1.1: 4.618s, 1.81s, 1.158s, 1.333s
>>> >>
>>> >> Again demonstrating a reproducible performance regression.
>>> >>
>>> >> I was wondering if anyone else observed this regression, and if so, if
>>> >> anyone would have any idea what could possibly have caused it between
>>> >>Spark
>>> >> 1.0.2 and Spark 1.1.1?
>>> >>
>>> >> Thanks,
>>> >>
>>> >> -Matt Cheah







Spark-SQL 1.2.0 "sort by" results are not consistent with Hive

2015-02-18 Thread Kannan Rajah
According to the Hive documentation, "sort by" is supposed to order the results
within each reducer. So if we set a single reducer, then the results should be
fully sorted, right? But this is not happening. Any idea why? It looks like the
settings I am using to restrict the number of reducers are not having any
effect.

*Tried the following:*

Set spark.default.parallelism to 1

Set spark.sql.shuffle.partitions to 1

These were set in hive-site.xml and also inside spark shell.


*Spark-SQL*

create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE
testSortBy;
select * from testSortBY;

1   Aditya      28
2   aash        25
3   prashanth   27
4   bharath     26
5   terry       27
6   nanda       26
7   pradeep     27
8   pratyay     26


set spark.default.parallelism=1;

set spark.sql.shuffle.partitions=1;

select name,age from testSortBy sort by age;

aash        25
bharath     26
prashanth   27
Aditya      28
nanda       26
pratyay     26
terry       27
pradeep     27

*HIVE*

select name,age from testSortBy sort by age;

aash        25
bharath     26
nanda       26
pratyay     26
prashanth   27
terry       27
pradeep     27
Aditya      28
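
For what it's worth, if a single globally sorted result is what's needed, ORDER BY
guarantees a total ordering independent of the reducer/partition settings. A quick
check from the shell might look like the sketch below (a hedged suggestion, not an
explanation of the SORT BY behaviour above):

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)  // assumes an existing SparkContext `sc`
val totallyOrdered = sqlContext.sql("SELECT name, age FROM testSortBy ORDER BY age")
totallyOrdered.collect().foreach(println)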


--
Kannan


Re: [VOTE] Release Apache Spark 1.3.0 (RC1)

2015-02-18 Thread Krishna Sankar
+1 (non-binding, of course)

1. Compiled on OS X 10.10 (Yosemite) OK. Total time: 14:50 min
 mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4
-Dhadoop.version=2.6.0 -Phive -DskipTests -Dscala-2.11
2. Tested pyspark, MLlib - running as well as comparing results with 1.1.x &
1.2.x
2.1. statistics (min,max,mean,Pearson,Spearman) OK
2.2. Linear/Ridge/Laso Regression OK

But MSE has increased from 40.81 to 105.86. Has some refactoring happened
on SGD/linear models? Or do we have some extra parameters, or a change
of defaults?

2.3. Decision Tree, Naive Bayes OK
2.4. KMeans OK
   Center And Scale OK
   WSSSE has come down slightly
2.5. rdd operations OK
  State of the Union Texts - MapReduce, Filter,sortByKey (word count)
2.6. Recommendation (Movielens medium dataset ~1 M ratings) OK
   Model evaluation/optimization (rank, numIter, lmbda) with itertools
OK
3. Scala - MLlib
3.1. statistics (min,max,mean,Pearson,Spearman) OK
3.2. LinearRegressionWIthSGD OK
3.3. Decision Tree OK
3.4. KMeans OK
3.5. Recommendation (Movielens medium dataset ~1 M ratings) OK

Cheers

P.S.: For some reason, replacing "import sqlContext.createSchemaRDD" with
"import sqlContext.implicits._" doesn't do the implicit conversions;
registerTempTable gives a syntax error. I will dig deeper tomorrow. Has anyone
seen this?
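
Regarding the P.S.: in 1.3 the implicit appears to convert an RDD of a case class
to a DataFrame via toDF() rather than directly to a SchemaRDD, so registerTempTable
lives on the DataFrame. A hedged sketch of what that looks like in the 1.3
spark-shell (the class and table names are made up):

import sqlContext.implicits._  // sqlContext and sc are provided by the shell

case class Person(name: String, age: Int)
val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25))).toDF()
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 26").show()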

On Wed, Feb 18, 2015 at 3:25 PM, Sean Owen  wrote:

> On Wed, Feb 18, 2015 at 6:13 PM, Patrick Wendell 
> wrote:
> >> Patrick this link gives a 404:
> >> https://people.apache.org/keys/committer/pwendell.asc
> >
> > Works for me. Maybe it's some ephemeral issue?
>
> Yes works now; I swear it didn't before! that's all set now. The
> signing key is in that file.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


If job fails shuffle space is not cleaned

2015-02-18 Thread Debasish Das
Hi,

Some of my jobs failed due to no space left on device, and on those jobs I
was monitoring the shuffle space... when the job failed, the shuffle space was
not cleaned up and I had to clean it manually...

Is there a JIRA already tracking this issue? If no one has been assigned
to it, I can take a look.

Thanks.
Deb