Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
OK, thanks for confirming. Is there something we can do about that leftover
part- files problem in Spark, or is that for the Hadoop team?


On Monday, June 2, 2014, Aaron Davidson wrote:

> Yes.
>
>
> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
> So in summary:
>
>- As of Spark 1.0.0, saveAsTextFile() will no longer clobber by
>default.
>- There is an open JIRA issue to add an option to allow clobbering.
>- Even when clobbering, part- files may be left over from previous
>saves, which is dangerous.
>
> Is this correct?
>
>
> On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:
>
> +1 please re-add this feature
>
>
> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell 
> wrote:
>
> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
> I accidentally assigned myself way back when I created it). This
> should be an easy fix.
>
> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
> > Hi, Patrick,
> >
> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
> about
> > the same thing?
> >
> > How about assigning it to me?
> >
> > I think I missed the configuration part in my previous commit, though I
> > declared that in the PR description
> >
> > Best,
> >
> > --
> > Nan Zhu
> >
> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
> >
> > Hey There,
> >
> > The issue was that the old behavior could cause users to silently
> > overwrite data, which is pretty bad, so to be conservative we decided
> > to enforce the same checks that Hadoop does.
> >
> > This was documented by this JIRA:
> > https://issues.apache.org/jira/browse/SPARK-1100
> >
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
> >
> > However, it would be very easy to add an option that allows preserving
> > the old behavior. Is anyone here interested in contributing that? I
> > created a JIRA for it:
> >
> > https://issues.apache.org/jira/browse/SPARK-1993
> >
> > - Patrick
> >
> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
> >  wrote:
> >
> > Indeed, the behavior has changed for good or for bad. I mean, I agree
> with
> > the danger you mention but I'm not sure it's happening like that. Isn't
> > there a mechanism for overwrite in Hadoop that automatically removes part
> > files, then writes a _temporary folder and then only the part files along
> > with the _success folder.
> >
> > In any case this change of behavior should be documented IMO.
> >
> > Cheers
> > Pierre
> >
> > Message sent from a mobile device - excuse typos and abbreviations
> >
> > On 2 June 2014 at 17:42, Nicholas Chammas wrote:
> >
> > What I've found using saveAsTextFile() against S3 (prior to Spark
> 1.0.0.) is
> > that files get overwritten automatically. This is one danger to this
> though.
> > If I save to a directory that already has 20 part- files, but this time
> > around I'm only saving 15 part- files, then there will be 5 leftover
> part-
> > files from the previous set mixed in with the 15 newer files. This is
> > potentially dangerous.
> >
> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
>
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Pierre Borckmans
I'm a bit confused because the PR mentioned by Patrick seems to address all 
these issues:
https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1

Was it not accepted? Or is the description of this PR not completely 
implemented?

Message sent from a mobile device - excuse typos and abbreviations

> On 2 June 2014 at 23:08, Nicholas Chammas wrote:
> 
> OK, thanks for confirming. Is there something we can do about that leftover 
> part- files problem in Spark, or is that for the Hadoop team?
> 
> 
> On Monday, June 2, 2014, Aaron Davidson wrote:
>> Yes.
>> 
>> 
>> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas 
>>  wrote:
>> So in summary:
>> As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
>> There is an open JIRA issue to add an option to allow clobbering.
>> Even when clobbering, part- files may be left over from previous saves, 
>> which is dangerous.
>> Is this correct?
>> 
>> 
>> On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:
>> +1 please re-add this feature
>> 
>> 
>> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell  wrote:
>> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
>> I accidentally assigned myself way back when I created it). This
>> should be an easy fix.
>> 
>> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
>> > Hi, Patrick,
>> >
>> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking about
>> > the same thing?
>> >
>> > How about assigning it to me?
>> >
>> > I think I missed the configuration part in my previous commit, though I
>> > declared that in the PR description
>> >
>> > Best,
>> >
>> > --
>> > Nan Zhu
>> >
>> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
>> >
>> > Hey There,
>> >
>> > The issue was that the old behavior could cause users to silently
>> > overwrite data, which is pretty bad, so to be conservative we decided
>> > to enforce the same checks that Hadoop does.
>> >
>> > This was documented by this JIRA:
>> > https://issues.apache.org/jira/browse/SPARK-1100
>> > https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>> >
>> > However, it would be very easy to add an option that allows preserving
>> > the old behavior. Is anyone here interested in contributing that? I
>> > created a JIRA for it:
>> >
>> > https://issues.apache.org/jira/browse/SPARK-1993
>> >
>> > - Patrick
>> >
>> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>> >  wrote:
>> >
>> > Indeed, the behavior has changed for good or for bad. I mean, I agree with
>> > the danger you mention but I'm not sure it's happening like that. Isn't
>> > there a mechanism for overwrite in Hadoop that automatically removes part
>> > files, then writes a _temporary folder and then only the part files along
>> > with the _success folder.
>> >
>> > In any case this change of behavior should be documented IMO.
>> >
>> > Cheers
>> > Pierre
>> >
>> > Message sent from a mobile device - excuse typos and abbreviations
>> >
>> > On 2 June 2014 at 17:42, Nicholas Chammas wrote:
>> >
>> > What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0.) 
>> > is
>> > that files get overwritten automatically. This is one danger to this 
>> > though.
>> > If I save to a directory that already has 20 part- files, but this time
>> > around I'm only saving 15 part- files, then there will be 5 leftover part-
>> > files from the previous set mixed in with the 15 newer files. This is
>> > potentially dangerous.
>> >
>> > I haven't checked to see if this behavior has changed in 1.0.0. Are you


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Sean Owen
I assume the idea is for Spark to "rm -r dir/", which would clean out
everything that was there before. It's just doing this instead of the
caller. Hadoop still won't let you write into a location that already
exists regardless, and part of that is for this reason that you might
end up with files mixed-up from different jobs.

This doesn't need a change to Hadoop and probably shouldn't; it's a
change to semantics provided by Spark to do the delete for you if you
set a flag. Viewed that way, meh, seems like the caller could just do
that themselves rather than expand the Spark API (via a utility method
if you like), but I can see it both ways. Caller beware.
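
For reference, a minimal sketch in Scala of that caller-side delete (hedged: it
assumes the output path is reachable through the Hadoop FileSystem API, and
"rdd"/"sc" stand for whatever you already have in scope):

import org.apache.hadoop.fs.{FileSystem, Path}

// Delete the output directory, if present, before saving -- "caller beware".
val outputPath = new Path("hdfs:///tmp/output")          // hypothetical path
val fs = FileSystem.get(outputPath.toUri, sc.hadoopConfiguration)
if (fs.exists(outputPath)) fs.delete(outputPath, true)   // recursive delete
rdd.saveAsTextFile(outputPath.toString)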

On Mon, Jun 2, 2014 at 10:08 PM, Nicholas Chammas
 wrote:
> OK, thanks for confirming. Is there something we can do about that leftover
> part- files problem in Spark, or is that for the Hadoop team?
>
>
> On Monday, June 2, 2014, Aaron Davidson wrote:
>
>> Yes.
>>
>>
>> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas
>>  wrote:
>>
>> So in summary:
>>
>> As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
>> There is an open JIRA issue to add an option to allow clobbering.
>> Even when clobbering, part- files may be left over from previous saves,
>> which is dangerous.
>>
>> Is this correct?


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
Fair enough. That rationale makes sense.

I would prefer that a Spark clobber option also delete the destination
files, but as long as it's a non-default option I can see the "caller
beware" side of that argument as well.

Nick


On Monday, June 2, 2014, Sean Owen wrote:

> I assume the idea is for Spark to "rm -r dir/", which would clean out
> everything that was there before. It's just doing this instead of the
> caller. Hadoop still won't let you write into a location that already
> exists regardless, and part of that is for this reason that you might
> end up with files mixed-up from different jobs.
>
> This doesn't need a change to Hadoop and probably shouldn't; it's a
> change to semantics provided by Spark to do the delete for you if you
> set a flag. Viewed that way, meh, seems like the caller could just do
> that themselves rather than expand the Spark API (via a utility method
> if you like), but I can see it both ways. Caller beware.
>
> On Mon, Jun 2, 2014 at 10:08 PM, Nicholas Chammas
> > wrote:
> > OK, thanks for confirming. Is there something we can do about that
> leftover
> > part- files problem in Spark, or is that for the Hadoop team?
> >
> >
> > On Monday, June 2, 2014, Aaron Davidson wrote:
> >
> >> Yes.
> >>
> >>
> >> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas
> >> > wrote:
> >>
> >> So in summary:
> >>
> >> As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
> >> There is an open JIRA issue to add an option to allow clobbering.
> >> Even when clobbering, part- files may be left over from previous saves,
> >> which is dangerous.
> >>
> >> Is this correct?
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nan Zhu
I made the PR, the problem is …after many rounds of review, that configuration 
part is missed….sorry about that  

I will fix it  

Best,  

--  
Nan Zhu


On Monday, June 2, 2014 at 5:13 PM, Pierre Borckmans wrote:

> I'm a bit confused because the PR mentioned by Patrick seems to adress all 
> these issues:
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>  
> Was it not accepted? Or is the description of this PR not completely 
> implemented?
>  
> Message sent from a mobile device - excuse typos and abbreviations
>  
> On 2 June 2014 at 23:08, Nicholas Chammas (nicholas.cham...@gmail.com) wrote:
>  
> > OK, thanks for confirming. Is there something we can do about that leftover 
> > part- files problem in Spark, or is that for the Hadoop team?
> >  
> >  
> > On Monday, June 2, 2014, Aaron Davidson (ilike...@gmail.com) wrote:
> > > Yes.
> > >  
> > >  
> > > On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas 
> > >  wrote:
> > > > So in summary:
> > > > As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
> > > > There is an open JIRA issue to add an option to allow clobbering.
> > > > Even when clobbering, part- files may be left over from previous saves, 
> > > > which is dangerous.
> > > >  
> > > > Is this correct?
> > > >  
> > > >  
> > > >  
> > > >  
> > > > On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  
> > > > wrote:
> > > > > +1 please re-add this feature
> > > > >  
> > > > >  
> > > > > On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell  
> > > > > wrote:
> > > > > > Thanks for pointing that out. I've assigned you to SPARK-1677 (I 
> > > > > > think
> > > > > > I accidentally assigned myself way back when I created it). This
> > > > > > should be an easy fix.
> > > > > >  
> > > > > > On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  
> > > > > > wrote:
> > > > > > > Hi, Patrick,
> > > > > > >
> > > > > > > I think https://issues.apache.org/jira/browse/SPARK-1677 is 
> > > > > > > talking about
> > > > > > > the same thing?
> > > > > > >
> > > > > > > How about assigning it to me?
> > > > > > >
> > > > > > > I think I missed the configuration part in my previous commit, 
> > > > > > > though I
> > > > > > > declared that in the PR description
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > --
> > > > > > > Nan Zhu
> > > > > > >
> > > > > > > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
> > > > > > >
> > > > > > > Hey There,
> > > > > > >
> > > > > > > The issue was that the old behavior could cause users to silently
> > > > > > > overwrite data, which is pretty bad, so to be conservative we 
> > > > > > > decided
> > > > > > > to enforce the same checks that Hadoop does.
> > > > > > >
> > > > > > > This was documented by this JIRA:
> > > > > > > https://issues.apache.org/jira/browse/SPARK-1100
> > > > > > > https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
> > > > > > >
> > > > > > > However, it would be very easy to add an option that allows 
> > > > > > > preserving
> > > > > > > the old behavior. Is anyone here interested in contributing that? 
> > > > > > > I
> > > > > > > created a JIRA for it:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/SPARK-1993
> > > > > > >
> > > > > > > - Patrick
> > > > > > >
> > > > > > > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
> > > > > > >  wrote:
> > > > > > >
> > > > > > > Indeed, the behavior has changed for good or for bad. I mean, I 
> > > > > > > agree with
> > > > > > > the danger you mention but I'm not sure it's happening like that. 
> > > > > > > Isn't
> > > > > > > there a mechanism for overwrite in Hadoop that automatically 
> > > > > > > removes part
> > > > > > > files, then writes a _temporary folder and then only the part 
> > > > > > > files along
> > > > > > > with the _success folder.
> > > > > > >
> > > > > > > In any case this change of behavior should be documented IMO.
> > > > > > >
> > > > > > > Cheers
> > > > > > > Pierre
> > > > > > >
> > > > > > > Message sent from a mobile device - excuse typos and abbreviations
> > > > > > >
> > > > > > > On 2 June 2014 at 17:42, Nicholas Chammas wrote:
> > > > > > >
> > > > > > > What I've found using saveAsTextFile() against S3 (prior to Spark 
> > > > > > > 1.0.0.) is
> > > > > > > that files get overwritten automatically. This is one danger to 
> > > > > > > this though.
> > > > > > > If I save to a directory that already has 20 part- files, but 
> > > > > > > this time
> > > > > > > around I'm only saving 15 part- files, then there will be 5 
> > > > > > > leftover part-
> > > > > > > files from the previous set mixed in with the 15 newer files. 
> > > > > > > This is
> > > > > > > potentially dangerous.
> > > > > > >
> > > > > > > I haven't checked to see if this behavior has changed in 1.0.0. 
> > > > > > > Are you



Re: How to create RDDs from another RDD?

2014-06-02 Thread Andrew Ash
Hi Gerard,

Usually when I want to split one RDD into several, I'm better off
re-thinking the algorithm to do all the computation at once.  Example:

Suppose you had a dataset that was the tuple (URL, webserver,
pageSizeBytes), and you wanted to find out the average page size that each
webserver (e.g. Apache, nginx, IIS, etc) served.  Rather than splitting
your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD,
IISRDD, it's probably better to do the average computation over all at
once, like this:

// allPagesRDD contains (URL, webserver, pageSizeBytes) records
allPagesRDD.keyBy(getWebserver)                               // (webserver, page)
  .mapValues(page => (page.pageSizeBytes, 1L))                // (webserver, (bytes, count))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))          // per-webserver totals
  .mapValues { case (bytes, count) => bytes.toDouble / count } // average page size per webserver

For this example you could use something like Summingbird to keep from
doing the average tracking yourself.

Can you go into more detail about why you want to split one RDD into
several?
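
If splitting really is required, a hedged driver-side sketch (workable only when
the set of distinct keys is small enough to collect to the driver) is to cache
the source and filter once per key:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hedged sketch: build one filtered RDD per distinct key.
val colRDD: RDD[(String, Int)] = sc.parallelize(Seq("k1" -> 1, "k1" -> 2, "k2" -> 3))
colRDD.cache()                                          // each filter below rescans colRDD
val keys = colRDD.keys.distinct().collect()             // small key set, brought to the driver
val perKeyRDDs: Map[String, RDD[(String, Int)]] =
  keys.map(k => k -> colRDD.filter { case (key, _) => key == k }).toMap

Note that the sc.makeRDD(...) idea in the quoted message below cannot work as
written, because transformations run on the executors, where the SparkContext
is not available.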


On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas  wrote:

> The RDD API has functions to join multiple RDDs, such as PairRDD.join
> or PairRDD.cogroup, which take another RDD as input, e.g.
>  firstRDD.join(secondRDD)
>
> I'm looking for ways to do the opposite: split an existing RDD. What is
> the right way to create derivate RDDs from an existing RDD?
>
> e.g. imagine I have a collection of pairs as input: colRDD =
>  (k1->v1)...(kx->vy)...
> I could do:
> val byKey = colRDD.groupByKey() = (k1->(k1->v1...
> k1->vn)),...(kn->(kn->vy, ...))
>
> Now, I'd like to create an RDD from the values to have something like:
>
> val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))
>
> in this example, there's an f(byKey) = groupedRDDs.  What's that f(x) ?
>
> Would:  byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}  the
> right/recommended way to do this?  Any other options?
>
> Thanks,
>
> Gerard.
>


Interactive modification of DStreams

2014-06-02 Thread lbustelo
This is a general question about whether Spark Streaming can be interactive
like batch Spark jobs. I've read plenty of threads and done my fair bit of
experimentation and I'm thinking the answer is NO, but it does not hurt to
ask. 

More specifically, I would like to be able to do:
1. Add/Remove steps to the Streaming Job
2. Modify Window durations 
3. Stop and Restart context.

I've tried the following:

1. Modify the DStream after it has been started… BOOM! Exceptions
everywhere.

2. Stop the DStream, Make modification, Start… NOT GOOD :( In 0.9.0 I was
getting deadlocks. I also tried 1.0.0 and it did not work.

3. Based on information provided here
(http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3371.html),
I was able to prototype modifying the RDD computation within a
forEachRDD. That is nice, but you are then bound to the specified batch
size. That got me wanting to modify Window durations. Is changing the
Window duration possible?

4. Tried running multiple streaming context from within a single Driver
application and got several exceptions. The first one was bind exception on
the web port. Then once the app started getting run (cores were taken but
1st job) it did not run correctly. A lot of
"akka.pattern.AskTimeoutException: Timed out"
.

I've tried my experiments in 0.9.0, 0.9.1 and 1.0.0 running on Standalone
Cluster setup.
Thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Interactive-modification-of-DStreams-tp6740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
Ah yes, this was indeed intended to have been taken care of:

add some new APIs with a flag for users to define whether he/she wants to
> overwrite the directory: if the flag is set to true, *then the output
> directory is deleted first* and then written into the new data to prevent
> the output directory contains results from multiple rounds of running;



On Mon, Jun 2, 2014 at 5:47 PM, Nan Zhu  wrote:

>  I made the PR, the problem is …after many rounds of review, that
> configuration part is missed….sorry about that
>
> I will fix it
>
> Best,
>
> --
> Nan Zhu
>
> On Monday, June 2, 2014 at 5:13 PM, Pierre Borckmans wrote:
>
> I'm a bit confused because the PR mentioned by Patrick seems to adress all
> these issues:
>
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>
> Was it not accepted? Or is the description of this PR not completely
> implemented?
>
> Message sent from a mobile device - excuse typos and abbreviations
>
> On 2 June 2014 at 23:08, Nicholas Chammas wrote:
>
> OK, thanks for confirming. Is there something we can do about that
> leftover part- files problem in Spark, or is that for the Hadoop team?
>
>
> On Monday, June 2, 2014, Aaron Davidson wrote:
>
> Yes.
>
>
> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
> So in summary:
>
>- As of Spark 1.0.0, saveAsTextFile() will no longer clobber by
>default.
>- There is an open JIRA issue to add an option to allow clobbering.
>- Even when clobbering, part- files may be left over from previous
>saves, which is dangerous.
>
> Is this correct?
>
>
> On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:
>
> +1 please re-add this feature
>
>
> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell 
> wrote:
>
> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
> I accidentally assigned myself way back when I created it). This
> should be an easy fix.
>
> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
> > Hi, Patrick,
> >
> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
> about
> > the same thing?
> >
> > How about assigning it to me?
> >
> > I think I missed the configuration part in my previous commit, though I
> > declared that in the PR description
> >
> > Best,
> >
> > --
> > Nan Zhu
> >
> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
> >
> > Hey There,
> >
> > The issue was that the old behavior could cause users to silently
> > overwrite data, which is pretty bad, so to be conservative we decided
> > to enforce the same checks that Hadoop does.
> >
> > This was documented by this JIRA:
> > https://issues.apache.org/jira/browse/SPARK-1100
> >
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
> >
> > However, it would be very easy to add an option that allows preserving
> > the old behavior. Is anyone here interested in contributing that? I
> > created a JIRA for it:
> >
> > https://issues.apache.org/jira/browse/SPARK-1993
> >
> > - Patrick
> >
> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
> >  wrote:
> >
> > Indeed, the behavior has changed for good or for bad. I mean, I agree
> with
> > the danger you mention but I'm not sure it's happening like that. Isn't
> > there a mechanism for overwrite in Hadoop that automatically removes part
> > files, then writes a _temporary folder and then only the part files along
> > with the _success folder.
> >
> > In any case this change of behavior should be documented IMO.
> >
> > Cheers
> > Pierre
> >
> > Message sent from a mobile device - excuse typos and abbreviations
> >
> > On 2 June 2014 at 17:42, Nicholas Chammas wrote:
> >
> > What I've found using saveAsTextFile() against S3 (prior to Spark
> 1.0.0.) is
> > that files get overwritten automatically. This is one danger to this
> though.
> > If I save to a directory that already has 20 part- files, but this time
> > around I'm only saving 15 part- files, then there will be 5 leftover
> part-
> > files from the previous set mixed in with the 15 newer files. This is
> > potentially dangerous.
> >
> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
>
>
>


NoSuchElementException: key not found

2014-06-02 Thread Michael Chang
Hi all,

Seeing a random exception kill my spark streaming job. Here's a stack
trace:

java.util.NoSuchElementException: key not found: 32855
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
at
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
at
org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
at
org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
at
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
at
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at
org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:183)
at
org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
at
org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
at
org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at
org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.RDD.take(RDD.scala:830)
at
org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27)
at
com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87)
at
com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

It doesn't seem to happen consistently, but I have no idea what causes it. Has
anyone seen this before? The PersistToKafkaFunction here is just trying to
write the elements in an RDD to a

using Log4j to log INFO level messages on workers

2014-06-02 Thread Shivani Rao
Hello Spark fans,

I am trying to log messages from my Spark application. When the main()
function logs using log.info(), it works great, but when I try
the same call from the code that probably runs on the worker, I
initially got a serialization error. To solve that, I created a new logger
in the code that operates on the data, which solved the serialization issue,
but now there is no output in the console or in the worker node logs. I
don't see any application level log messages in the spark logs either. When
I use println() instead, I do see console output being  generated.

I tried the following and none of them works

a) pass log4j.properties by using -Dlog4j.properties in my java command
line initiation of the spark application
b) setting the properties within the worker by calling log.addAppender(new
ConsoleAppender)

None of them work.

What am I missing?
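
One pattern worth trying (a hedged sketch, not a diagnosis of your exact setup;
"rdd" is whatever RDD you operate on): keep the Logger out of the serialized
closure by holding it in an object, so each executor JVM creates its own
instance, and then look for the messages in the executor logs rather than the
driver console.

import org.apache.log4j.Logger

// Hedged sketch: a per-JVM logger for code that runs on the workers.
object WorkerLogging {
  @transient lazy val log: Logger = Logger.getLogger("my.app.worker")
}

rdd.foreach { record =>
  WorkerLogging.log.info("processing " + record)   // written on the executor, not the driver
}

With the standalone cluster manager these messages typically end up in each
executor's stderr under the worker's work/ directory, not in the driver console.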


Thanks,
Shivani
-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Fwd: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Mohit Nayak
Hi,
I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw a

*java.lang.SecurityException: class
"javax.servlet.FilterRegistration"'s signer information does not match
signer information of other classes in the same package*


I'm using Hadoop-core 1.0.4 and running this locally.
I noticed that there was an issue regarding this and was marked as resolved
[https://issues.apache.org/jira/browse/SPARK-1693]
Please guide..

-- 
-Mohit
wiza...@gmail.com



-- 
-Mohit
wiza...@gmail.com


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Patrick Wendell
We can just add back a flag to make it backwards compatible - it was
just missed during the original PR.

Adding a *third* set of "clobber" semantics, I'm slightly -1 on that
for the following reasons:

1. It's scary to have Spark recursively deleting user files, could
easily lead to users deleting data by mistake if they don't understand
the exact semantics.
2. It would introduce a third set of semantics here for saveAsXX...
3. It's trivial for users to implement this with two lines of code (if
output dir exists, delete it) before calling saveAsHadoopFile.

- Patrick

On Mon, Jun 2, 2014 at 2:49 PM, Nicholas Chammas
 wrote:
> Ah yes, this was indeed intended to have been taken care of:
>
>> add some new APIs with a flag for users to define whether he/she wants to
>> overwrite the directory: if the flag is set to true, then the output
>> directory is deleted first and then written into the new data to prevent the
>> output directory contains results from multiple rounds of running;
>
>
>
> On Mon, Jun 2, 2014 at 5:47 PM, Nan Zhu  wrote:
>>
>> I made the PR, the problem is …after many rounds of review, that
>> configuration part is missed….sorry about that
>>
>> I will fix it
>>
>> Best,
>>
>> --
>> Nan Zhu
>>
>> On Monday, June 2, 2014 at 5:13 PM, Pierre Borckmans wrote:
>>
>> I'm a bit confused because the PR mentioned by Patrick seems to adress all
>> these issues:
>>
>> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>>
>> Was it not accepted? Or is the description of this PR not completely
>> implemented?
>>
>> Message sent from a mobile device - excuse typos and abbreviations
>>
>> On 2 June 2014 at 23:08, Nicholas Chammas wrote:
>>
>> OK, thanks for confirming. Is there something we can do about that
>> leftover part- files problem in Spark, or is that for the Hadoop team?
>>
>>
>> On Monday, June 2, 2014, Aaron Davidson wrote:
>>
>> Yes.
>>
>>
>> On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas
>>  wrote:
>>
>> So in summary:
>>
>> As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
>> There is an open JIRA issue to add an option to allow clobbering.
>> Even when clobbering, part- files may be left over from previous saves,
>> which is dangerous.
>>
>> Is this correct?
>>
>>
>> On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:
>>
>> +1 please re-add this feature
>>
>>
>> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell 
>> wrote:
>>
>> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
>> I accidentally assigned myself way back when I created it). This
>> should be an easy fix.
>>
>> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
>> > Hi, Patrick,
>> >
>> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
>> > about
>> > the same thing?
>> >
>> > How about assigning it to me?
>> >
>> > I think I missed the configuration part in my previous commit, though I
>> > declared that in the PR description
>> >
>> > Best,
>> >
>> > --
>> > Nan Zhu
>> >
>> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
>> >
>> > Hey There,
>> >
>> > The issue was that the old behavior could cause users to silently
>> > overwrite data, which is pretty bad, so to be conservative we decided
>> > to enforce the same checks that Hadoop does.
>> >
>> > This was documented by this JIRA:
>> > https://issues.apache.org/jira/browse/SPARK-1100
>> >
>> > https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>> >
>> > However, it would be very easy to add an option that allows preserving
>> > the old behavior. Is anyone here interested in contributing that? I
>> > created a JIRA for it:
>> >
>> > https://issues.apache.org/jira/browse/SPARK-1993
>> >
>> > - Patrick
>> >
>> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>> >  wrote:
>> >
>> > Indeed, the behavior has changed for good or for bad. I mean, I agree
>> > with
>> > the danger you mention but I'm not sure it's happening like that. Isn't
>> > there a mechanism for overwrite in Hadoop that automatically removes
>> > part
>> > files, then writes a _temporary folder and then only the part files
>> > along
>> > with the _success folder.
>> >
>> > In any case this change of behavior should be documented IMO.
>> >
>> > Cheers
>> > Pierre
>> >
>> > Message sent from a mobile device - excuse typos and abbreviations
>> >
>> > On 2 June 2014 at 17:42, Nicholas Chammas wrote:
>> >
>> > What I've found using saveAsTextFile() against S3 (prior to Spark
>> > 1.0.0.) is
>> > that files get overwritten automatically. This is one danger to this
>> > though.
>> > If I save to a directory that already has 20 part- files, but this time
>> > around I'm only saving 15 part- files, then there will be 5 leftover
>> > part-
>> > files from the previous set mixed in with the 15 newer files. This is
>> > potentially dangerous.
>> >
>> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
>>
>>
>


Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Sean Owen
This ultimately means you have a couple copies of the servlet APIs in
the build. What is your build like (SBT? Maven?) and what exactly are
you depending on?

On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak  wrote:
> Hi,
> I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw a
>
> java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s
> signer information does not match signer information of other classes in the
> same package
>
>
> I'm using Hadoop-core 1.0.4 and running this locally.
> I noticed that there was an issue regarding this and was marked as resolved
> [https://issues.apache.org/jira/browse/SPARK-1693]
> Please guide..
>
> --
> -Mohit
> wiza...@gmail.com
>
>
>
> --
> -Mohit
> wiza...@gmail.com


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Sean Owen
Is there a third way? Unless I'm missing something, Hadoop's OutputFormat
wants the target dir to not exist no matter what, so it's just a
question of whether Spark deletes it for you or errors.

On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell  wrote:
> We can just add back a flag to make it backwards compatible - it was
> just missed during the original PR.
>
> Adding a *third* set of "clobber" semantics, I'm slightly -1 on that
> for the following reasons:
>
> 1. It's scary to have Spark recursively deleting user files, could
> easily lead to users deleting data by mistake if they don't understand
> the exact semantics.
> 2. It would introduce a third set of semantics here for saveAsXX...
> 3. It's trivial for users to implement this with two lines of code (if
> output dir exists, delete it) before calling saveAsHadoopFile.
>
> - Patrick
>


Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Mohit Nayak
Hey,
Thanks for the reply.

I am using SBT. Here is a list of my dependencies:
val sparkCore= "org.apache.spark" % "spark-core_2.10" % V.spark
val hadoopCore   = "org.apache.hadoop" % "hadoop-core"   %
V.hadoop% "provided"
val jodaTime = "com.github.nscala-time" %% "nscala-time" %
"0.8.0"
val scalaUtil= "com.twitter"   %% "util-collection"  %
V.util
val logback  = "ch.qos.logback" % "logback-classic" % "1.0.6" %
"runtime"
var openCsv  = "net.sf.opencsv" % "opencsv" % "2.1"
var scalaTest= "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test"
var scalaIOCore  = "com.github.scala-incubator.io" %% "scala-io-core" %
V.scalaIO
var scalaIOFile  = "com.github.scala-incubator.io" %% "scala-io-file" %
V.scalaIO
var kryo = "com.esotericsoftware.kryo" % "kryo" % "2.16"
var spray = "io.spray" %%  "spray-json" % "1.2.5"
var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"



On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen  wrote:

> This ultimately means you have a couple copies of the servlet APIs in
> the build. What is your build like (SBT? Maven?) and what exactly are
> you depending on?
>
> On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak  wrote:
> > Hi,
> > I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw a
> >
> > java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s
> > signer information does not match signer information of other classes in
> the
> > same package
> >
> >
> > I'm using Hadoop-core 1.0.4 and running this locally.
> > I noticed that there was an issue regarding this and was marked as
> resolved
> > [https://issues.apache.org/jira/browse/SPARK-1693]
> > Please guide..
> >
> > --
> > -Mohit
> > wiza...@gmail.com
> >
> >
> >
> > --
> > -Mohit
> > wiza...@gmail.com
>



-- 
-Mohit
wiza...@gmail.com


Processing audio/video/images

2014-06-02 Thread jamal sasha
Hi,
  How does one process data sources other than text?
Lets say I have millions of mp3 (or jpeg) files and I want to use spark to
process them?
How does one go about it.


I have never been able to figure this out..
Lets say I have this library in python which works like following:

import audio

song = audio.read_mp3(filename)

Then most of the methods are attached to song or maybe there is another
function which takes "song" type as an input.

Maybe the above is just rambling.. but how do I use spark to process (say)
audio files.
Thanks


Re: Processing audio/video/images

2014-06-02 Thread Marcelo Vanzin
Hi Jamal,

If what you want is to process lots of files in parallel, the best
approach is probably to load all file names into an array and
parallelize that. Then each task will take a path as input and can
process it however it wants.

Or you could write the file list to a file, and then use sc.textFile()
to open it (assuming one path per line), and the rest is pretty much
the same as above.

It will probably be hard to process each individual file in parallel,
unless mp3 and jpg files can be split into multiple blocks that can be
processed separately. In that case, you'd need a custom (Hadoop) input
format that is able to calculate the splits. But it doesn't sound like
that's what you want.
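
A hedged Scala sketch of the first approach (processFile and the paths are
placeholders; the same pattern works from PySpark with sc.parallelize(paths)):

// Hedged sketch: parallelize a list of paths and let each task read its own file.
// Assumes every worker can read the paths (shared filesystem or identical local copies).
def processFile(path: String): String = {
  val bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path))
  path + ": " + bytes.length + " bytes"               // placeholder per-file logic
}

val paths = Seq("/data/a.mp3", "/data/b.mp3")         // or load a listing via sc.textFile(...)
sc.parallelize(paths, 100)                            // 100 partitions => up to 100 concurrent tasks
  .map(processFile)
  .saveAsTextFile("/data/mp3-report")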



On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha  wrote:
> Hi,
>   How do one process for data sources other than text?
> Lets say I have millions of mp3 (or jpeg) files and I want to use spark to
> process them?
> How does one go about it.
>
>
> I have never been able to figure this out..
> Lets say I have this library in python which works like following:
>
> import audio
>
> song = audio.read_mp3(filename)
>
> Then most of the methods are attached to song or maybe there is another
> function which takes "song" type as an input.
>
> Maybe the above is just rambling.. but how do I use spark to process (say)
> audiio files.
> Thanks



-- 
Marcelo


Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Sean Owen
If it's the SBT build, I suspect you are hitting
https://issues.apache.org/jira/browse/SPARK-1949

Can you try to apply the excludes you see at
https://github.com/apache/spark/pull/906/files to your build to see if
it resolves it?

If so I think this could be helpful to commit.
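
Roughly, the excludes take this shape when applied to a user build like the one
below (a hedged sketch -- the exact rule set in that PR may differ):

// Hedged sketch: drop conflicting transitive servlet-api jars so only one copy
// of the servlet classes (with one signer) ends up on the test classpath.
val hadoopCore = "org.apache.hadoop" % "hadoop-core" % V.hadoop % "provided" excludeAll(
  ExclusionRule(organization = "org.mortbay.jetty", name = "servlet-api"),
  ExclusionRule(organization = "javax.servlet")
)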

On Tue, Jun 3, 2014 at 1:01 AM, Mohit Nayak  wrote:
> Hey,
> Thanks for the reply.
>
> I am using SBT. Here is a list of my dependancies:
> val sparkCore= "org.apache.spark" % "spark-core_2.10" % V.spark
> val hadoopCore   = "org.apache.hadoop" % "hadoop-core"   %
> V.hadoop% "provided"
> val jodaTime = "com.github.nscala-time" %% "nscala-time" %
> "0.8.0"
> val scalaUtil= "com.twitter"   %% "util-collection"  %
> V.util
> val logback  = "ch.qos.logback" % "logback-classic" % "1.0.6" %
> "runtime"
> var openCsv  = "net.sf.opencsv" % "opencsv" % "2.1"
> var scalaTest= "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test"
> var scalaIOCore  = "com.github.scala-incubator.io" %% "scala-io-core" %
> V.scalaIO
> var scalaIOFile  = "com.github.scala-incubator.io" %% "scala-io-file" %
> V.scalaIO
> var kryo = "com.esotericsoftware.kryo" % "kryo" % "2.16"
> var spray = "io.spray" %%  "spray-json" % "1.2.5"
> var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"
>
>
>
> On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen  wrote:
>>
>> This ultimately means you have a couple copies of the servlet APIs in
>> the build. What is your build like (SBT? Maven?) and what exactly are
>> you depending on?
>>
>> On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak  wrote:
>> > Hi,
>> > I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw
>> > a
>> >
>> > java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s
>> > signer information does not match signer information of other classes in
>> > the
>> > same package
>> >
>> >
>> > I'm using Hadoop-core 1.0.4 and running this locally.
>> > I noticed that there was an issue regarding this and was marked as
>> > resolved
>> > [https://issues.apache.org/jira/browse/SPARK-1693]
>> > Please guide..
>> >
>> > --
>> > -Mohit
>> > wiza...@gmail.com
>> >
>> >
>> >
>> > --
>> > -Mohit
>> > wiza...@gmail.com
>
>
>
>
> --
> -Mohit
> wiza...@gmail.com


Re: Interactive modification of DStreams

2014-06-02 Thread Tathagata Das
Currently Spark Streaming does not support addition/deletion/modification
of DStream after the streaming context has been started.
Nor can you restart a stopped streaming context.
Also, multiple spark contexts (and therefore multiple streaming contexts)
cannot be run concurrently in the same JVM.

To change the window duration, I would do one of the following.

1. Stop the previous streaming context, create a new streaming context, and
set up the dstreams once again with the new window duration (a minimal sketch
of this option follows below).
2. Create a custom DStream, say DynamicWindowDStream. Take a look at how
WindowedDStream is implemented (pretty simple, just a union over RDDs across
time). That should allow you to modify the window duration. However, do make
sure you have a maximum window duration that you will never need to exceed,
and make sure you define parentRememberDuration as
"rememberDuration + maxWindowDuration". That field defines which RDDs can be
forgotten, so it is sensitive to the window duration. Then you have to take
care of correctly (atomically, etc.) modifying the window duration as per
your requirements.
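
A minimal Scala sketch of option 1 (hedged: createInput and oldSsc are
placeholder names for your own input setup and the currently running context):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Rebuild the DStream graph with a new window/slide duration on the same SparkContext.
def startWithWindow(sc: SparkContext, windowSec: Long, slideSec: Long): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(1))
  val events = createInput(ssc)                       // placeholder: your input DStream
  events.window(Seconds(windowSec), Seconds(slideSec)).print()
  ssc.start()
  ssc
}

oldSsc.stop(stopSparkContext = false)                 // stop streaming, keep the SparkContext
val newSsc = startWithWindow(sc, windowSec = 16, slideSec = 4)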

Happy streaming!

TD




On Mon, Jun 2, 2014 at 2:46 PM, lbustelo  wrote:

> This is a general question about whether Spark Streaming can be interactive
> like batch Spark jobs. I've read plenty of threads and done my fair bit of
> experimentation and I'm thinking the answer is NO, but it does not hurt to
> ask.
>
> More specifically, I would like to be able to do:
> 1. Add/Remove steps to the Streaming Job
> 2. Modify Window durations
> 3. Stop and Restart context.
>
> I've tried the following:
>
> 1. Modify the DStream after it has been started… BOOM! Exceptions
> everywhere.
>
> 2. Stop the DStream, Make modification, Start… NOT GOOD :( In 0.9.0 I was
> getting deadlocks. I also tried 1.0.0 and it did not work.
>
> 3. Based on information provided here
> (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3371.html),
> I was able to prototype modifying the RDD computation within a
> forEachRDD. That is nice, but you are then bound to the specified batch
> size. That got me wanting to modify Window durations. Is changing the
> Window duration possible?
>
> 4. Tried running multiple streaming context from within a single Driver
> application and got several exceptions. The first one was bind exception on
> the web port. Then once the app started getting run (cores were taken but
> 1st job) it did not run correctly. A lot of
> "akka.pattern.AskTimeoutException: Timed out"
> .
>
> I've tried my experiments in 0.9.0, 0.9.1 and 1.0.0 running on Standalone
> Cluster setup.
> Thanks in advanced
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Interactive-modification-of-DStreams-tp6740.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Processing audio/video/images

2014-06-02 Thread Philip Ogren
I asked a question related to Marcelo's answer a few months ago. The 
discussion there may be useful:


http://apache-spark-user-list.1001560.n3.nabble.com/RDD-URI-td1054.html


On 06/02/2014 06:09 PM, Marcelo Vanzin wrote:

Hi Jamal,

If what you want is to process lots of files in parallel, the best
approach is probably to load all file names into an array and
parallelize that. Then each task will take a path as input and can
process it however it wants.

Or you could write the file list to a file, and then use sc.textFile()
to open it (assuming one path per line), and the rest is pretty much
the same as above.

It will probably be hard to process each individual file in parallel,
unless mp3 and jpg files can be split into multiple blocks that can be
processed separately. In that case, you'd need a custom (Hadoop) input
format that is able to calculate the splits. But it doesn't sound like
that's what you want.



On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha  wrote:

Hi,
   How do one process for data sources other than text?
Lets say I have millions of mp3 (or jpeg) files and I want to use spark to
process them?
How does one go about it.


I have never been able to figure this out..
Lets say I have this library in python which works like following:

import audio

song = audio.read_mp3(filename)

Then most of the methods are attached to song or maybe there is another
function which takes "song" type as an input.

Maybe the above is just rambling.. but how do I use spark to process (say)
audiio files.
Thanks







Re: Processing audio/video/images

2014-06-02 Thread jamal sasha
Hi Marcelo,
  Thanks for the response..
I am not sure I understand. Can you elaborate a bit.
So, for example, lets take a look at this example
http://pythonvision.org/basic-tutorial

import mahotas
dna = mahotas.imread('dna.jpeg')
dnaf = ndimage.gaussian_filter(dna, 8)

But except dna.jpeg Lets say, I have millions of dna.jpeg and I want to run
the above logic on all the millions files.
How should I go about this?
Thanks

On Mon, Jun 2, 2014 at 5:09 PM, Marcelo Vanzin  wrote:

> Hi Jamal,
>
> If what you want is to process lots of files in parallel, the best
> approach is probably to load all file names into an array and
> parallelize that. Then each task will take a path as input and can
> process it however it wants.
>
> Or you could write the file list to a file, and then use sc.textFile()
> to open it (assuming one path per line), and the rest is pretty much
> the same as above.
>
> It will probably be hard to process each individual file in parallel,
> unless mp3 and jpg files can be split into multiple blocks that can be
> processed separately. In that case, you'd need a custom (Hadoop) input
> format that is able to calculate the splits. But it doesn't sound like
> that's what you want.
>
>
>
> On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha  wrote:
> > Hi,
> >   How do one process for data sources other than text?
> > Lets say I have millions of mp3 (or jpeg) files and I want to use spark
> to
> > process them?
> > How does one go about it.
> >
> >
> > I have never been able to figure this out..
> > Lets say I have this library in python which works like following:
> >
> > import audio
> >
> > song = audio.read_mp3(filename)
> >
> > Then most of the methods are attached to song or maybe there is another
> > function which takes "song" type as an input.
> >
> > Maybe the above is just rambling.. but how do I use spark to process
> (say)
> > audiio files.
> > Thanks
>
>
>
> --
> Marcelo
>


Re: Processing audio/video/images

2014-06-02 Thread jamal sasha
Thanks. Let me go thru it.


On Mon, Jun 2, 2014 at 5:15 PM, Philip Ogren 
wrote:

> I asked a question related to Marcelo's answer a few months ago. The
> discussion there may be useful:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-URI-td1054.html
>
>
>
> On 06/02/2014 06:09 PM, Marcelo Vanzin wrote:
>
>> Hi Jamal,
>>
>> If what you want is to process lots of files in parallel, the best
>> approach is probably to load all file names into an array and
>> parallelize that. Then each task will take a path as input and can
>> process it however it wants.
>>
>> Or you could write the file list to a file, and then use sc.textFile()
>> to open it (assuming one path per line), and the rest is pretty much
>> the same as above.
>>
>> It will probably be hard to process each individual file in parallel,
>> unless mp3 and jpg files can be split into multiple blocks that can be
>> processed separately. In that case, you'd need a custom (Hadoop) input
>> format that is able to calculate the splits. But it doesn't sound like
>> that's what you want.
>>
>>
>>
>> On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha 
>> wrote:
>>
>>> Hi,
>>>How do one process for data sources other than text?
>>> Lets say I have millions of mp3 (or jpeg) files and I want to use spark
>>> to
>>> process them?
>>> How does one go about it.
>>>
>>>
>>> I have never been able to figure this out..
>>> Lets say I have this library in python which works like following:
>>>
>>> import audio
>>>
>>> song = audio.read_mp3(filename)
>>>
>>> Then most of the methods are attached to song or maybe there is another
>>> function which takes "song" type as an input.
>>>
>>> Maybe the above is just rambling.. but how do I use spark to process
>>> (say)
>>> audiio files.
>>> Thanks
>>>
>>
>>
>>
>


Window slide duration

2014-06-02 Thread Vadim Chekan
Hi all,

I am getting an error:

14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid as
zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference is
6000 ms
14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms


My relevant code is:
===
ssc =  new StreamingContext(conf, Seconds(1))
val messageEvents = events.
  flatMap(e => evaluatorCached.value.find(e)).
  window(Seconds(8), Seconds(4))
messageEvents.print()
===

Seems all right to me: the window duration (8) and the slide duration (4) are
both multiples of the streaming context batch duration (1). So, what's the problem?

Spark-v1.0.0

-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified


Re: Processing audio/video/images

2014-06-02 Thread Marcelo Vanzin
The idea is simple. If you want to run something on a collection of
files, do (in pseudo-python):

def processSingleFile(path):
  # Your code to process a file

files = [ "file1", "file2" ]
sc.parallelize(files).foreach(processSingleFile)


On Mon, Jun 2, 2014 at 5:16 PM, jamal sasha  wrote:
> Hi Marcelo,
>   Thanks for the response..
> I am not sure I understand. Can you elaborate a bit.
> So, for example, lets take a look at this example
> http://pythonvision.org/basic-tutorial
>
> import mahotas
> dna = mahotas.imread('dna.jpeg')
> dnaf = ndimage.gaussian_filter(dna, 8)
>
> But except dna.jpeg Lets say, I have millions of dna.jpeg and I want to run
> the above logic on all the millions files.
> How should I go about this?
> Thanks
>
> On Mon, Jun 2, 2014 at 5:09 PM, Marcelo Vanzin  wrote:
>>
>> Hi Jamal,
>>
>> If what you want is to process lots of files in parallel, the best
>> approach is probably to load all file names into an array and
>> parallelize that. Then each task will take a path as input and can
>> process it however it wants.
>>
>> Or you could write the file list to a file, and then use sc.textFile()
>> to open it (assuming one path per line), and the rest is pretty much
>> the same as above.
>>
>> It will probably be hard to process each individual file in parallel,
>> unless mp3 and jpg files can be split into multiple blocks that can be
>> processed separately. In that case, you'd need a custom (Hadoop) input
>> format that is able to calculate the splits. But it doesn't sound like
>> that's what you want.
>>
>>
>>
>> On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha  wrote:
>> > Hi,
>> >   How do one process for data sources other than text?
>> > Lets say I have millions of mp3 (or jpeg) files and I want to use spark
>> > to
>> > process them?
>> > How does one go about it.
>> >
>> >
>> > I have never been able to figure this out..
>> > Lets say I have this library in python which works like following:
>> >
>> > import audio
>> >
>> > song = audio.read_mp3(filename)
>> >
>> > Then most of the methods are attached to song or maybe there is another
>> > function which takes "song" type as an input.
>> >
>> > Maybe the above is just rambling.. but how do I use spark to process
>> > (say)
>> > audiio files.
>> > Thanks
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo


Re: Processing audio/video/images

2014-06-02 Thread jamal sasha
Phoofff.. (Mind blown)...
Thank you sir.
This is awesome


On Mon, Jun 2, 2014 at 5:23 PM, Marcelo Vanzin  wrote:

> The idea is simple. If you want to run something on a collection of
> files, do (in pseudo-python):
>
> def processSingleFile(path):
>   # Your code to process a file
>
> files = [ "file1", "file2" ]
> sc.parallelize(files).foreach(processSingleFile)
>
>
> On Mon, Jun 2, 2014 at 5:16 PM, jamal sasha  wrote:
> > Hi Marcelo,
> >   Thanks for the response..
> > I am not sure I understand. Can you elaborate a bit.
> > So, for example, lets take a look at this example
> > http://pythonvision.org/basic-tutorial
> >
> > import mahotas
> > dna = mahotas.imread('dna.jpeg')
> > dnaf = ndimage.gaussian_filter(dna, 8)
> >
> > But except dna.jpeg Lets say, I have millions of dna.jpeg and I want to
> run
> > the above logic on all the millions files.
> > How should I go about this?
> > Thanks
> >
> > On Mon, Jun 2, 2014 at 5:09 PM, Marcelo Vanzin 
> wrote:
> >>
> >> Hi Jamal,
> >>
> >> If what you want is to process lots of files in parallel, the best
> >> approach is probably to load all file names into an array and
> >> parallelize that. Then each task will take a path as input and can
> >> process it however it wants.
> >>
> >> Or you could write the file list to a file, and then use sc.textFile()
> >> to open it (assuming one path per line), and the rest is pretty much
> >> the same as above.
> >>
> >> It will probably be hard to process each individual file in parallel,
> >> unless mp3 and jpg files can be split into multiple blocks that can be
> >> processed separately. In that case, you'd need a custom (Hadoop) input
> >> format that is able to calculate the splits. But it doesn't sound like
> >> that's what you want.
> >>
> >>
> >>
> >> On Mon, Jun 2, 2014 at 5:02 PM, jamal sasha 
> wrote:
> >> > Hi,
> >> >   How do one process for data sources other than text?
> >> > Lets say I have millions of mp3 (or jpeg) files and I want to use
> spark
> >> > to
> >> > process them?
> >> > How does one go about it.
> >> >
> >> >
> >> > I have never been able to figure this out..
> >> > Lets say I have this library in python which works like following:
> >> >
> >> > import audio
> >> >
> >> > song = audio.read_mp3(filename)
> >> >
> >> > Then most of the methods are attached to song or maybe there is
> another
> >> > function which takes "song" type as an input.
> >> >
> >> > Maybe the above is just rambling.. but how do I use spark to process
> >> > (say)
> >> > audiio files.
> >> > Thanks
> >>
> >>
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>


Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Mohit Nayak
Hey,
Yup that fixed it. Thanks so much!

Is this the only solution, or could this be resolved in future versions of
Spark ?


On Mon, Jun 2, 2014 at 5:14 PM, Sean Owen  wrote:

> If it's the SBT build, I suspect you are hitting
> https://issues.apache.org/jira/browse/SPARK-1949
>
> Can you try to apply the excludes you see at
> https://github.com/apache/spark/pull/906/files to your build to see if
> it resolves it?
>
> If so I think this could be helpful to commit.
>
> On Tue, Jun 3, 2014 at 1:01 AM, Mohit Nayak  wrote:
> > Hey,
> > Thanks for the reply.
> >
> > I am using SBT. Here is a list of my dependencies:
> > val sparkCore= "org.apache.spark" % "spark-core_2.10" % V.spark
> > val hadoopCore   = "org.apache.hadoop" % "hadoop-core"   %
> > V.hadoop% "provided"
> > val jodaTime = "com.github.nscala-time" %% "nscala-time" %
> > "0.8.0"
> > val scalaUtil= "com.twitter"   %% "util-collection"  %
> > V.util
> > val logback  = "ch.qos.logback" % "logback-classic" % "1.0.6" %
> > "runtime"
> > var openCsv  = "net.sf.opencsv" % "opencsv" % "2.1"
> > var scalaTest= "org.scalatest" % "scalatest_2.10" % "2.1.0" %
> "test"
> > var scalaIOCore  = "com.github.scala-incubator.io" %%
> "scala-io-core" %
> > V.scalaIO
> > var scalaIOFile  = "com.github.scala-incubator.io" %%
> "scala-io-file" %
> > V.scalaIO
> > var kryo = "com.esotericsoftware.kryo" % "kryo" % "2.16"
> > var spray = "io.spray" %%  "spray-json" % "1.2.5"
> > var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"
> >
> >
> >
> > On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen  wrote:
> >>
> >> This ultimately means you have a couple copies of the servlet APIs in
> >> the build. What is your build like (SBT? Maven?) and what exactly are
> >> you depending on?
> >>
> >> On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak  wrote:
> >> > Hi,
> >> > I've upgraded to Spark 1.0.0. I'm not able to run any tests. They
> throw
> >> > a
> >> >
> >> > java.lang.SecurityException: class
> "javax.servlet.FilterRegistration"'s
> >> > signer information does not match signer information of other classes
> in
> >> > the
> >> > same package
> >> >
> >> >
> >> > I'm using Hadoop-core 1.0.4 and running this locally.
> >> > I noticed that there was an issue regarding this and was marked as
> >> > resolved
> >> > [https://issues.apache.org/jira/browse/SPARK-1693]
> >> > Please guide..
> >> >
> >> > --
> >> > -Mohit
> >> > wiza...@gmail.com
> >> >
> >> >
> >> >
> >> > --
> >> > -Mohit
> >> > wiza...@gmail.com
> >
> >
> >
> >
> > --
> > -Mohit
> > wiza...@gmail.com
>



-- 
-Mohit
wiza...@gmail.com
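
For SBT users who want to stay on the SBT build until SPARK-1949 is resolved, the excludes Sean points to can be applied to your own dependency declarations; the exact organization/artifact pairs should be copied from PR 906 itself. Purely as an illustration of the SBT mechanics, assuming the clash comes from the servlet API pulled in transitively by hadoop-core, it would look roughly like:

// Illustration only: exclude the conflicting javax.servlet artifacts from the
// transitive dependencies; take the real exclusion list from PR 906.
val hadoopCore = "org.apache.hadoop" % "hadoop-core" % V.hadoop % "provided" excludeAll(
  ExclusionRule(organization = "javax.servlet")
)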


Re: Window slide duration

2014-06-02 Thread Tathagata Das
I am assuming that you are referring to the "OneForOneStrategy: key not
found: 1401753992000 ms" error, and not to the previous "Time 1401753992000
ms is invalid ...". Those two seem a little unrelated to me. Can you give
us the stacktrace associated with the key-not-found error?

TD


On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan  wrote:

> Hi all,
>
> I am getting an error:
> 
> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
> as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
> is 6000 ms
> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
> 
>
> My relevant code is:
> ===
> ssc =  new StreamingContext(conf, Seconds(1))
> val messageEvents = events.
>   flatMap(e => evaluatorCached.value.find(e)).
>   window(Seconds(8), Seconds(4))
> messageEvents.print()
> ===
>
> Seems all right to me, window slide duration (4) is streaming context
> batch duration (1) *2. So, what's the problem?
>
> Spark-v1.0.0
>
> --
> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
> explicitly specified
>


Re: SecurityException when running tests with Spark 1.0.0

2014-06-02 Thread Matei Zaharia
You can just use the Maven build for now, even for Spark 1.0.0.

Matei

On Jun 2, 2014, at 5:30 PM, Mohit Nayak  wrote:

> Hey,
> Yup that fixed it. Thanks so much!
>  
> Is this the only solution, or could this be resolved in future versions of 
> Spark ?
> 
> 
> On Mon, Jun 2, 2014 at 5:14 PM, Sean Owen  wrote:
> If it's the SBT build, I suspect you are hitting
> https://issues.apache.org/jira/browse/SPARK-1949
> 
> Can you try to apply the excludes you see at
> https://github.com/apache/spark/pull/906/files to your build to see if
> it resolves it?
> 
> If so I think this could be helpful to commit.
> 
> On Tue, Jun 3, 2014 at 1:01 AM, Mohit Nayak  wrote:
> > Hey,
> > Thanks for the reply.
> >
> > I am using SBT. Here is a list of my dependencies:
> > val sparkCore= "org.apache.spark" % "spark-core_2.10" % V.spark
> > val hadoopCore   = "org.apache.hadoop" % "hadoop-core"   %
> > V.hadoop% "provided"
> > val jodaTime = "com.github.nscala-time" %% "nscala-time" %
> > "0.8.0"
> > val scalaUtil= "com.twitter"   %% "util-collection"  %
> > V.util
> > val logback  = "ch.qos.logback" % "logback-classic" % "1.0.6" %
> > "runtime"
> > var openCsv  = "net.sf.opencsv" % "opencsv" % "2.1"
> > var scalaTest= "org.scalatest" % "scalatest_2.10" % "2.1.0" % "test"
> > var scalaIOCore  = "com.github.scala-incubator.io" %% "scala-io-core" %
> > V.scalaIO
> > var scalaIOFile  = "com.github.scala-incubator.io" %% "scala-io-file" %
> > V.scalaIO
> > var kryo = "com.esotericsoftware.kryo" % "kryo" % "2.16"
> > var spray = "io.spray" %%  "spray-json" % "1.2.5"
> > var scala_reflect = "org.scala-lang" % "scala-reflect" % "2.10.3"
> >
> >
> >
> > On Mon, Jun 2, 2014 at 4:23 PM, Sean Owen  wrote:
> >>
> >> This ultimately means you have a couple copies of the servlet APIs in
> >> the build. What is your build like (SBT? Maven?) and what exactly are
> >> you depending on?
> >>
> >> On Tue, Jun 3, 2014 at 12:21 AM, Mohit Nayak  wrote:
> >> > Hi,
> >> > I've upgraded to Spark 1.0.0. I'm not able to run any tests. They throw
> >> > a
> >> >
> >> > java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s
> >> > signer information does not match signer information of other classes in
> >> > the
> >> > same package
> >> >
> >> >
> >> > I'm using Hadoop-core 1.0.4 and running this locally.
> >> > I noticed that there was an issue regarding this and was marked as
> >> > resolved
> >> > [https://issues.apache.org/jira/browse/SPARK-1693]
> >> > Please guide..
> >> >
> >> > --
> >> > -Mohit
> >> > wiza...@gmail.com
> >> >
> >> >
> >> >
> >> > --
> >> > -Mohit
> >> > wiza...@gmail.com
> >
> >
> >
> >
> > --
> > -Mohit
> > wiza...@gmail.com
> 
> 
> 
> -- 
> -Mohit
> wiza...@gmail.com



Re: Window slide duration

2014-06-02 Thread Vadim Chekan
OK, it seems like "Time ... is invalid" is part of the normal workflow: the
window DStream ignores RDDs at points in time that do not line up with the
window's slide interval. But why I am getting an exception is still
unclear. Here is the full stack:

14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid as
zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference is
1000 ms
14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
java.util.NoSuchElementException: key not found: 1401754908000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan  wrote:

> Hi all,
>
> I am getting an error:
> 
> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
> as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
> is 6000 ms
> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
> 
>
> My relevant code is:
> ===
> ssc =  new StreamingContext(conf, Seconds(1))
> val messageEvents = events.
>   flatMap(e => evaluatorCached.value.find(e)).
>   window(Seconds(8), Seconds(4))
> messageEvents.print()
> ===
>
> Seems all right to me, window slide duration (4) is streaming context
> batch duration (1) *2. So, what's the problem?
>
> Spark-v1.0.0
>
> --
> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
> explicitly specified
>



-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified


Re: NoSuchElementException: key not found

2014-06-02 Thread Tathagata Das
Do you have the INFO-level logs of the application? Can you grep the
value "32855" to find any references to it? Also, what version of Spark are
you using (so that I can match the stack trace; it does not seem to match
Spark 1.0)?

TD


On Mon, Jun 2, 2014 at 3:27 PM, Michael Chang  wrote:

> Hi all,
>
> Seeing a random exception kill my spark streaming job. Here's a stack
> trace:
>
> java.util.NoSuchElementException: key not found: 32855
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> at
> org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:211)
>  at
> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1072)
> at
> org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:716)
> at
> org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:172)
> at
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:189)
> at
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:188)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:351)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
> at
> org.apache.spark.rdd.PartitionCoalescer$LocationIterator.(CoalescedRDD.scala:183)
> at
> org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:234)
> at
> org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:333)
> at
> org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:81)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at
> org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:31)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.rdd.RDD.take(RDD.scala:830)
> at
> org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:337)
> at org.apache.spark.api.java.JavaRDD.take(JavaRDD.scala:27)
> at
> com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:87)
> at
> com.tellapart.manifolds.spark.ManifoldsUtil$PersistToKafkaFunction.call(ManifoldsUtil.java:53)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:270)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:520)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> at
> org.apache.spark.str

Re: Window slide duration

2014-06-02 Thread Tathagata Das
Can you give all the logs? I would like to see what is clearing the key
"1401754908000 ms".

TD


On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan  wrote:

> Ok, it seems like "Time ... is invalid" is part of normal workflow, when
> window DStream will ignore RDDs at moments in time when they do not match
> to the window sliding interval. But why am I getting exception is still
> unclear. Here is the full stack:
>
> 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid
> as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference
> is 1000 ms
> 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
> java.util.NoSuchElementException: key not found: 1401754908000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> at
> org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan 
> wrote:
>
>> Hi all,
>>
>> I am getting an error:
>> 
>> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
>> as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
>> is 6000 ms
>> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
>> 
>>
>> My relevant code is:
>> ===
>> ssc =  new StreamingContext(conf, Seconds(1))
>> val messageEvents = events.
>>   flatMap(e => evaluatorCached.value.find(e)).
>>   window(Seconds(8), Seconds(4))
>> messageEvents.print()
>> ===
>>
>> Seems all right to me, window slide duration (4) is streaming context
>> batch duration (1) *2. So, what's the problem?
>>
>> Spark-v1.0.0
>>
>> --
>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
>> explicitly specified
>>
>
>
>
> --
> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
> explicitly specified
>


how to construct a ClassTag object as a method parameter in Java

2014-06-02 Thread bluejoe2008
Hi all,
I am programming with Spark in Java, and now I have a question:
when I make a method call on a JavaRDD such as:

textInput.mapPartitionsWithIndex(
new Function2, Iterator>()
{...},
false,
PARAM3
);

What value should I pass as the PARAM3 parameter?
It is required to be a ClassTag value, so how can I define such a value in Java?
I really have no idea...

best regards,
bluejoe2008

Re: Failed to remove RDD error

2014-06-02 Thread Tathagata Das
spark.streaming.unpersist was an experimental feature introduced with Spark
0.9 (but kept disabled), which actively clears off RDDs that are not useful
any more. In Spark 1.0 it has been enabled by default. It is possible
that this is an unintended side-effect of that. If spark.cleaner.ttl works,
then that should be used.

TD


On Mon, Jun 2, 2014 at 9:42 AM, Michael Chang  wrote:

> Hey Mayur,
>
> Thanks for the suggestion, I didn't realize that was configurable.  I
> don't think I'm running out of memory, though it does seem like these
> errors go away when i turn off the spark.streaming.unpersist configuration
> and use spark.cleaner.ttl instead.  Do you know if there are known issues
> with the unpersist option?
>
>
> On Sat, May 31, 2014 at 12:17 AM, Mayur Rustagi 
> wrote:
>
>> You can increase your akka timeout, should give you some more life.. are
>> you running out of memory by any chance?
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Sat, May 31, 2014 at 6:52 AM, Michael Chang 
>> wrote:
>>
>>> I'm running a some kafka streaming spark contexts (on 0.9.1), and they
>>> seem to be dying after 10 or so minutes with a lot of these errors.  I
>>> can't really tell what's going on here, except that maybe the driver is
>>> unresponsive somehow?  Has anyone seen this before?
>>>
>>> 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635
>>>
>>> akka.pattern.AskTimeoutException: Timed out
>>>
>>> at
>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>>
>>> at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118)
>>>
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691)
>>>
>>> at
>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688)
>>>
>>> at
>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455)
>>>
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407)
>>>
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411)
>>>
>>> at
>>> akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363)
>>>
>>> at java.lang.Thread.run(Thread.java:744)
>>>
>>> Thanks,
>>>
>>> Mike
>>>
>>>
>>>
>>
>
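
To make the fallback TD describes concrete: a minimal sketch of where the two settings go. The application name and batch interval are placeholders, and spark.cleaner.ttl is specified in seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-job")          // placeholder name
  .set("spark.streaming.unpersist", "false")  // disable the active RDD cleanup
  .set("spark.cleaner.ttl", "3600")           // fall back to TTL-based cleanup (1 hour)
val ssc = new StreamingContext(conf, Seconds(1))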


Re: how to construct a ClassTag object as a method parameter in Java

2014-06-02 Thread Michael Armbrust
What version of Spark are you using?  Also are you sure the type of
textInput is a JavaRDD and not an RDD?

It looks like the 1.0 Java API
does not require a class tag.


On Mon, Jun 2, 2014 at 5:59 PM, bluejoe2008  wrote:

>  hi,all
>i am programming with Spark in Java, and now i have a question:
> when i made a method call on a JavaRDD such as:
>
> textInput.mapPartitionsWithIndex(
> new Function2, Iterator>()
> {...},
> false,
> PARAM3
> );
>
> what value should i pass as the PARAM3 parameter?
> it is required as a ClassTag value, then how can i define such a value in
> Java? i really have no idea...
>
> best regards,
> bluejoe2008
>
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Patrick Wendell
(A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's
output format check and overwrite files in the destination directory.
But it won't clobber the directory entirely. I.e. if the directory
already had "part1" "part2" "part3" "part4" and you write a new job
outputting only two files ("part1", "part2") then it would leave the
other two files intact, confusingly.

(B) Semantics in Spark 1.0 and earlier: Runs Hadoop OutputFormat check
which means the directory must not exist already or an exception is
thrown.

(C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
Spark will delete/clobber an existing destination directory if it
exists, then fully over-write it with new data.

I'm fine to add a flag that allows (B) for backwards-compatibility
reasons, but my point was I'd prefer not to have (C) even though I see
some cases where it would be useful.

- Patrick

On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen  wrote:
> Is there a third way? Unless I miss something. Hadoop's OutputFormat
> wants the target dir to not exist no matter what, so it's just a
> question of whether Spark deletes it for you or errors.
>
> On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell  wrote:
>> We can just add back a flag to make it backwards compatible - it was
>> just missed during the original PR.
>>
>> Adding a *third* set of "clobber" semantics, I'm slightly -1 on that
>> for the following reasons:
>>
>> 1. It's scary to have Spark recursively deleting user files, could
>> easily lead to users deleting data by mistake if they don't understand
>> the exact semantics.
>> 2. It would introduce a third set of semantics here for saveAsXX...
>> 3. It's trivial for users to implement this with two lines of code (if
>> output dir exists, delete it) before calling saveAsHadoopFile.
>>
>> - Patrick
>>
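
The "two lines of code" Patrick mentions look roughly like this in Scala. A sketch only, assuming sc is the active SparkContext and that rdd and output stand for your own RDD and destination directory:

import org.apache.hadoop.fs.Path

val outputPath = new Path(output)
val fs = outputPath.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(outputPath)) fs.delete(outputPath, true)  // recursive delete of the old output
rdd.saveAsTextFile(output)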


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Patrick Wendell
Yeah we need to add a build warning to the Maven build. Would you be
able to try compiling Spark with Java 6? It would be good to narrow
down if you are hitting this problem or something else.

On Mon, Jun 2, 2014 at 1:15 PM, Xu (Simon) Chen  wrote:
> Nope... didn't try java 6. The standard installation guide didn't say
> anything about java 7 and suggested to do "-DskipTests" for the build..
> http://spark.apache.org/docs/latest/building-with-maven.html
>
> So, I didn't see the warning message...
>
>
> On Mon, Jun 2, 2014 at 3:48 PM, Patrick Wendell  wrote:
>>
>> Are you building Spark with Java 6 or Java 7. Java 6 uses the extended
>> Zip format and Java 7 uses Zip64. I think we've tried to add some
>> build warnings if Java 7 is used, for this reason:
>>
>> https://github.com/apache/spark/blob/master/make-distribution.sh#L102
>>
>> Any luck if you use JDK 6 to compile?
>>
>>
>> On Mon, Jun 2, 2014 at 12:03 PM, Xu (Simon) Chen 
>> wrote:
>> > OK, my colleague found this:
>> > https://mail.python.org/pipermail/python-list/2014-May/671353.html
>> >
>> > And my jar file has 70011 files. Fantastic..
>> >
>> >
>> >
>> >
>> > On Mon, Jun 2, 2014 at 2:34 PM, Xu (Simon) Chen 
>> > wrote:
>> >>
>> >> I asked several people, no one seems to believe that we can do this:
>> >> $ PYTHONPATH=/path/to/assembly/jar python
>> >> >>> import pyspark
>> >>
>> >> This following pull request did mention something about generating a
>> >> zip
>> >> file for all python related modules:
>> >> https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html
>> >>
>> >> I've tested that zipped modules can as least be imported via zipimport.
>> >>
>> >> Any ideas?
>> >>
>> >> -Simon
>> >>
>> >>
>> >>
>> >> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or 
>> >> wrote:
>> >>>
>> >>> Hi Simon,
>> >>>
>> >>> You shouldn't have to install pyspark on every worker node. In YARN
>> >>> mode,
>> >>> pyspark is packaged into your assembly jar and shipped to your
>> >>> executors
>> >>> automatically. This seems like a more general problem. There are a few
>> >>> things to try:
>> >>>
>> >>> 1) Run a simple pyspark shell with yarn-client, and do
>> >>> "sc.parallelize(range(10)).count()" to see if you get the same error
>> >>> 2) If so, check if your assembly jar is compiled correctly. Run
>> >>>
>> >>> $ jar -tf  pyspark
>> >>> $ jar -tf  py4j
>> >>>
>> >>> to see if the files are there. For Py4j, you need both the python
>> >>> files
>> >>> and the Java class files.
>> >>>
>> >>> 3) If the files are there, try running a simple python shell (not
>> >>> pyspark
>> >>> shell) with the assembly jar on the PYTHONPATH:
>> >>>
>> >>> $ PYTHONPATH=/path/to/assembly/jar python
>> >>> >>> import pyspark
>> >>>
>> >>> 4) If that works, try it on every worker node. If it doesn't work,
>> >>> there
>> >>> is probably something wrong with your jar.
>> >>>
>> >>> There is a known issue for PySpark on YARN - jars built with Java 7
>> >>> cannot be properly opened by Java 6. I would either verify that the
>> >>> JAVA_HOME set on all of your workers points to Java 7 (by setting
>> >>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
>> >>>
>> >>> $ cd /path/to/spark/home
>> >>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
>> >>> 2.3.0-cdh5.0.0
>> >>>
>> >>> 5) You can check out
>> >>>
>> >>> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
>> >>> which has more detailed information about how to debug running an
>> >>> application on YARN in general. In my experience, the steps outlined
>> >>> there
>> >>> are quite useful.
>> >>>
>> >>> Let me know if you get it working (or not).
>> >>>
>> >>> Cheers,
>> >>> Andrew
>> >>>
>> >>>
>> >>>
>> >>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>> >>>
>>  Hi folks,
>> 
>>  I have a weird problem when using pyspark with yarn. I started
>>  ipython
>>  as follows:
>> 
>>  IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>>  --num-executors 4 --executor-memory 4G
>> 
>>  When I create a notebook, I can see workers being created and indeed
>>  I
>>  see spark UI running on my client machine on port 4040.
>> 
>>  I have the following simple script:
>>  """
>>  import pyspark
>>  data = sc.textFile("hdfs://test/tmp/data/*").cache()
>>  oneday = data.map(lambda line: line.split(",")).\
>>    map(lambda f: (f[0], float(f[1]))).\
>>    filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>>  "2013-01-02").\
>>    map(lambda t: (parser.parse(t[0]), t[1]))
>>  oneday.take(1)
>>  """
>> 
>>  By executing this, I see that it is my client machine (where ipython
>>  is
>>  launched) is reading all the data from HDFS, and produce the result
>>  of
>>  take(1), rather than my worker nodes...
>> 
>>  When I do "data.count()", things would blow up altogether. But I do
>>  see
>>  in the erro

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nan Zhu
I remember that in an earlier version of that PR, I deleted files by calling
the HDFS API.

We discussed and concluded that it's a bit scary to have something directly
deleting users' files in Spark.

Best,  

--  
Nan Zhu


On Monday, June 2, 2014 at 10:39 PM, Patrick Wendell wrote:

> (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoop's
> output format check and overwrite files in the destination directory.
> But it won't clobber the directory entirely. I.e. if the directory
> already had "part1" "part2" "part3" "part4" and you write a new job
> outputting only two files ("part1", "part2") then it would leave the
> other two files intact, confusingly.
>  
> (B) Semantics in Spark 1.0 and earlier: Runs Hadoop OutputFormat check
> which means the directory must not exist already or an exception is
> thrown.
>  
> (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
> Spark will delete/clobber an existing destination directory if it
> exists, then fully over-write it with new data.
>  
> I'm fine to add a flag that allows (B) for backwards-compatibility
> reasons, but my point was I'd prefer not to have (C) even though I see
> some cases where it would be useful.
>  
> - Patrick
>  
> On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen wrote:
> > Is there a third way? Unless I miss something. Hadoop's OutputFormat
> > wants the target dir to not exist no matter what, so it's just a
> > question of whether Spark deletes it for you or errors.
> >  
> > On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell wrote:
> > > We can just add back a flag to make it backwards compatible - it was
> > > just missed during the original PR.
> > >  
> > > Adding a *third* set of "clobber" semantics, I'm slightly -1 on that
> > > for the following reasons:
> > >  
> > > 1. It's scary to have Spark recursively deleting user files, could
> > > easily lead to users deleting data by mistake if they don't understand
> > > the exact semantics.
> > > 2. It would introduce a third set of semantics here for saveAsXX...
> > > 3. It's trivial for users to implement this with two lines of code (if
> > > output dir exists, delete it) before calling saveAsHadoopFile.
> > >  
> > > - Patrick  



A single build.sbt file to start Spark REPL?

2014-06-02 Thread Alexy Khrabrov
The usual way to use Spark with SBT is to package a Spark project using sbt
package (e.g. per the Quick Start) and submit it to Spark using the bin/ scripts
from the Spark distribution.  For a plain Scala project, you don't need to download
anything: you can just get a build.sbt file with dependencies and e.g. say
"console", which will start a Scala REPL with the dependencies on the class
path.  Is there a way to avoid downloading the Spark tarball completely, by
defining the spark-core dependency in build.sbt, and using `run` or `console`
to invoke the Spark REPL from sbt?  I.e. the goal is: create a single build.sbt
file, such that if you run sbt in its directory, and then say run/console (with
optional parameters), it will download all Spark dependencies and start the
REPL.  It should work on a fresh machine where the Spark tarball has never been
untarred.

A+
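
A rough sketch of what such a build.sbt could look like. It is an illustration only: it starts a plain Scala console with a SparkContext on the classpath rather than the actual Spark REPL, and the version numbers are just examples.

name := "spark-console"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

// Start `sbt console` with a local SparkContext already created.
initialCommands in console := """
  import org.apache.spark.{SparkConf, SparkContext}
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sbt-console"))
"""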

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
On Mon, Jun 2, 2014 at 10:39 PM, Patrick Wendell  wrote:

> (B) Semantics in Spark 1.0 and earlier:


Do you mean 1.0 and later?

Option (B) with the exception-on-clobber sounds fine to me, btw. My use
pattern is probably common but not universal, and deleting user files is
indeed scary.

Nick


Re: Re: how to construct a ClassTag object as a method parameter in Java

2014-06-02 Thread bluejoe2008
Spark 0.9.1
textInput is a JavaRDD object
I am programming in Java

2014-06-03 


bluejoe2008

From: Michael Armbrust
Date: 2014-06-03 10:09
To: user
Subject: Re: how to construct a ClassTag object as a method parameter in Java
What version of Spark are you using?  Also are you sure the type of textInput 
is a JavaRDD and not an RDD?


It looks like the 1.0 Java API does not require a class tag.



On Mon, Jun 2, 2014 at 5:59 PM, bluejoe2008  wrote:

hi,all
i am programming with Spark in Java, and now i have a question:
when i made a method call on a JavaRDD such as:

textInput.mapPartitionsWithIndex(
new Function2, Iterator>()
{...},
false,
PARAM3
);

what value should i pass as the PARAM3 parameter?
it is required as a ClassTag value, then how can i define such a value in Java? 
i really have no idea...

best regards,
bluejoe2008
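
For completeness, on 0.9.x a ClassTag can also be constructed explicitly rather than supplied implicitly. A minimal Scala sketch follows; from Java the same factory should be reachable as scala.reflect.ClassTag$.MODULE$.apply(String.class), though that call should be checked against your Scala version.

import scala.reflect.ClassTag

// Explicitly build the ClassTag the method expects; in Scala code the
// compiler normally fills this in implicitly. String is just an example type.
val tag: ClassTag[String] = ClassTag(classOf[String])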

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Kexin Xie
+1 on Option (B) with flag to allow semantics in (A) for back compatibility.

Kexin


On Tue, Jun 3, 2014 at 1:18 PM, Nicholas Chammas  wrote:

> On Mon, Jun 2, 2014 at 10:39 PM, Patrick Wendell 
> wrote:
>
>> (B) Semantics in Spark 1.0 and earlier:
>
>
> Do you mean 1.0 and later?
>
> Option (B) with the exception-on-clobber sounds fine to me, btw. My use
> pattern is probably common but not universal, and deleting user files is
> indeed scary.
>
> Nick
>


Re: Window slide duration

2014-06-02 Thread Vadim Chekan
Thanks for looking into this Tathagata.

Are you looking for traces of the ReceiverInputDStream.clearMetadata call?
Here is the log: http://wepaste.com/vchekan

Vadim.


On Mon, Jun 2, 2014 at 5:58 PM, Tathagata Das 
wrote:

> Can you give all the logs? Would like to see what is clearing the key " 
> 1401754908000
> ms"
>
> TD
>
>
> On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan 
> wrote:
>
>> Ok, it seems like "Time ... is invalid" is part of normal workflow, when
>> window DStream will ignore RDDs at moments in time when they do not match
>> to the window sliding interval. But why am I getting exception is still
>> unclear. Here is the full stack:
>>
>> 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid
>> as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference
>> is 1000 ms
>> 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
>> java.util.NoSuchElementException: key not found: 1401754908000 ms
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:58)
>> at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>> at
>> org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am getting an error:
>>> 
>>> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid
>>> as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference
>>> is 6000 ms
>>> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000
>>> ms
>>> 
>>>
>>> My relevant code is:
>>> ===
>>> ssc =  new StreamingContext(conf, Seconds(1))
>>> val messageEvents = events.
>>>   flatMap(e => evaluatorCached.value.find(e)).
>>>   window(Seconds(8), Seconds(4))
>>> messageEvents.print()
>>> ===
>>>
>>> Seems all right to me, window slide duration (4) is streaming context
>>> batch duration (1) *2. So, what's the problem?
>>>
>>> Spark-v1.0.0
>>>
>>> --
>>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
>>> explicitly specified
>>>
>>
>>
>>
>> --
>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
>> explicitly specified
>>
>
>


-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Patrick Wendell
Good catch! Yes I meant 1.0 and later.

On Mon, Jun 2, 2014 at 8:33 PM, Kexin Xie  wrote:
> +1 on Option (B) with flag to allow semantics in (A) for back compatibility.
>
> Kexin
>
>
>
> On Tue, Jun 3, 2014 at 1:18 PM, Nicholas Chammas
>  wrote:
>>
>> On Mon, Jun 2, 2014 at 10:39 PM, Patrick Wendell 
>> wrote:
>>>
>>> (B) Semantics in Spark 1.0 and earlier:
>>
>>
>> Do you mean 1.0 and later?
>>
>> Option (B) with the exception-on-clobber sounds fine to me, btw. My use
>> pattern is probably common but not universal, and deleting user files is
>> indeed scary.
>>
>> Nick
>
>


Re: using Log4j to log INFO level messages on workers

2014-06-02 Thread Alex Gaudio
Hi,


I had the same problem with pyspark.  Here's how I resolved it:

What I've found in python (not sure about scala) is that if the function
being serialized was written in the same python module as the main
function, then logging fails.  If the serialized function is in a separate
module, then logging does not fail.  I just created this gist to demo the
situation and (python) solution.  Is there a similar way to do this in
scala?

https://gist.github.com/adgaudio/0191e14717af68bbba81


Alex


On Mon, Jun 2, 2014 at 7:18 PM, Shivani Rao  wrote:

> Hello Spark fans,
>
> I am trying to log messages from my spark application. When the main()
> function attempts to log, using log.info() it works great, but when I try
> the same command from the code that probably runs on the worker, I
> initially got an serialization error. To solve that, I created a new logger
> in the code that operates on the data, which solved the serialization issue
> but now there is no output in the console or on the worker node logs. I
> don't see any application level log messages in the spark logs either. When
> I use println() instead, I do see console output being  generated.
>
> I tried the following and none of them works
>
> a) pass log4j.properties by using -Dlog4j.properties in my java command
> line initiation of the spark application
> b) setting the properties within the worker by calling log.addAppender(new
> ConsoleAppender)
>
> None of them work.
>
> What am i missing?
>
>
> Thanks,
> Shivani
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>
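
In Scala, one commonly used pattern is to keep the Logger behind a @transient lazy val in a standalone object, so nothing non-serializable is captured by the closure and each executor creates its own Logger. A sketch; whether the messages then show up still depends on the log4j.properties the workers actually load.

import org.apache.log4j.Logger

// A standalone logging helper: the object is reached through a static
// reference rather than being serialized with the closure, and the lazy val
// is initialised separately in each executor JVM.
object WorkerLogging extends Serializable {
  @transient lazy val log = Logger.getLogger(getClass.getName)
}

// Hypothetical usage inside a transformation; `rdd` stands for your own RDD.
rdd.foreach { record =>
  WorkerLogging.log.info("processing " + record)
}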


Re: EC2 Simple Cluster

2014-06-02 Thread Akhil Das
Hi Gianluca,

I believe your cluster setup wasn't complete. Do check the EC2 script's
console output for more details. Also, micro instances have only about 600 MB
of memory.

Thanks
Best Regards


On Tue, Jun 3, 2014 at 1:59 AM, Gianluca Privitera <
gianluca.privite...@studio.unibo.it> wrote:

> Hi everyone,
> I would like to setup a very simple cluster (specifically using 2 micro
> instances only) of Spark on EC2 and make it run a simple Spark Streaming
> application I created.
> Someone actually managed to do that?
> Because after launching the scripts from this page:
> http://spark.apache.org/docs/0.9.1/ec2-scripts.html and logging into the
> master node, I cannot find the spark folder the page is talking about, so I
> suppose the launch didn't go well.
>
> Thank you
> Gianluca
>


Re: Using String Dataset for Logistic Regression

2014-06-02 Thread praveshjain1991
I am not sure. I have just been using some numerical datasets.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6784.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Andrew Or
>> I asked several people, no one seems to believe that we can do this:
>> $ PYTHONPATH=/path/to/assembly/jar python
>> >>> import pyspark

That is because people usually don't package python files into their jars.
For pyspark, however, this will work as long as the jar can be opened and
its contents can be read. In my experience, if I am able to import the
pyspark module by explicitly specifying the PYTHONPATH this way, then I can
run pyspark on YARN without fail.

>> > OK, my colleague found this:
>> > https://mail.python.org/pipermail/python-list/2014-May/671353.html
>> >
>> > And my jar file has 70011 files. Fantastic..

It seems that this problem is not specific to running Java 6 on a Java 7
jar. We definitely need to document and warn against Java 7 jars more
aggressively. For now, please do try building the jar with Java 6.



2014-06-03 4:42 GMT+02:00 Patrick Wendell :

> Yeah we need to add a build warning to the Maven build. Would you be
> able to try compiling Spark with Java 6? It would be good to narrow
> down if you hare hitting this problem or something else.
>
> On Mon, Jun 2, 2014 at 1:15 PM, Xu (Simon) Chen  wrote:
> > Nope... didn't try java 6. The standard installation guide didn't say
> > anything about java 7 and suggested to do "-DskipTests" for the build..
> > http://spark.apache.org/docs/latest/building-with-maven.html
> >
> > So, I didn't see the warning message...
> >
> >
> > On Mon, Jun 2, 2014 at 3:48 PM, Patrick Wendell 
> wrote:
> >>
> >> Are you building Spark with Java 6 or Java 7. Java 6 uses the extended
> >> Zip format and Java 7 uses Zip64. I think we've tried to add some
> >> build warnings if Java 7 is used, for this reason:
> >>
> >> https://github.com/apache/spark/blob/master/make-distribution.sh#L102
> >>
> >> Any luck if you use JDK 6 to compile?
> >>
> >>
> >> On Mon, Jun 2, 2014 at 12:03 PM, Xu (Simon) Chen 
> >> wrote:
> >> > OK, my colleague found this:
> >> > https://mail.python.org/pipermail/python-list/2014-May/671353.html
> >> >
> >> > And my jar file has 70011 files. Fantastic..
> >> >
> >> >
> >> >
> >> >
> >> > On Mon, Jun 2, 2014 at 2:34 PM, Xu (Simon) Chen 
> >> > wrote:
> >> >>
> >> >> I asked several people, no one seems to believe that we can do this:
> >> >> $ PYTHONPATH=/path/to/assembly/jar python
> >> >> >>> import pyspark
> >> >>
> >> >> This following pull request did mention something about generating a
> >> >> zip
> >> >> file for all python related modules:
> >> >> https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html
> >> >>
> >> >> I've tested that zipped modules can as least be imported via
> zipimport.
> >> >>
> >> >> Any ideas?
> >> >>
> >> >> -Simon
> >> >>
> >> >>
> >> >>
> >> >> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or 
> >> >> wrote:
> >> >>>
> >> >>> Hi Simon,
> >> >>>
> >> >>> You shouldn't have to install pyspark on every worker node. In YARN
> >> >>> mode,
> >> >>> pyspark is packaged into your assembly jar and shipped to your
> >> >>> executors
> >> >>> automatically. This seems like a more general problem. There are a
> few
> >> >>> things to try:
> >> >>>
> >> >>> 1) Run a simple pyspark shell with yarn-client, and do
> >> >>> "sc.parallelize(range(10)).count()" to see if you get the same error
> >> >>> 2) If so, check if your assembly jar is compiled correctly. Run
> >> >>>
> >> >>> $ jar -tf  pyspark
> >> >>> $ jar -tf  py4j
> >> >>>
> >> >>> to see if the files are there. For Py4j, you need both the python
> >> >>> files
> >> >>> and the Java class files.
> >> >>>
> >> >>> 3) If the files are there, try running a simple python shell (not
> >> >>> pyspark
> >> >>> shell) with the assembly jar on the PYTHONPATH:
> >> >>>
> >> >>> $ PYTHONPATH=/path/to/assembly/jar python
> >> >>> >>> import pyspark
> >> >>>
> >> >>> 4) If that works, try it on every worker node. If it doesn't work,
> >> >>> there
> >> >>> is probably something wrong with your jar.
> >> >>>
> >> >>> There is a known issue for PySpark on YARN - jars built with Java 7
> >> >>> cannot be properly opened by Java 6. I would either verify that the
> >> >>> JAVA_HOME set on all of your workers points to Java 7 (by setting
> >> >>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
> >> >>>
> >> >>> $ cd /path/to/spark/home
> >> >>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
> >> >>> 2.3.0-cdh5.0.0
> >> >>>
> >> >>> 5) You can check out
> >> >>>
> >> >>>
> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application
> ,
> >> >>> which has more detailed information about how to debug running an
> >> >>> application on YARN in general. In my experience, the steps outlined
> >> >>> there
> >> >>> are quite useful.
> >> >>>
> >> >>> Let me know if you get it working (or not).
> >> >>>
> >> >>> Cheers,
> >> >>> Andrew
> >> >>>
> >> >>>
> >> >>>
> >> >>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
> >> >>>
> >>  Hi folks,
> >> 
> >>  I have a weird problem when using pyspark with yarn. 

Re: Using String Dataset for Logistic Regression

2014-06-02 Thread Xiangrui Meng
Yes. MLlib 1.0 supports sparse input data for linear methods. -Xiangrui

On Mon, Jun 2, 2014 at 11:36 PM, praveshjain1991
 wrote:
> I am not sure. I have just been using some numerical datasets.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6784.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
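
For reference, a minimal sketch of sparse input to logistic regression in MLlib 1.0, assuming sc is the active SparkContext; the labels, indices, and values below are made up.

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Two sparse points in a 10-dimensional feature space:
// Vectors.sparse(size, indices, values)
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.sparse(10, Array(1, 4), Array(1.0, 3.0))),
  LabeledPoint(0.0, Vectors.sparse(10, Array(2, 7), Array(2.0, 1.0)))
))

val model = LogisticRegressionWithSGD.train(training, 20)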


Spark Streaming not processing file with particular number of entries

2014-06-02 Thread praveshjain1991
Hi,

I am using a Spark Streaming application to process some data over a 3-node
cluster. It is, however, not processing any file that contains 0.4 million
entries. Files with any other number of entries are processed fine. When
running in local mode, even the 0.4 million entries file is processed fine.

I've tried using different files that have that number of entries, but
none of them work.

Any idea why such a weird issue could occur?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using String Dataset for Logistic Regression

2014-06-02 Thread praveshjain1991
Thank you for your replies. I've now been using integer datasets but ran into
another issue.

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-td6694.html

Any ideas?

--
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6695.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Kexin Xie
Hi,

Spark 1.0 changes the default behaviour of RDD.saveAsTextFile to
throw org.apache.hadoop.mapred.FileAlreadyExistsException when the file
already exists.

Is there a way I can allow Spark to overwrite the existing file?

Cheers,
Kexin


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Pierre Borckmans
+1 Same question here...

Message sent from a mobile device - excuse typos and abbreviations

> Le 2 juin 2014 à 10:08, Kexin Xie  a écrit :
> 
> Hi,
> 
> Spark 1.0 changes the default behaviour of RDD.saveAsTextFile to throw 
> org.apache.hadoop.mapred.FileAlreadyExistsException when file already exists. 
> 
> Is there a way I can allow Spark to overwrite the existing file?
> 
> Cheers,
> Kexin
> 


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Michael Cutler
The function saveAsTextFile is a wrapper around saveAsHadoopFile, and from
looking at the source I don't see any flags etc. to overwrite existing
files.  It is however trivial to do this using HDFS directly from Scala.

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://localhost:9000"), hadoopConf)


You can now use hdfs to do all sorts of useful things, listing directories,
recursively delete output directories e.g.

// Delete the existing path, ignore any exceptions thrown if the path
doesn't exist
val output = "hdfs://localhost:9000/tmp/wimbledon_top_mentions"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch
{ case _ : Throwable => { } }
top_mentions.saveAsTextFile(output)


For an illustrated example of how I do this see HDFSDeleteExample.scala




Michael Cutler
Founder, CTO

Mobile: +44 789 990 7847
Email: mich...@tumra.com
Web: tumra.com
Visit us at our offices in Chiswick Park
Registered in England & Wales, 07916412. VAT No. 130595328




On 2 June 2014 09:26, Pierre Borckmans <
pierre.borckm...@realimpactanalytics.com> wrote:

> +1 Same question here...
>
> Message sent from a mobile device - excuse typos and abbreviations
>
> Le 2 juin 2014 à 10:08, Kexin Xie  a écrit :
>
> Hi,
>
> Spark 1.0 changes the default behaviour of RDD.saveAsTextFile to
> throw org.apache.hadoop.mapred.FileAlreadyExistsException when file already
> exists.
>
> Is there a way I can allow Spark to overwrite the existing file?
>
> Cheers,
> Kexin
>
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Pierre B
Hi Michaël,

Thanks for this. We could indeed do that.

But I guess the question is more about the change of behaviour from 0.9.1 to
1.0.0.
We never had to care about that in previous versions.

Does that mean we have to manually remove existing files, or is there a way
to automatically overwrite when using saveAsTextFile?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using String Dataset for Logistic Regression

2014-06-02 Thread Wush Wu
Dear all,

Does spark support sparse matrix/vector for LR now?

Best,
Wush
On 2014/6/2 at 3:19 PM, "praveshjain1991" wrote:

> Thank you for your replies. I've now been using integer datasets but ran
> into
> another issue.
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-td6694.html
>
> Any ideas?
>
> --
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6695.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: spark 1.0.0 on yarn

2014-06-02 Thread Xu (Simon) Chen
OK, rebuilding the assembly jar file with cdh5 works now...
Thanks..

-Simon


On Sun, Jun 1, 2014 at 9:37 PM, Xu (Simon) Chen  wrote:

> That helped a bit... Now I have a different failure: the start up process
> is stuck in an infinite loop outputting the following message:
>
> 14/06/02 01:34:56 INFO cluster.YarnClientSchedulerBackend: Application
> report from ASM:
>  appMasterRpcPort: -1
>  appStartTime: 1401672868277
>  yarnAppState: ACCEPTED
>
> I am using the hadoop 2 prebuild package. Probably it doesn't have the
> latest yarn client.
>
> -Simon
>
>
>
>
> On Sun, Jun 1, 2014 at 9:03 PM, Patrick Wendell 
> wrote:
>
>> As a debugging step, does it work if you use a single resource manager
>> with the key "yarn.resourcemanager.address" instead of using two named
>> resource managers? I wonder if somehow the YARN client can't detect
>> this multi-master set-up.
>>
>> On Sun, Jun 1, 2014 at 12:49 PM, Xu (Simon) Chen 
>> wrote:
>> > Note that everything works fine in spark 0.9, which is packaged in
>> CDH5: I
>> > can launch a spark-shell and interact with workers spawned on my yarn
>> > cluster.
>> >
>> > So in my /opt/hadoop/conf/yarn-site.xml, I have:
>> > ...
>> > 
>> > yarn.resourcemanager.address.rm1
>> > controller-1.mycomp.com:23140
>> > 
>> > ...
>> > 
>> > yarn.resourcemanager.address.rm2
>> > controller-2.mycomp.com:23140
>> > 
>> > ...
>> >
>> > And the other usual stuff.
>> >
>> > So spark 1.0 is launched like this:
>> > Spark Command: java -cp
>> >
>> ::/home/chenxu/spark-1.0.0-bin-hadoop2/conf:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/opt/hadoop/conf
>> > -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
>> > org.apache.spark.deploy.SparkSubmit spark-shell --master yarn-client
>> --class
>> > org.apache.spark.repl.Main
>> >
>> > I do see "/opt/hadoop/conf" included, but not sure it's the right place.
>> >
>> > Thanks..
>> > -Simon
>> >
>> >
>> >
>> > On Sun, Jun 1, 2014 at 1:57 PM, Patrick Wendell 
>> wrote:
>> >>
>> >> I would agree with your guess, it looks like the yarn library isn't
>> >> correctly finding your yarn-site.xml file. If you look in
>> >> yarn-site.xml do you definitely the resource manager
>> >> address/addresses?
>> >>
>> >> Also, you can try running this command with
>> >> SPARK_PRINT_LAUNCH_COMMAND=1 to make sure the classpath is being
>> >> set-up correctly.
>> >>
>> >> - Patrick
>> >>
>> >> On Sat, May 31, 2014 at 5:51 PM, Xu (Simon) Chen 
>> >> wrote:
>> >> > Hi all,
>> >> >
>> >> > I tried a couple ways, but couldn't get it to work..
>> >> >
>> >> > The following seems to be what the online document
>> >> > (http://spark.apache.org/docs/latest/running-on-yarn.html) is
>> >> > suggesting:
>> >> >
>> >> >
>> SPARK_JAR=hdfs://test/user/spark/share/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
>> >> > YARN_CONF_DIR=/opt/hadoop/conf ./spark-shell --master yarn-client
>> >> >
>> >> > Help info of spark-shell seems to be suggesting "--master yarn
>> >> > --deploy-mode
>> >> > cluster".
>> >> >
>> >> > But either way, I am seeing the following messages:
>> >> > 14/06/01 00:33:20 INFO client.RMProxy: Connecting to ResourceManager
>> at
>> >> > /0.0.0.0:8032
>> >> > 14/06/01 00:33:21 INFO ipc.Client: Retrying connect to server:
>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
>> SECONDS)
>> >> > 14/06/01 00:33:22 INFO ipc.Client: Retrying connect to server:
>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
>> SECONDS)
>> >> >
>> >> > My guess is that spark-shell is trying to talk to resource manager to
>> >> > setup
>> >> > spark master/worker nodes - I am not sure where 0.0.0.0:8032 came
>> from
>> >> > though. I am running CDH5 with two resource managers in HA mode.
>> Their
>> >> > IP/port should be in /opt/hadoop/conf/yarn-site.xml. I tried both
>> >> > HADOOP_CONF_DIR and YARN_CONF_DIR, but that info isn't picked up.
>> >> >
>> >> > Any ideas? Thanks.
>> >> > -Simon
>> >
>> >
>>
>
>


pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
Hi folks,

I have a weird problem when using pyspark with yarn. I started ipython as
follows:

IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4 --num-executors
4 --executor-memory 4G

When I create a notebook, I can see workers being created and indeed I see
spark UI running on my client machine on port 4040.

I have the following simple script:
"""
import pyspark
data = sc.textFile("hdfs://test/tmp/data/*").cache()
oneday = data.map(lambda line: line.split(",")).\
  map(lambda f: (f[0], float(f[1]))).\
  filter(lambda t: t[0] >= "2013-01-01" and t[0] <
"2013-01-02").\
  map(lambda t: (parser.parse(t[0]), t[1]))
oneday.take(1)
"""

By executing this, I see that it is my client machine (where ipython is
launched) that is reading all the data from HDFS and producing the result of
take(1), rather than my worker nodes...

When I do "data.count()", things would blow up altogether. But I do see in
the error message something like this:
"""

Error from python worker:
  /usr/bin/python: No module named pyspark

"""


Am I supposed to install pyspark on every worker node?


Thanks.

-Simon


Re: Is uberjar a recommended way of running Spark/Scala applications?

2014-06-02 Thread Andrei
Thanks! This is even closer to what I am looking for. I'm on a trip now, so
I'm going to give it a try when I come back.
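
For the setup Andrei describes below (Spark and the Hadoop client marked
"provided" so that spark-submit supplies them at runtime, with sbt-assembly
bundling only the remaining dependencies), a rough sketch might look like the
following. The plugin version, resolver and version strings are assumptions,
not values taken from Andrei's project:

// project/plugins.sbt (assumed sbt-assembly version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt
import AssemblyKeys._

assemblySettings

name := "sample-spark-app"

scalaVersion := "2.10.4"

resolvers += "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.0.0"          % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "2.3.0-cdh5.0.1" % "provided"
)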


On Mon, Jun 2, 2014 at 5:12 AM, Ngoc Dao  wrote:

> Alternative solution:
> https://github.com/xitrum-framework/xitrum-package
>
> It collects all dependency .jar files in your Scala program into a
> directory. It doesn't merge the .jar files together, the .jar files
> are left "as is".
>
>
> On Sat, May 31, 2014 at 3:42 AM, Andrei  wrote:
> > Thanks, Stephen. I have eventually decided to go with assembly, but put
> away
> > Spark and Hadoop jars, and instead use `spark-submit` to automatically
> > provide these dependencies. This way no resource conflicts arise and
> > mergeStrategy needs no modification. To memorize this stable setup and
> also
> > share it with the community I've crafted a project [1] with minimal
> working
> > config. It is an SBT project with the assembly plugin, Spark 1.0 and Cloudera's
> > Hadoop client. Hope it will help somebody get a Spark setup going quicker.
> >
> > Though I'm fine with this setup for final builds, I'm still looking for a
> > more interactive dev setup - something that doesn't require full rebuild.
> >
> > [1]: https://github.com/faithlessfriend/sample-spark-project
> >
> > Thanks and have a good weekend,
> > Andrei
> >
> > On Thu, May 29, 2014 at 8:27 PM, Stephen Boesch 
> wrote:
> >>
> >>
> >> The MergeStrategy combined with sbt assembly did work for me.  This is
> not
> >> painless: some trial and error and the assembly may take multiple
> minutes.
> >>
> >> You will likely want to filter out some additional classes from the
> >> generated jar file.  Here is an SOF answer to explain that and with
> IMHO the
> >> best answer snippet included here (in this case the OP understandably
> did
> >> not want to include javax.servlet.Servlet)
> >>
> >> http://stackoverflow.com/questions/7819066/sbt-exclude-class-from-jar
> >>
> >>
> >> mappings in (Compile,packageBin) ~= { (ms: Seq[(File, String)]) => ms
> >> filter { case (file, toPath) => toPath != "javax/servlet/Servlet.class"
> } }
> >>
> >> There is a setting to not include the project files in the assembly but
> I
> >> do not recall it at this moment.
> >>
> >>
> >>
> >> 2014-05-29 10:13 GMT-07:00 Andrei :
> >>
> >>> Thanks, Jordi, your gist looks pretty much like what I have in my
> project
> >>> currently (with few exceptions that I'm going to borrow).
> >>>
> >>> I like the idea of using "sbt package", since it doesn't require third
> >>> party plugins and, most important, doesn't create a mess of classes and
> >>> resources. But in this case I'll have to handle jar list manually via
> Spark
> >>> context. Is there a way to automate this process? E.g. when I was a
> Clojure
> >>> guy, I could run "lein deps" (lein is a build tool similar to sbt) to
> >>> download all dependencies and then just enumerate them from my app.
> Maybe
> >>> you have heard of something like that for Spark/SBT?
> >>>
> >>> Thanks,
> >>> Andrei
> >>>
> >>>
> >>> On Thu, May 29, 2014 at 3:48 PM, jaranda  wrote:
> 
>  Hi Andrei,
> 
>  I think the preferred way to deploy Spark jobs is by using the sbt
>  package
>  task instead of using the sbt assembly plugin. In any case, as you
>  comment,
>  the mergeStrategy in combination with some dependency exlusions should
>  fix
>  your problems. Have a look at this gist for further
>  details (I just followed some recommendations commented in the sbt
>  assembly
>  plugin documentation).
> 
>  Up to now I haven't found a proper way to combine my
>  development/deployment
>  phases, although I must say my experience in Spark is pretty poor (it
>  really
>  depends on your deployment requirements as well). In this case, I
> think
>  someone else could give you some further insights.
> 
>  Best,
> 
> 
> 
>  --
>  View this message in context:
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-uberjar-a-recommended-way-of-running-Spark-Scala-applications-tp6518p6520.html
>  Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>>
> >>
> >
>


Re: Trouble with EC2

2014-06-02 Thread Stefan van Wouw
Dear PJ$,

If you are familiar with Puppet, you could try using the puppet module I wrote 
(currently for Spark 0.9.0, I custom compiled it since no Debian package was 
available at the time I started the project I needed it for).

https://github.com/stefanvanwouw/puppet-spark

---
Kind regards,

Stefan van Wouw

On 02 Jun 2014, at 00:11, PJ$  wrote:

> Running on a few m3.larges with the ami-848a6eec image (debian 7). Haven't 
> gotten any further. No clue what's wrong. I'd really appreciate any guidance 
> y'all could offer. 
> 
> Best, 
> PJ$
> 
> 
> On Sat, May 31, 2014 at 1:40 PM, Matei Zaharia  
> wrote:
> What instance types did you launch on?
> 
> Sometimes you also get a bad individual machine from EC2. It might help to 
> remove the node it’s complaining about from the conf/slaves file.
> 
> Matei
> 
> On May 30, 2014, at 11:18 AM, PJ$  wrote:
> 
>> Hey Folks, 
>> 
>> I'm really having quite a bit of trouble getting spark running on ec2. I'm 
>> not using the scripts at https://github.com/apache/spark/tree/master/ec2 
>> because I'd like to know how everything works. But I'm going a little crazy. 
>> I think that something about the networking configuration must be messed up, 
>> but I'm at a loss. Shortly after starting the cluster, I get a lot of this: 
>> 
>> 14/05/30 18:03:22 INFO master.Master: Registering worker 
>> ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
>> 14/05/30 18:03:22 INFO master.Master: Registering worker 
>> ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
>> 14/05/30 18:03:23 INFO master.Master: Registering worker 
>> ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
>> 14/05/30 18:03:23 INFO master.Master: Registering worker 
>> ip-10-100-184-45.ec2.internal:7078 with 2 cores, 6.3 GB RAM
>> 14/05/30 18:05:54 INFO master.Master: 
>> akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
>> removing it.
>> 14/05/30 18:05:54 INFO actor.LocalActorRef: Message 
>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from 
>> Actor[akka://sparkMaster/deadLetters] to 
>> Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.100.75.70%3A36725-25#847210246]
>>  was not delivered. [5] dead letters encountered. This logging can be turned 
>> off or adjusted with configuration settings 'akka.log-dead-letters' and 
>> 'akka.log-dead-letters-during-shutdown'.
>> 14/05/30 18:05:54 INFO master.Master: 
>> akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
>> removing it.
>> 14/05/30 18:05:54 INFO master.Master: 
>> akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
>> removing it.
>> 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
>> [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
>> failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
>> akka.remote.EndpointAssociationException: Association failed with 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
>> Caused by: 
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
>> Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
>> ]
>> 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
>> [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
>> failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
>> akka.remote.EndpointAssociationException: Association failed with 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
>> Caused by: 
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
>> Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
>> ]
>> 14/05/30 18:05:54 INFO master.Master: 
>> akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
>> removing it.
>> 14/05/30 18:05:54 INFO master.Master: 
>> akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485 got disassociated, 
>> removing it.
>> 14/05/30 18:05:54 ERROR remote.EndpointWriter: AssociationError 
>> [akka.tcp://sparkMaster@ip-10-100-184-45.ec2.internal:7077] -> 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]: Error [Association 
>> failed with [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]] [
>> akka.remote.EndpointAssociationException: Association failed with 
>> [akka.tcp://spark@ip-10-100-75-70.ec2.internal:38485]
>> Caused by: 
>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
>> Connection refused: ip-10-100-75-70.ec2.internal/10.100.75.70:38485
> 
> 



Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
What I’ve found using saveAsTextFile() against S3 (prior to Spark 1.0.0.)
is that files get overwritten automatically. This is one danger to this
though. If I save to a directory that already has 20 part- files, but this
time around I’m only saving 15 part- files, then there will be 5 leftover
part- files from the previous set mixed in with the 15 newer files. This is
potentially dangerous.

I haven’t checked to see if this behavior has changed in 1.0.0. Are you
saying it has, Pierre?

On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
pierre.borckm...@realimpactanalytics.com wrote:

Hi Michaël,
>
> Thanks for this. We could indeed do that.
>
> But I guess the question is more about the change of behaviour from 0.9.1
> to
> 1.0.0.
> We never had to care about that in previous versions.
>
> Does that mean we have to manually remove existing files or is there a way
> to "aumotically" overwrite when using saveAsTextFile?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Andrew Or
Hi Simon,

You shouldn't have to install pyspark on every worker node. In YARN mode,
pyspark is packaged into your assembly jar and shipped to your executors
automatically. This seems like a more general problem. There are a few
things to try:

1) Run a simple pyspark shell with yarn-client, and do
"sc.parallelize(range(10)).count()" to see if you get the same error
2) If so, check if your assembly jar is compiled correctly. Run

$ jar -tf  pyspark
$ jar -tf  py4j

to see if the files are there. For Py4j, you need both the python files and
the Java class files.

3) If the files are there, try running a simple python shell (not pyspark
shell) with the assembly jar on the PYTHONPATH:

$ PYTHONPATH=/path/to/assembly/jar python
>>> import pyspark

4) If that works, try it on every worker node. If it doesn't work, there is
probably something wrong with your jar.

There is a known issue for PySpark on YARN - jars built with Java 7 cannot
be properly opened by Java 6. I would either verify that the JAVA_HOME set
on all of your workers points to Java 7 (by setting SPARK_YARN_USER_ENV),
or simply build your jar with Java 6:

$ cd /path/to/spark/home
$ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
2.3.0-cdh5.0.0

5) You can check out
http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
which has more detailed information about how to debug running an
application on YARN in general. In my experience, the steps outlined there
are quite useful.

Let me know if you get it working (or not).

Cheers,
Andrew



2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :

> Hi folks,
>
> I have a weird problem when using pyspark with yarn. I started ipython as
> follows:
>
> IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
> --num-executors 4 --executor-memory 4G
>
> When I create a notebook, I can see workers being created and indeed I see
> spark UI running on my client machine on port 4040.
>
> I have the following simple script:
> """
> import pyspark
> data = sc.textFile("hdfs://test/tmp/data/*").cache()
> oneday = data.map(lambda line: line.split(",")).\
>   map(lambda f: (f[0], float(f[1]))).\
>   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
> "2013-01-02").\
>   map(lambda t: (parser.parse(t[0]), t[1]))
> oneday.take(1)
> """
>
> By executing this, I see that it is my client machine (where ipython is
> launched) that is reading all the data from HDFS and producing the result of
> take(1), rather than my worker nodes...
>
> When I do "data.count()", things would blow up altogether. But I do see in
> the error message something like this:
> """
>
> Error from python worker:
>   /usr/bin/python: No module named pyspark
>
> """
>
>
> Am I supposed to install pyspark on every worker node?
>
>
> Thanks.
>
> -Simon
>
>


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
1) yes, that sc.parallelize(range(10)).count() has the same error.

2) the files seem to be correct

3) I have trouble at this step, "ImportError: No module named pyspark"
but I seem to have files in the jar file:
"""
$ PYTHONPATH=~/spark-assembly-1.0.0-hadoop2.3.0-cdh5.0.1.jar python
>>> import pyspark
Traceback (most recent call last):
  File "", line 1, in 
ImportError: No module named pyspark

$ jar -tf ~/spark-assembly-1.0.0-hadoop2.3.0-cdh5.0.1.jar pyspark
pyspark/
pyspark/rddsampler.py
pyspark/broadcast.py
pyspark/serializers.py
pyspark/java_gateway.py
pyspark/resultiterable.py
pyspark/accumulators.py
pyspark/sql.py
pyspark/__init__.py
pyspark/daemon.py
pyspark/context.py
pyspark/cloudpickle.py
pyspark/join.py
pyspark/tests.py
pyspark/files.py
pyspark/conf.py
pyspark/rdd.py
pyspark/storagelevel.py
pyspark/statcounter.py
pyspark/shell.py
pyspark/worker.py
"""

4) All my nodes should be running java 7, so probably this is not related.
5) I'll do it in a bit.

Any ideas on 3)?

Thanks.
-Simon



On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or  wrote:

> Hi Simon,
>
> You shouldn't have to install pyspark on every worker node. In YARN mode,
> pyspark is packaged into your assembly jar and shipped to your executors
> automatically. This seems like a more general problem. There are a few
> things to try:
>
> 1) Run a simple pyspark shell with yarn-client, and do
> "sc.parallelize(range(10)).count()" to see if you get the same error
> 2) If so, check if your assembly jar is compiled correctly. Run
>
> $ jar -tf  pyspark
> $ jar -tf  py4j
>
> to see if the files are there. For Py4j, you need both the python files
> and the Java class files.
>
> 3) If the files are there, try running a simple python shell (not pyspark
> shell) with the assembly jar on the PYTHONPATH:
>
> $ PYTHONPATH=/path/to/assembly/jar python
> >>> import pyspark
>
> 4) If that works, try it on every worker node. If it doesn't work, there
> is probably something wrong with your jar.
>
> There is a known issue for PySpark on YARN - jars built with Java 7 cannot
> be properly opened by Java 6. I would either verify that the JAVA_HOME set
> on all of your workers points to Java 7 (by setting SPARK_YARN_USER_ENV),
> or simply build your jar with Java 6:
>
> $ cd /path/to/spark/home
> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
> 2.3.0-cdh5.0.0
>
> 5) You can check out
> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
> which has more detailed information about how to debug running an
> application on YARN in general. In my experience, the steps outlined there
> are quite useful.
>
> Let me know if you get it working (or not).
>
> Cheers,
> Andrew
>
>
>
> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>
> Hi folks,
>>
>> I have a weird problem when using pyspark with yarn. I started ipython as
>> follows:
>>
>> IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>> --num-executors 4 --executor-memory 4G
>>
>> When I create a notebook, I can see workers being created and indeed I
>> see spark UI running on my client machine on port 4040.
>>
>> I have the following simple script:
>> """
>> import pyspark
>> data = sc.textFile("hdfs://test/tmp/data/*").cache()
>> oneday = data.map(lambda line: line.split(",")).\
>>   map(lambda f: (f[0], float(f[1]))).\
>>   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>> "2013-01-02").\
>>   map(lambda t: (parser.parse(t[0]), t[1]))
>> oneday.take(1)
>> """
>>
>> By executing this, I see that it is my client machine (where ipython is
>> launched) that is reading all the data from HDFS and producing the result of
>> take(1), rather than my worker nodes...
>>
>> When I do "data.count()", things would blow up altogether. But I do see
>> in the error message something like this:
>> """
>>
>> Error from python worker:
>>   /usr/bin/python: No module named pyspark
>>
>> """
>>
>>
>> Am I supposed to install pyspark on every worker node?
>>
>>
>> Thanks.
>>
>> -Simon
>>
>>
>


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
So, I did specify SPARK_JAR in my pyspark prog. I also checked the workers,
it seems that the jar file is distributed and included in classpath
correctly.

I think the problem is likely at step 3..

I build my jar file with maven, like this:
"mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.0.1 -DskipTests clean
package"

Anything that I might have missed?

Thanks.
-Simon


On Mon, Jun 2, 2014 at 12:02 PM, Xu (Simon) Chen  wrote:

> 1) yes, that sc.parallelize(range(10)).count() has the same error.
>
> 2) the files seem to be correct
>
> 3) I have trouble at this step, "ImportError: No module named pyspark"
> but I seem to have files in the jar file:
> """
> $ PYTHONPATH=~/spark-assembly-1.0.0-hadoop2.3.0-cdh5.0.1.jar python
> >>> import pyspark
> Traceback (most recent call last):
>   File "", line 1, in 
> ImportError: No module named pyspark
>
> $ jar -tf ~/spark-assembly-1.0.0-hadoop2.3.0-cdh5.0.1.jar pyspark
> pyspark/
> pyspark/rddsampler.py
> pyspark/broadcast.py
> pyspark/serializers.py
> pyspark/java_gateway.py
> pyspark/resultiterable.py
> pyspark/accumulators.py
> pyspark/sql.py
> pyspark/__init__.py
> pyspark/daemon.py
> pyspark/context.py
> pyspark/cloudpickle.py
> pyspark/join.py
> pyspark/tests.py
> pyspark/files.py
> pyspark/conf.py
> pyspark/rdd.py
> pyspark/storagelevel.py
> pyspark/statcounter.py
> pyspark/shell.py
> pyspark/worker.py
> """
>
> 4) All my nodes should be running java 7, so probably this is not related.
> 5) I'll do it in a bit.
>
> Any ideas on 3)?
>
> Thanks.
> -Simon
>
>
>
> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or  wrote:
>
>> Hi Simon,
>>
>> You shouldn't have to install pyspark on every worker node. In YARN mode,
>> pyspark is packaged into your assembly jar and shipped to your executors
>> automatically. This seems like a more general problem. There are a few
>> things to try:
>>
>> 1) Run a simple pyspark shell with yarn-client, and do
>> "sc.parallelize(range(10)).count()" to see if you get the same error
>> 2) If so, check if your assembly jar is compiled correctly. Run
>>
>> $ jar -tf  pyspark
>> $ jar -tf  py4j
>>
>> to see if the files are there. For Py4j, you need both the python files
>> and the Java class files.
>>
>> 3) If the files are there, try running a simple python shell (not pyspark
>> shell) with the assembly jar on the PYTHONPATH:
>>
>> $ PYTHONPATH=/path/to/assembly/jar python
>> >>> import pyspark
>>
>> 4) If that works, try it on every worker node. If it doesn't work, there
>> is probably something wrong with your jar.
>>
>> There is a known issue for PySpark on YARN - jars built with Java 7
>> cannot be properly opened by Java 6. I would either verify that the
>> JAVA_HOME set on all of your workers points to Java 7 (by setting
>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
>>
>> $ cd /path/to/spark/home
>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
>> 2.3.0-cdh5.0.0
>>
>> 5) You can check out
>> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
>> which has more detailed information about how to debug running an
>> application on YARN in general. In my experience, the steps outlined there
>> are quite useful.
>>
>> Let me know if you get it working (or not).
>>
>> Cheers,
>> Andrew
>>
>>
>>
>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>>
>> Hi folks,
>>>
>>> I have a weird problem when using pyspark with yarn. I started ipython
>>> as follows:
>>>
>>> IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>>> --num-executors 4 --executor-memory 4G
>>>
>>> When I create a notebook, I can see workers being created and indeed I
>>> see spark UI running on my client machine on port 4040.
>>>
>>> I have the following simple script:
>>> """
>>> import pyspark
>>> data = sc.textFile("hdfs://test/tmp/data/*").cache()
>>> oneday = data.map(lambda line: line.split(",")).\
>>>   map(lambda f: (f[0], float(f[1]))).\
>>>   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>>> "2013-01-02").\
>>>   map(lambda t: (parser.parse(t[0]), t[1]))
>>> oneday.take(1)
>>> """
>>>
>>> By executing this, I see that it is my client machine (where ipython is
>>> launched) that is reading all the data from HDFS and producing the result of
>>> take(1), rather than my worker nodes...
>>>
>>> When I do "data.count()", things would blow up altogether. But I do see
>>> in the error message something like this:
>>> """
>>>
>>> Error from python worker:
>>>   /usr/bin/python: No module named pyspark
>>>
>>> """
>>>
>>>
>>> Am I supposed to install pyspark on every worker node?
>>>
>>>
>>> Thanks.
>>>
>>> -Simon
>>>
>>>
>>
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Pierre Borckmans
Indeed, the behavior has changed for good or for bad. I mean, I agree with the 
danger you mention but I'm not sure it's happening like that. Isn't there a 
mechanism for overwrite in Hadoop that automatically removes part files, then 
writes a _temporary folder and then only the part files along with the _success 
folder. 

In any case this change of behavior should be documented IMO.

Cheers 
Pierre

Message sent from a mobile device - excuse typos and abbreviations

> Le 2 juin 2014 à 17:42, Nicholas Chammas  a écrit 
> :
> 
> What I’ve found using saveAsTextFile() against S3 (prior to Spark 1.0.0.) is 
> that files get overwritten automatically. This is one danger to this though. 
> If I save to a directory that already has 20 part- files, but this time 
> around I’m only saving 15 part- files, then there will be 5 leftover part- 
> files from the previous set mixed in with the 15 newer files. This is 
> potentially dangerous.
> 
> I haven’t checked to see if this behavior has changed in 1.0.0. Are you 
> saying it has, Pierre?
> 
>> On Mon, Jun 2, 2014 at 9:41 AM, Pierre B 
>> pierre.borckm...@realimpactanalytics.com wrote:
>> 
>> Hi Michaël,
>> 
>> Thanks for this. We could indeed do that.
>> 
>> But I guess the question is more about the change of behaviour from 0.9.1 to
>> 1.0.0.
>> We never had to care about that in previous versions.
>> 
>> Does that mean we have to manually remove existing files or is there a way
>> to "aumotically" overwrite when using saveAsTextFile?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 


Re: Failed to remove RDD error

2014-06-02 Thread Michael Chang
Hey Mayur,

Thanks for the suggestion; I didn't realize that was configurable. I don't
think I'm running out of memory, though it does seem like these errors go
away when I turn off the spark.streaming.unpersist configuration and use
spark.cleaner.ttl instead.  Do you know if there are known issues with the
unpersist option?
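
A sketch of that combination (unpersist off, TTL-based cleanup on), together
with a longer Akka ask timeout as Mayur suggests; the values below are
placeholders, not tuned recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-job")           // placeholder app name
  .set("spark.streaming.unpersist", "false")   // rely on TTL-based cleanup instead
  .set("spark.cleaner.ttl", "3600")            // seconds; age out old RDDs and metadata
  .set("spark.akka.askTimeout", "60")          // seconds; more headroom for BlockManagerMaster asks

val ssc = new StreamingContext(conf, Seconds(1))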


On Sat, May 31, 2014 at 12:17 AM, Mayur Rustagi 
wrote:

> You can increase your akka timeout, should give you some more life.. are
> you running out of memory by any chance?
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Sat, May 31, 2014 at 6:52 AM, Michael Chang  wrote:
>
>> I'm running a some kafka streaming spark contexts (on 0.9.1), and they
>> seem to be dying after 10 or so minutes with a lot of these errors.  I
>> can't really tell what's going on here, except that maybe the driver is
>> unresponsive somehow?  Has anyone seen this before?
>>
>> 14/05/31 01:13:30 ERROR BlockManagerMaster: Failed to remove RDD 12635
>>
>> akka.pattern.AskTimeoutException: Timed out
>>
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>
>> at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118)
>>
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:691)
>>
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:688)
>>
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455)
>>
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407)
>>
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411)
>>
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363)
>>
>> at java.lang.Thread.run(Thread.java:744)
>>
>> Thanks,
>>
>> Mike
>>
>>
>>
>


Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Russell Jurney
Looks like just worker and master processes are running:

[hivedata@hivecluster2 ~]$ jps

10425 Jps

[hivedata@hivecluster2 ~]$ ps aux|grep spark

hivedata 10424  0.0  0.0 103248   820 pts/3S+   10:05   0:00 grep spark

root 10918  0.5  1.4 4752880 230512 ?  Sl   May27  41:43 java -cp
:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/conf:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/jline.jar
-Dspark.akka.logLifecycleEvents=true
-Djava.library.path=/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
-Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip hivecluster2
--port 7077 --webui-port 18080

root 12715  0.0  0.0 148028   656 ?SMay27   0:00 sudo
/opt/cloudera/parcels/SPARK/lib/spark/bin/spark-class
org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077

root 12716  0.3  1.1 4155884 191340 ?  Sl   May27  30:21 java -cp
:/opt/cloudera/parcels/SPARK/lib/spark/conf:/opt/cloudera/parcels/SPARK/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar
-Dspark.akka.logLifecycleEvents=true
-Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
-Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
spark://hivecluster2:7077




On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson  wrote:

> Sounds like you have two shells running, and the first one is taking all
> your resources. Do a "jps" and kill the other guy, then try again.
>
> By the way, you can look at http://localhost:8080 (replace localhost with
> the server your Spark Master is running on) to see what applications are
> currently started, and what resource allocations they have.
>
>
> On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney 
> wrote:
>
>> Thanks again. Run results here:
>> https://gist.github.com/rjurney/dc0efae486ba7d55b7d5
>>
>> This time I get a port already in use exception on 4040, but it isn't
>> fatal. Then when I run rdd.first, I get this over and over:
>>
>> 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not 
>> accepted any resources; check your cluster UI to ensure that workers are 
>> registered and have sufficient memory
>>
>>
>>
>>
>>
>>
>> On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson 
>> wrote:
>>
>>> You can avoid that by using the constructor that takes a SparkConf, a la
>>>
>>> val conf = new SparkConf()
>>> conf.setJars("avro.jar", ...)
>>> val sc = new SparkContext(conf)
>>>
>>>
>>> On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney >> > wrote:
>>>
 Followup question: the docs to make a new SparkContext require that I
 know where $SPARK_HOME is. However, I have no idea. Any idea where that
 might be?


 On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson 
 wrote:

> Gotcha. The easiest way to get your dependencies to your Executors
> would probably be to construct your SparkContext with all necessary jars
> passed in (as the "jars" parameter), or inside a SparkConf with setJars().
> Avro is a "necessary jar", but it's possible your application also needs 
> to
> distribute other ones to the cluster.
>
> An easy way to make sure all your dependencies get shipped to the
> cluster is to create an assembly jar of your application, and then you 
> just
> need to tell Spark about that jar, which includes all your appli
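
A sketch of the setJars() route Aaron describes, using the master URL from the
logs above; the jar paths are placeholders for whatever the job actually needs
on the executors (the application assembly, the Avro jars, and so on):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://hivecluster2:7077")
  .setAppName("hadoopRDD-test")                  // placeholder app name
  .setJars(Seq("/path/to/app-assembly.jar",      // placeholder jar paths
               "/path/to/avro.jar",
               "/path/to/avro-mapred.jar"))

val sc = new SparkContext(conf)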

Is Hadoop MR now comparable with Spark?

2014-06-02 Thread Ian Ferreira
http://hortonworks.com/blog/ddm/#.U4yn3gJgfts.twitter






Re: [Spark Streaming] Distribute custom receivers evenly across excecutors

2014-06-02 Thread Guang Gao
The receivers are submitted as tasks. They are supposed to be assigned
to the executors in a round-robin manner by
TaskSchedulerImpl.resourceOffers(). However, sometimes not all the
executors are registered when the receivers are submitted. That's why
the receivers fill up the registered executors first and then the
others. 1.0.0 tries to handle this problem by running a dummy batch
job before submitting the receivers in
ReceiverTracker.startReceivers():

  // Run the dummy Spark job to ensure that all slaves have registered.
  // This avoids all the receivers to be scheduled on the same node.
  if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x,
1)).reduceByKey(_ + _, 20).collect()
  }

Apparently, sometimes it doesn't work. You can solve this problem by
running a similar dummy job at a much larger scale before you start
the streaming job, like:

ssc.sparkContext.makeRDD(1 to 1, 1).map(x => (x,
1)).reduceByKey(_ + _, 1000).collect()
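
As a concrete sketch (the task and reducer counts below are illustrative
assumptions, chosen only to be large enough that every executor has registered
before the receivers are scheduled; ssc stands for the StreamingContext, called
sc in the original question):

import org.apache.spark.SparkContext._  // for reduceByKey outside the shell

if (!ssc.sparkContext.isLocal) {
  ssc.sparkContext.makeRDD(1 to 10000, 10000)
    .map(x => (x, 1))
    .reduceByKey(_ + _, 1000)
    .collect()
}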

On Sun, Jun 1, 2014 at 6:06 PM, Guang Gao  wrote:
> Dear All,
>
> I'm running Spark Streaming (1.0.0) with Yarn (2.2.0) on a 10-node cluster.
> I setup 10 custom receivers to hear from 10 data streams. I want one
> receiver per node in order to maximize the network bandwidth. However, if I
> set "--executor-cores 4", the 10 receivers only run on 3 of the nodes in the
> cluster, each running 4, 4, 2 receivers; if I set "--executor-cores 1", each
> node will run exactly one receiver, and it seems that Spark can't make any
> progress to process theses streams.
>
> I read the documentation on configuration and also googled but didn't find a
> clue. Is there a way to configure how the receivers are distributed?
>
> Thanks!
>
> Here are some details:
> 
> How I created 10 receivers:
>
> val conf = new SparkConf().setAppName(jobId)
> val sc = new StreamingContext(conf, Seconds(1))
> var lines:DStream[String] =
>   sc.receiverStream(
>   new CustomReceiver(...)
>   )
> for(i <- 1 to 9) {
> lines = lines.union(
>sc.receiverStream(
>  new CustomReceiver(...)
>   ))
> }
>
> How I submit a job to Yarn:
>
> spark-submit \
> --class $JOB_CLASS \
> --master yarn-client \
> --num-executors 10 \
> --driver-memory 1g \
> --executor-memory 2g \
> --executor-cores 4 \
> $JAR_NAME
>


Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Russell Jurney
If it matters, I have servers running at
http://hivecluster2:4040/stages/ and http://hivecluster2:4041/stages/

When I run rdd.first, I see an item at
http://hivecluster2:4041/stages/ but no tasks are running. Stage ID 1,
first at :46, Tasks: Succeeded/Total 0/16.

On Mon, Jun 2, 2014 at 10:09 AM, Russell Jurney
 wrote:
> Looks like just worker and master processes are running:
>
> [hivedata@hivecluster2 ~]$ jps
>
> 10425 Jps
>
> [hivedata@hivecluster2 ~]$ ps aux|grep spark
>
> hivedata 10424  0.0  0.0 103248   820 pts/3S+   10:05   0:00 grep spark
>
> root 10918  0.5  1.4 4752880 230512 ?  Sl   May27  41:43 java -cp
> :/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/conf:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/jline.jar
> -Dspark.akka.logLifecycleEvents=true
> -Djava.library.path=/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
> -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip hivecluster2
> --port 7077 --webui-port 18080
>
> root 12715  0.0  0.0 148028   656 ?SMay27   0:00 sudo
> /opt/cloudera/parcels/SPARK/lib/spark/bin/spark-class
> org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077
>
> root 12716  0.3  1.1 4155884 191340 ?  Sl   May27  30:21 java -cp
> :/opt/cloudera/parcels/SPARK/lib/spark/conf:/opt/cloudera/parcels/SPARK/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar
> -Dspark.akka.logLifecycleEvents=true
> -Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
> -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
> spark://hivecluster2:7077
>
>
>
>
> On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson  wrote:
>>
>> Sounds like you have two shells running, and the first one is taking all
>> your resources. Do a "jps" and kill the other guy, then try again.
>>
>> By the way, you can look at http://localhost:8080 (replace localhost with
>> the server your Spark Master is running on) to see what applications are
>> currently started, and what resource allocations they have.
>>
>>
>> On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney 
>> wrote:
>>>
>>> Thanks again. Run results here:
>>> https://gist.github.com/rjurney/dc0efae486ba7d55b7d5
>>>
>>> This time I get a port already in use exception on 4040, but it isn't
>>> fatal. Then when I run rdd.first, I get this over and over:
>>>
>>> 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson 
>>> wrote:

 You can avoid that by using the constructor that takes a SparkConf, a la

 val conf = new SparkConf()
 conf.setJars("avro.jar", ...)
 val sc = new SparkContext(conf)


 On Sun, Jun 1, 2014 at 2:32 PM, Russell Jurney
  wrote:
>
> Followup question: the docs to make a new SparkContext require that I
> know where $SPARK_HOME is. However, I have no idea. Any idea where that
> might be?
>
>
> On Sun, Jun 1, 2014 at 10:28 AM, Aaron Davidson 
> wrote:
>>
>> Gotcha. The easiest way to get your dependencies to your Executors
>> would probably be to construct your SparkContext with all necessary jars
>> passed in (a

Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
I asked several people, no one seems to believe that we can do this:
$ PYTHONPATH=/path/to/assembly/jar python
>>> import pyspark

The following pull request did mention something about generating a zip
file for all python related modules:
https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html

I've tested that zipped modules can at least be imported via zipimport.

Any ideas?

-Simon



On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or  wrote:

> Hi Simon,
>
> You shouldn't have to install pyspark on every worker node. In YARN mode,
> pyspark is packaged into your assembly jar and shipped to your executors
> automatically. This seems like a more general problem. There are a few
> things to try:
>
> 1) Run a simple pyspark shell with yarn-client, and do
> "sc.parallelize(range(10)).count()" to see if you get the same error
> 2) If so, check if your assembly jar is compiled correctly. Run
>
> $ jar -tf  pyspark
> $ jar -tf  py4j
>
> to see if the files are there. For Py4j, you need both the python files
> and the Java class files.
>
> 3) If the files are there, try running a simple python shell (not pyspark
> shell) with the assembly jar on the PYTHONPATH:
>
> $ PYTHONPATH=/path/to/assembly/jar python
> >>> import pyspark
>
> 4) If that works, try it on every worker node. If it doesn't work, there
> is probably something wrong with your jar.
>
> There is a known issue for PySpark on YARN - jars built with Java 7 cannot
> be properly opened by Java 6. I would either verify that the JAVA_HOME set
> on all of your workers points to Java 7 (by setting SPARK_YARN_USER_ENV),
> or simply build your jar with Java 6:
>
> $ cd /path/to/spark/home
> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
> 2.3.0-cdh5.0.0
>
> 5) You can check out
> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
> which has more detailed information about how to debug running an
> application on YARN in general. In my experience, the steps outlined there
> are quite useful.
>
> Let me know if you get it working (or not).
>
> Cheers,
> Andrew
>
>
>
> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>
> Hi folks,
>>
>> I have a weird problem when using pyspark with yarn. I started ipython as
>> follows:
>>
>> IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>> --num-executors 4 --executor-memory 4G
>>
>> When I create a notebook, I can see workers being created and indeed I
>> see spark UI running on my client machine on port 4040.
>>
>> I have the following simple script:
>> """
>> import pyspark
>> data = sc.textFile("hdfs://test/tmp/data/*").cache()
>> oneday = data.map(lambda line: line.split(",")).\
>>   map(lambda f: (f[0], float(f[1]))).\
>>   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>> "2013-01-02").\
>>   map(lambda t: (parser.parse(t[0]), t[1]))
>> oneday.take(1)
>> """
>>
>> By executing this, I see that it is my client machine (where ipython is
>> launched) that is reading all the data from HDFS and producing the result of
>> take(1), rather than my worker nodes...
>>
>> When I do "data.count()", things would blow up altogether. But I do see
>> in the error message something like this:
>> """
>>
>> Error from python worker:
>>   /usr/bin/python: No module named pyspark
>>
>> """
>>
>>
>> Am I supposed to install pyspark on every worker node?
>>
>>
>> Thanks.
>>
>> -Simon
>>
>>
>


Re: spark 1.0.0 on yarn

2014-06-02 Thread Patrick Wendell
Okay I'm guessing that our upstream "Hadoop2" package isn't new
enough to work with CDH5. We should probably clarify this in our
downloads. Thanks for reporting this. What was the exact string you
used when building? Also which CDH-5 version are you building against?

On Mon, Jun 2, 2014 at 8:11 AM, Xu (Simon) Chen  wrote:
> OK, rebuilding the assembly jar file with cdh5 works now...
> Thanks..
>
> -Simon
>
>
> On Sun, Jun 1, 2014 at 9:37 PM, Xu (Simon) Chen  wrote:
>>
>> That helped a bit... Now I have a different failure: the start up process
>> is stuck in an infinite loop outputting the following message:
>>
>> 14/06/02 01:34:56 INFO cluster.YarnClientSchedulerBackend: Application
>> report from ASM:
>> appMasterRpcPort: -1
>> appStartTime: 1401672868277
>> yarnAppState: ACCEPTED
>>
>> I am using the hadoop 2 prebuild package. Probably it doesn't have the
>> latest yarn client.
>>
>> -Simon
>>
>>
>>
>>
>> On Sun, Jun 1, 2014 at 9:03 PM, Patrick Wendell 
>> wrote:
>>>
>>> As a debugging step, does it work if you use a single resource manager
>>> with the key "yarn.resourcemanager.address" instead of using two named
>>> resource managers? I wonder if somehow the YARN client can't detect
>>> this multi-master set-up.
>>>
>>> On Sun, Jun 1, 2014 at 12:49 PM, Xu (Simon) Chen 
>>> wrote:
>>> > Note that everything works fine in spark 0.9, which is packaged in
>>> > CDH5: I
>>> > can launch a spark-shell and interact with workers spawned on my yarn
>>> > cluster.
>>> >
>>> > So in my /opt/hadoop/conf/yarn-site.xml, I have:
>>> > ...
>>> > <property>
>>> >   <name>yarn.resourcemanager.address.rm1</name>
>>> >   <value>controller-1.mycomp.com:23140</value>
>>> > </property>
>>> > ...
>>> > <property>
>>> >   <name>yarn.resourcemanager.address.rm2</name>
>>> >   <value>controller-2.mycomp.com:23140</value>
>>> > </property>
>>> > ...
>>> >
>>> > And the other usual stuff.
>>> >
>>> > So spark 1.0 is launched like this:
>>> > Spark Command: java -cp
>>> >
>>> > ::/home/chenxu/spark-1.0.0-bin-hadoop2/conf:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/opt/hadoop/conf
>>> > -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
>>> > org.apache.spark.deploy.SparkSubmit spark-shell --master yarn-client
>>> > --class
>>> > org.apache.spark.repl.Main
>>> >
>>> > I do see "/opt/hadoop/conf" included, but not sure it's the right
>>> > place.
>>> >
>>> > Thanks..
>>> > -Simon
>>> >
>>> >
>>> >
>>> > On Sun, Jun 1, 2014 at 1:57 PM, Patrick Wendell 
>>> > wrote:
>>> >>
>>> >> I would agree with your guess, it looks like the yarn library isn't
>>> >> correctly finding your yarn-site.xml file. If you look in
> >>> >> yarn-site.xml do you definitely see the resource manager
>>> >> address/addresses?
>>> >>
>>> >> Also, you can try running this command with
>>> >> SPARK_PRINT_LAUNCH_COMMAND=1 to make sure the classpath is being
>>> >> set-up correctly.
>>> >>
>>> >> - Patrick
>>> >>
>>> >> On Sat, May 31, 2014 at 5:51 PM, Xu (Simon) Chen 
>>> >> wrote:
>>> >> > Hi all,
>>> >> >
>>> >> > I tried a couple ways, but couldn't get it to work..
>>> >> >
>>> >> > The following seems to be what the online document
>>> >> > (http://spark.apache.org/docs/latest/running-on-yarn.html) is
>>> >> > suggesting:
>>> >> >
>>> >> >
>>> >> > SPARK_JAR=hdfs://test/user/spark/share/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
>>> >> > YARN_CONF_DIR=/opt/hadoop/conf ./spark-shell --master yarn-client
>>> >> >
>>> >> > Help info of spark-shell seems to be suggesting "--master yarn
>>> >> > --deploy-mode
>>> >> > cluster".
>>> >> >
>>> >> > But either way, I am seeing the following messages:
>>> >> > 14/06/01 00:33:20 INFO client.RMProxy: Connecting to ResourceManager
>>> >> > at
>>> >> > /0.0.0.0:8032
>>> >> > 14/06/01 00:33:21 INFO ipc.Client: Retrying connect to server:
>>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
>>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
>>> >> > SECONDS)
>>> >> > 14/06/01 00:33:22 INFO ipc.Client: Retrying connect to server:
>>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
>>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
>>> >> > SECONDS)
>>> >> >
>>> >> > My guess is that spark-shell is trying to talk to resource manager
>>> >> > to
>>> >> > setup
>>> >> > spark master/worker nodes - I am not sure where 0.0.0.0:8032 came
>>> >> > from
>>> >> > though. I am running CDH5 with two resource managers in HA mode.
>>> >> > Their
>>> >> > IP/port should be in /opt/hadoop/conf/yarn-site.xml. I tried both
>>> >> > HADOOP_CONF_DIR and YARN_CONF_DIR, but that info isn't picked up.
>>> >> >
>>> >> > Any ideas? Thanks.
>>> >> > -Simon
>>> >
>>> >
>>
>>
>


Re: spark 1.0.0 on yarn

2014-06-02 Thread Xu (Simon) Chen
I built my new package like this:
"mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.0.1 -DskipTests clean
package"

Spark-shell is working now, but pyspark is still broken. I reported the
problem on a different thread. Please take a look if you can... Desperately
need ideas..

Thanks.
-Simon


On Mon, Jun 2, 2014 at 2:47 PM, Patrick Wendell  wrote:

> Okay I'm guessing that our upstream "Hadoop2" package isn't new
> enough to work with CDH5. We should probably clarify this in our
> downloads. Thanks for reporting this. What was the exact string you
> used when building? Also which CDH-5 version are you building against?
>
> On Mon, Jun 2, 2014 at 8:11 AM, Xu (Simon) Chen  wrote:
> > OK, rebuilding the assembly jar file with cdh5 works now...
> > Thanks..
> >
> > -Simon
> >
> >
> > On Sun, Jun 1, 2014 at 9:37 PM, Xu (Simon) Chen 
> wrote:
> >>
> >> That helped a bit... Now I have a different failure: the start up
> process
> >> is stuck in an infinite loop outputting the following message:
> >>
> >> 14/06/02 01:34:56 INFO cluster.YarnClientSchedulerBackend: Application
> >> report from ASM:
> >> appMasterRpcPort: -1
> >> appStartTime: 1401672868277
> >> yarnAppState: ACCEPTED
> >>
> >> I am using the hadoop 2 prebuild package. Probably it doesn't have the
> >> latest yarn client.
> >>
> >> -Simon
> >>
> >>
> >>
> >>
> >> On Sun, Jun 1, 2014 at 9:03 PM, Patrick Wendell 
> >> wrote:
> >>>
> >>> As a debugging step, does it work if you use a single resource manager
> >>> with the key "yarn.resourcemanager.address" instead of using two named
> >>> resource managers? I wonder if somehow the YARN client can't detect
> >>> this multi-master set-up.
> >>>
> >>> On Sun, Jun 1, 2014 at 12:49 PM, Xu (Simon) Chen 
> >>> wrote:
> >>> > Note that everything works fine in spark 0.9, which is packaged in
> >>> > CDH5: I
> >>> > can launch a spark-shell and interact with workers spawned on my yarn
> >>> > cluster.
> >>> >
> >>> > So in my /opt/hadoop/conf/yarn-site.xml, I have:
> >>> > ...
> >>> > <property>
> >>> >   <name>yarn.resourcemanager.address.rm1</name>
> >>> >   <value>controller-1.mycomp.com:23140</value>
> >>> > </property>
> >>> > ...
> >>> > <property>
> >>> >   <name>yarn.resourcemanager.address.rm2</name>
> >>> >   <value>controller-2.mycomp.com:23140</value>
> >>> > </property>
> >>> > ...
> >>> >
> >>> > And the other usual stuff.
> >>> >
> >>> > So spark 1.0 is launched like this:
> >>> > Spark Command: java -cp
> >>> >
> >>> >
> ::/home/chenxu/spark-1.0.0-bin-hadoop2/conf:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/home/chenxu/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/opt/hadoop/conf
> >>> > -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
> >>> > org.apache.spark.deploy.SparkSubmit spark-shell --master yarn-client
> >>> > --class
> >>> > org.apache.spark.repl.Main
> >>> >
> >>> > I do see "/opt/hadoop/conf" included, but not sure it's the right
> >>> > place.
> >>> >
> >>> > Thanks..
> >>> > -Simon
> >>> >
> >>> >
> >>> >
> >>> > On Sun, Jun 1, 2014 at 1:57 PM, Patrick Wendell 
> >>> > wrote:
> >>> >>
> >>> >> I would agree with your guess, it looks like the yarn library isn't
> >>> >> correctly finding your yarn-site.xml file. If you look in
> >>> >> yarn-site.xml do you definitely see the resource manager
> >>> >> address/addresses?
> >>> >>
> >>> >> Also, you can try running this command with
> >>> >> SPARK_PRINT_LAUNCH_COMMAND=1 to make sure the classpath is being
> >>> >> set-up correctly.
> >>> >>
> >>> >> - Patrick
> >>> >>
> >>> >> On Sat, May 31, 2014 at 5:51 PM, Xu (Simon) Chen  >
> >>> >> wrote:
> >>> >> > Hi all,
> >>> >> >
> >>> >> > I tried a couple ways, but couldn't get it to work..
> >>> >> >
> >>> >> > The following seems to be what the online document
> >>> >> > (http://spark.apache.org/docs/latest/running-on-yarn.html) is
> >>> >> > suggesting:
> >>> >> >
> >>> >> >
> >>> >> >
> SPARK_JAR=hdfs://test/user/spark/share/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
> >>> >> > YARN_CONF_DIR=/opt/hadoop/conf ./spark-shell --master yarn-client
> >>> >> >
> >>> >> > Help info of spark-shell seems to be suggesting "--master yarn
> >>> >> > --deploy-mode
> >>> >> > cluster".
> >>> >> >
> >>> >> > But either way, I am seeing the following messages:
> >>> >> > 14/06/01 00:33:20 INFO client.RMProxy: Connecting to
> ResourceManager
> >>> >> > at
> >>> >> > /0.0.0.0:8032
> >>> >> > 14/06/01 00:33:21 INFO ipc.Client: Retrying connect to server:
> >>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
> >>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
> >>> >> > SECONDS)
> >>> >> > 14/06/01 00:33:22 INFO ipc.Client: Retrying connect to server:
> >>> >> > 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
> >>> >> > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
> >>> >> > SECONDS)
> 

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Patrick Wendell
Hey There,

The issue was that the old behavior could cause users to silently
overwrite data, which is pretty bad, so to be conservative we decided
to enforce the same checks that Hadoop does.

This was documented by this JIRA:
https://issues.apache.org/jira/browse/SPARK-1100
https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1

However, it would be very easy to add an option that allows preserving
the old behavior. Is anyone here interested in contributing that? I
created a JIRA for it:

https://issues.apache.org/jira/browse/SPARK-1993

- Patrick

On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
 wrote:
> Indeed, the behavior has changed for good or for bad. I mean, I agree with
> the danger you mention but I'm not sure it's happening like that. Isn't
> there a mechanism for overwrite in Hadoop that automatically removes part
> files, then writes a _temporary folder and then only the part files along
> with the _success folder.
>
> In any case this change of behavior should be documented IMO.
>
> Cheers
> Pierre
>
> Message sent from a mobile device - excuse typos and abbreviations
>
> Le 2 juin 2014 à 17:42, Nicholas Chammas  a
> écrit :
>
> What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0.) is
> that files get overwritten automatically. This is one danger to this though.
> If I save to a directory that already has 20 part- files, but this time
> around I'm only saving 15 part- files, then there will be 5 leftover part-
> files from the previous set mixed in with the 15 newer files. This is
> potentially dangerous.
>
> I haven't checked to see if this behavior has changed in 1.0.0. Are you
> saying it has, Pierre?
>
> On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
> pierre.borckm...@realimpactanalytics.com wrote:
>>
>> Hi Michaël,
>>
>> Thanks for this. We could indeed do that.
>>
>> But I guess the question is more about the change of behaviour from 0.9.1
>> to
>> 1.0.0.
>> We never had to care about that in previous versions.
>>
>> Does that mean we have to manually remove existing files or is there a way
>> to "aumotically" overwrite when using saveAsTextFile?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
OK, my colleague found this:
https://mail.python.org/pipermail/python-list/2014-May/671353.html

And my jar file has 70011 files. Fantastic..
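
That python-list thread appears to be about zipimport rejecting archives with
more than 65535 entries (the point at which the zip64 format is required), and
a 70011-entry assembly jar is past that limit. A standalone sketch, not part of
Spark, for checking a jar's entry count:

import java.util.zip.ZipFile

object JarEntryCount {
  def main(args: Array[String]): Unit = {
    val zip = new ZipFile(args(0))   // e.g. the Spark assembly jar
    try {
      val n = zip.size()             // number of entries in the archive
      println(s"${args(0)}: $n entries (over the 65535 non-zip64 limit: ${n > 65535})")
    } finally {
      zip.close()
    }
  }
}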




On Mon, Jun 2, 2014 at 2:34 PM, Xu (Simon) Chen  wrote:

> I asked several people, no one seems to believe that we can do this:
> $ PYTHONPATH=/path/to/assembly/jar python
> >>> import pyspark
>
> The following pull request did mention something about generating a zip
> file for all python related modules:
> https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html
>
> I've tested that zipped modules can at least be imported via zipimport.
>
> Any ideas?
>
> -Simon
>
>
>
> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or  wrote:
>
>> Hi Simon,
>>
>> You shouldn't have to install pyspark on every worker node. In YARN mode,
>> pyspark is packaged into your assembly jar and shipped to your executors
>> automatically. This seems like a more general problem. There are a few
>> things to try:
>>
>> 1) Run a simple pyspark shell with yarn-client, and do
>> "sc.parallelize(range(10)).count()" to see if you get the same error
>> 2) If so, check if your assembly jar is compiled correctly. Run
>>
>> $ jar -tf  pyspark
>> $ jar -tf  py4j
>>
>> to see if the files are there. For Py4j, you need both the python files
>> and the Java class files.
>>
>> 3) If the files are there, try running a simple python shell (not pyspark
>> shell) with the assembly jar on the PYTHONPATH:
>>
>> $ PYTHONPATH=/path/to/assembly/jar python
>> >>> import pyspark
>>
>> 4) If that works, try it on every worker node. If it doesn't work, there
>> is probably something wrong with your jar.
>>
>> There is a known issue for PySpark on YARN - jars built with Java 7
>> cannot be properly opened by Java 6. I would either verify that the
>> JAVA_HOME set on all of your workers points to Java 7 (by setting
>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
>>
>> $ cd /path/to/spark/home
>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
>> 2.3.0-cdh5.0.0
>>
>> 5) You can check out
>> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
>> which has more detailed information about how to debug running an
>> application on YARN in general. In my experience, the steps outlined there
>> are quite useful.
>>
>> Let me know if you get it working (or not).
>>
>> Cheers,
>> Andrew
>>
>>
>>
>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>>
>> Hi folks,
>>>
>>> I have a weird problem when using pyspark with yarn. I started ipython
>>> as follows:
>>>
>>> IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>>> --num-executors 4 --executor-memory 4G
>>>
>>> When I create a notebook, I can see workers being created and indeed I
>>> see spark UI running on my client machine on port 4040.
>>>
>>> I have the following simple script:
>>> """
>>> import pyspark
>>> data = sc.textFile("hdfs://test/tmp/data/*").cache()
>>> oneday = data.map(lambda line: line.split(",")).\
>>>   map(lambda f: (f[0], float(f[1]))).\
>>>   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>>> "2013-01-02").\
>>>   map(lambda t: (parser.parse(t[0]), t[1]))
>>> oneday.take(1)
>>> """
>>>
>>> By executing this, I see that it is my client machine (where ipython is
>>> launched) that is reading all the data from HDFS and producing the result of
>>> take(1), rather than my worker nodes...
>>>
>>> When I do "data.count()", things would blow up altogether. But I do see
>>> in the error message something like this:
>>> """
>>>
>>> Error from python worker:
>>>   /usr/bin/python: No module named pyspark
>>>
>>> """
>>>
>>>
>>> Am I supposed to install pyspark on every worker node?
>>>
>>>
>>> Thanks.
>>>
>>> -Simon
>>>
>>>
>>
>
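
[For anyone hitting the same wall: the entry-count limit discussed in the linked python-list thread is easy to check against a given assembly jar. A quick sketch, with the jar path as a placeholder:

import java.util.zip.ZipFile

// Sketch: count the entries in the assembly jar. Per the thread above,
// Python's zipimport can fail on archives with more than 65535 entries
// (which force the Zip64 format), matching the 70011 figure reported here.
val jar = new ZipFile("/path/to/spark-assembly.jar")
println(s"entries: ${jar.size()}")
jar.close()]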


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nan Zhu
Hi, Patrick,   

I think https://issues.apache.org/jira/browse/SPARK-1677 is talking about the 
same thing?

How about assigning it to me?  

I think I missed the configuration part in my previous commit, though I 
declared that in the PR description….

Best,  

--  
Nan Zhu


On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:

> Hey There,
>  
> The issue was that the old behavior could cause users to silently
> overwrite data, which is pretty bad, so to be conservative we decided
> to enforce the same checks that Hadoop does.
>  
> This was documented by this JIRA:
> https://issues.apache.org/jira/browse/SPARK-1100
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>  
> However, it would be very easy to add an option that allows preserving
> the old behavior. Is anyone here interested in contributing that? I
> created a JIRA for it:
>  
> https://issues.apache.org/jira/browse/SPARK-1993
>  
> - Patrick
>  
> On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>  (mailto:pierre.borckm...@realimpactanalytics.com)> wrote:
> > Indeed, the behavior has changed for good or for bad. I mean, I agree with
> > the danger you mention but I'm not sure it's happening like that. Isn't
> > there a mechanism for overwrite in Hadoop that automatically removes part
> > files, then writes a _temporary folder and then only the part files along
> > with the _success folder?
> >  
> > In any case this change of behavior should be documented IMO.
> >  
> > Cheers
> > Pierre
> >  
> > Message sent from a mobile device - excuse typos and abbreviations
> >  
> > On 2 June 2014 at 17:42, Nicholas Chammas  > (mailto:nicholas.cham...@gmail.com)> wrote:
> >  
> > What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is
> > that files get overwritten automatically. There is one danger to this, though.
> > If I save to a directory that already has 20 part- files, but this time
> > around I'm only saving 15 part- files, then there will be 5 leftover part-
> > files from the previous set mixed in with the 15 newer files. This is
> > potentially dangerous.
> >  
> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
> > saying it has, Pierre?
> >  
> > On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
> > [pierre.borckm...@realimpactanalytics.com](mailto:pierre.borckm...@realimpactanalytics.com)
> > wrote:
> > >  
> > > Hi Michaël,
> > >  
> > > Thanks for this. We could indeed do that.
> > >  
> > > But I guess the question is more about the change of behaviour from 0.9.1
> > > to
> > > 1.0.0.
> > > We never had to care about that in previous versions.
> > >  
> > > Does that mean we have to manually remove existing files or is there a way
> > > to "aumotically" overwrite when using saveAsTextFile?
> > >  
> > >  
> > >  
> > > --
> > > View this message in context:
> > > http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
> > > Sent from the Apache Spark User List mailing list archive at Nabble.com 
> > > (http://Nabble.com).
> > >  
> >  
> >  
>  
>  
>  




Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Patrick Wendell
Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
I accidentally assigned myself way back when I created it). This
should be an easy fix.

On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
> Hi, Patrick,
>
> I think https://issues.apache.org/jira/browse/SPARK-1677 is talking about
> the same thing?
>
> How about assigning it to me?
>
> I think I missed the configuration part in my previous commit, though I
> declared that in the PR description
>
> Best,
>
> --
> Nan Zhu
>
> On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
>
> Hey There,
>
> The issue was that the old behavior could cause users to silently
> overwrite data, which is pretty bad, so to be conservative we decided
> to enforce the same checks that Hadoop does.
>
> This was documented by this JIRA:
> https://issues.apache.org/jira/browse/SPARK-1100
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>
> However, it would be very easy to add an option that allows preserving
> the old behavior. Is anyone here interested in contributing that? I
> created a JIRA for it:
>
> https://issues.apache.org/jira/browse/SPARK-1993
>
> - Patrick
>
> On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>  wrote:
>
> Indeed, the behavior has changed for good or for bad. I mean, I agree with
> the danger you mention but I'm not sure it's happening like that. Isn't
> there a mechanism for overwrite in Hadoop that automatically removes part
> files, then writes a _temporary folder and then only the part files along
> with the _success folder?
>
> In any case this change of behavior should be documented IMO.
>
> Cheers
> Pierre
>
> Message sent from a mobile device - excuse typos and abbreviations
>
> On 2 June 2014 at 17:42, Nicholas Chammas  wrote:
>
> What I've found using saveAsTextFile() against S3 (prior to Spark 1.0.0) is
> that files get overwritten automatically. There is one danger to this, though.
> If I save to a directory that already has 20 part- files, but this time
> around I'm only saving 15 part- files, then there will be 5 leftover part-
> files from the previous set mixed in with the 15 newer files. This is
> potentially dangerous.
>
> I haven't checked to see if this behavior has changed in 1.0.0. Are you
> saying it has, Pierre?
>
> On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
> [pierre.borckm...@realimpactanalytics.com](mailto:pierre.borckm...@realimpactanalytics.com)
> wrote:
>
>
> Hi Michaël,
>
> Thanks for this. We could indeed do that.
>
> But I guess the question is more about the change of behaviour from 0.9.1
> to
> 1.0.0.
> We never had to care about that in previous versions.
>
> Does that mean we have to manually remove existing files or is there a way
> to "aumotically" overwrite when using saveAsTextFile?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Patrick Wendell
Are you building Spark with Java 6 or Java 7? Java 6 uses the extended
Zip format and Java 7 uses Zip64. I think we've tried to add some
build warnings if Java 7 is used, for this reason:

https://github.com/apache/spark/blob/master/make-distribution.sh#L102

Any luck if you use JDK 6 to compile?


On Mon, Jun 2, 2014 at 12:03 PM, Xu (Simon) Chen  wrote:
> OK, my colleague found this:
> https://mail.python.org/pipermail/python-list/2014-May/671353.html
>
> And my jar file has 70011 files. Fantastic..
>
>
>
>
> On Mon, Jun 2, 2014 at 2:34 PM, Xu (Simon) Chen  wrote:
>>
>> I asked several people, no one seems to believe that we can do this:
>> $ PYTHONPATH=/path/to/assembly/jar python
>> >>> import pyspark
>>
>> This following pull request did mention something about generating a zip
>> file for all python related modules:
>> https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html
>>
>> I've tested that zipped modules can at least be imported via zipimport.
>>
>> Any ideas?
>>
>> -Simon
>>
>>
>>
>> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or  wrote:
>>>
>>> Hi Simon,
>>>
>>> You shouldn't have to install pyspark on every worker node. In YARN mode,
>>> pyspark is packaged into your assembly jar and shipped to your executors
>>> automatically. This seems like a more general problem. There are a few
>>> things to try:
>>>
>>> 1) Run a simple pyspark shell with yarn-client, and do
>>> "sc.parallelize(range(10)).count()" to see if you get the same error
>>> 2) If so, check if your assembly jar is compiled correctly. Run
>>>
>>> $ jar -tf  pyspark
>>> $ jar -tf  py4j
>>>
>>> to see if the files are there. For Py4j, you need both the python files
>>> and the Java class files.
>>>
>>> 3) If the files are there, try running a simple python shell (not pyspark
>>> shell) with the assembly jar on the PYTHONPATH:
>>>
>>> $ PYTHONPATH=/path/to/assembly/jar python
>>> >>> import pyspark
>>>
>>> 4) If that works, try it on every worker node. If it doesn't work, there
>>> is probably something wrong with your jar.
>>>
>>> There is a known issue for PySpark on YARN - jars built with Java 7
>>> cannot be properly opened by Java 6. I would either verify that the
>>> JAVA_HOME set on all of your workers points to Java 7 (by setting
>>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
>>>
>>> $ cd /path/to/spark/home
>>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
>>> 2.3.0-cdh5.0.0
>>>
>>> 5) You can check out
>>> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application,
>>> which has more detailed information about how to debug running an
>>> application on YARN in general. In my experience, the steps outlined there
>>> are quite useful.
>>>
>>> Let me know if you get it working (or not).
>>>
>>> Cheers,
>>> Andrew
>>>
>>>
>>>
>>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
>>>
 Hi folks,

 I have a weird problem when using pyspark with yarn. I started ipython
 as follows:

 IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
 --num-executors 4 --executor-memory 4G

 When I create a notebook, I can see workers being created and indeed I
 see spark UI running on my client machine on port 4040.

 I have the following simple script:
 """
 import pyspark
 data = sc.textFile("hdfs://test/tmp/data/*").cache()
 oneday = data.map(lambda line: line.split(",")).\
   map(lambda f: (f[0], float(f[1]))).\
   filter(lambda t: t[0] >= "2013-01-01" and t[0] <
 "2013-01-02").\
   map(lambda t: (parser.parse(t[0]), t[1]))
 oneday.take(1)
 """

 By executing this, I see that it is my client machine (where ipython is
 launched) that is reading all the data from HDFS and producing the result of
 take(1), rather than my worker nodes...

 When I do "data.count()", things would blow up altogether. But I do see
 in the error message something like this:
 """

 Error from python worker:
   /usr/bin/python: No module named pyspark

 """


 Am I supposed to install pyspark on every worker node?


 Thanks.

 -Simon
>>>
>>>
>>
>


How to create RDDs from another RDD?

2014-06-02 Thread Gerard Maas
The RDD API has functions to join multiple RDDs, such as PairRDD.join
or PairRDD.cogroup, that take another RDD as input, e.g.
 firstRDD.join(secondRDD)

I'm looking for ways to do the opposite: split an existing RDD. What is the
right way to create derivative RDDs from an existing RDD?

e.g. imagine I have a collection of pairs as input: colRDD =
 (k1->v1)...(kx->vy)...
I could do:
val byKey = colRDD.groupByKey() = (k1->(k1->v1... k1->vn)),...(kn->(kn->vy,
...))

Now, I'd like to create an RDD from the values to have something like:

val groupedRDDs = (k1->RDD(k1->v1,...k1->vn), kn -> RDD(kn->vy, ...))

in this example, there's an f(byKey) = groupedRDDs.  What's that f(x) ?

Would byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} be the
right/recommended way to do this? Any other options?

Thanks,

Gerard.
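
[As far as I know, nested RDDs aren't supported: sc exists only on the driver, so k -> sc.makeRDD(v.toSeq) cannot run inside a transformation. A minimal sketch of the filter-per-key alternative; names are illustrative only, and it assumes the distinct key set is small enough to collect:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object SplitByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("split-by-key").setMaster("local[2]"))
    val colRDD: RDD[(String, Int)] = sc.parallelize(Seq("k1" -> 1, "k1" -> 2, "k2" -> 3))

    // Collect the keys on the driver, then build one filtered (lazy) RDD per key.
    val keys = colRDD.keys.distinct().collect()
    val groupedRDDs: Map[String, RDD[(String, Int)]] =
      keys.map(k => k -> colRDD.filter { case (key, _) => key == k }).toMap

    groupedRDDs.foreach { case (k, rdd) => println(s"$k -> ${rdd.count()} records") }
    sc.stop()
  }
}

Each per-key RDD re-scans colRDD when used, so caching colRDD first (or simply keeping groupByKey's grouped values as local collections) may be preferable, depending on how many keys there are.]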


Re: pyspark problems on yarn (job not parallelized, and Py4JJavaError)

2014-06-02 Thread Xu (Simon) Chen
Nope... didn't try Java 6. The standard installation guide didn't say
anything about Java 7 and suggested doing "-DskipTests" for the build:
http://spark.apache.org/docs/latest/building-with-maven.html

So, I didn't see the warning message...


On Mon, Jun 2, 2014 at 3:48 PM, Patrick Wendell  wrote:

> Are you building Spark with Java 6 or Java 7? Java 6 uses the extended
> Zip format and Java 7 uses Zip64. I think we've tried to add some
> build warnings if Java 7 is used, for this reason:
>
> https://github.com/apache/spark/blob/master/make-distribution.sh#L102
>
> Any luck if you use JDK 6 to compile?
>
>
> On Mon, Jun 2, 2014 at 12:03 PM, Xu (Simon) Chen 
> wrote:
> > OK, my colleague found this:
> > https://mail.python.org/pipermail/python-list/2014-May/671353.html
> >
> > And my jar file has 70011 files. Fantastic..
> >
> >
> >
> >
> > On Mon, Jun 2, 2014 at 2:34 PM, Xu (Simon) Chen 
> wrote:
> >>
> >> I asked several people, no one seems to believe that we can do this:
> >> $ PYTHONPATH=/path/to/assembly/jar python
> >> >>> import pyspark
> >>
> >> This following pull request did mention something about generating a zip
> >> file for all python related modules:
> >> https://www.mail-archive.com/reviews@spark.apache.org/msg08223.html
> >>
> >> I've tested that zipped modules can at least be imported via zipimport.
> >>
> >> Any ideas?
> >>
> >> -Simon
> >>
> >>
> >>
> >> On Mon, Jun 2, 2014 at 11:50 AM, Andrew Or 
> wrote:
> >>>
> >>> Hi Simon,
> >>>
> >>> You shouldn't have to install pyspark on every worker node. In YARN
> mode,
> >>> pyspark is packaged into your assembly jar and shipped to your
> executors
> >>> automatically. This seems like a more general problem. There are a few
> >>> things to try:
> >>>
> >>> 1) Run a simple pyspark shell with yarn-client, and do
> >>> "sc.parallelize(range(10)).count()" to see if you get the same error
> >>> 2) If so, check if your assembly jar is compiled correctly. Run
> >>>
> >>> $ jar -tf  pyspark
> >>> $ jar -tf  py4j
> >>>
> >>> to see if the files are there. For Py4j, you need both the python files
> >>> and the Java class files.
> >>>
> >>> 3) If the files are there, try running a simple python shell (not
> pyspark
> >>> shell) with the assembly jar on the PYTHONPATH:
> >>>
> >>> $ PYTHONPATH=/path/to/assembly/jar python
> >>> >>> import pyspark
> >>>
> >>> 4) If that works, try it on every worker node. If it doesn't work,
> there
> >>> is probably something wrong with your jar.
> >>>
> >>> There is a known issue for PySpark on YARN - jars built with Java 7
> >>> cannot be properly opened by Java 6. I would either verify that the
> >>> JAVA_HOME set on all of your workers points to Java 7 (by setting
> >>> SPARK_YARN_USER_ENV), or simply build your jar with Java 6:
> >>>
> >>> $ cd /path/to/spark/home
> >>> $ JAVA_HOME=/path/to/java6 ./make-distribution --with-yarn --hadoop
> >>> 2.3.0-cdh5.0.0
> >>>
> >>> 5) You can check out
> >>>
> http://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application
> ,
> >>> which has more detailed information about how to debug running an
> >>> application on YARN in general. In my experience, the steps outlined
> there
> >>> are quite useful.
> >>>
> >>> Let me know if you get it working (or not).
> >>>
> >>> Cheers,
> >>> Andrew
> >>>
> >>>
> >>>
> >>> 2014-06-02 17:24 GMT+02:00 Xu (Simon) Chen :
> >>>
>  Hi folks,
> 
>  I have a weird problem when using pyspark with yarn. I started ipython
>  as follows:
> 
>  IPYTHON=1 ./pyspark --master yarn-client --executor-cores 4
>  --num-executors 4 --executor-memory 4G
> 
>  When I create a notebook, I can see workers being created and indeed I
>  see spark UI running on my client machine on port 4040.
> 
>  I have the following simple script:
>  """
>  import pyspark
>  data = sc.textFile("hdfs://test/tmp/data/*").cache()
>  oneday = data.map(lambda line: line.split(",")).\
>    map(lambda f: (f[0], float(f[1]))).\
>    filter(lambda t: t[0] >= "2013-01-01" and t[0] <
>  "2013-01-02").\
>    map(lambda t: (parser.parse(t[0]), t[1]))
>  oneday.take(1)
>  """
> 
>  By executing this, I see that it is my client machine (where ipython
> is
>  launched) that is reading all the data from HDFS and producing the result of
>  take(1), rather than my worker nodes...
> 
>  When I do "data.count()", things would blow up altogether. But I do
> see
>  in the error message something like this:
>  """
> 
>  Error from python worker:
>    /usr/bin/python: No module named pyspark
> 
>  """
> 
> 
>  Am I supposed to install pyspark on every worker node?
> 
> 
>  Thanks.
> 
>  -Simon
> >>>
> >>>
> >>
> >
>


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
+1 please re-add this feature


On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell  wrote:

> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
> I accidentally assigned myself way back when I created it). This
> should be an easy fix.
>
> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
> > Hi, Patrick,
> >
> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
> about
> > the same thing?
> >
> > How about assigning it to me?
> >
> > I think I missed the configuration part in my previous commit, though I
> > declared that in the PR description
> >
> > Best,
> >
> > --
> > Nan Zhu
> >
> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
> >
> > Hey There,
> >
> > The issue was that the old behavior could cause users to silently
> > overwrite data, which is pretty bad, so to be conservative we decided
> > to enforce the same checks that Hadoop does.
> >
> > This was documented by this JIRA:
> > https://issues.apache.org/jira/browse/SPARK-1100
> >
> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
> >
> > However, it would be very easy to add an option that allows preserving
> > the old behavior. Is anyone here interested in contributing that? I
> > created a JIRA for it:
> >
> > https://issues.apache.org/jira/browse/SPARK-1993
> >
> > - Patrick
> >
> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
> >  wrote:
> >
> > Indeed, the behavior has changed for good or for bad. I mean, I agree
> with
> > the danger you mention but I'm not sure it's happening like that. Isn't
> > there a mechanism for overwrite in Hadoop that automatically removes part
> > files, then writes a _temporary folder and then only the part files along
> > with the _success folder?
> >
> > In any case this change of behavior should be documented IMO.
> >
> > Cheers
> > Pierre
> >
> > Message sent from a mobile device - excuse typos and abbreviations
> >
> > On 2 June 2014 at 17:42, Nicholas Chammas  wrote:
> >
> > What I've found using saveAsTextFile() against S3 (prior to Spark
> 1.0.0) is
> > that files get overwritten automatically. There is one danger to this,
> though.
> > If I save to a directory that already has 20 part- files, but this time
> > around I'm only saving 15 part- files, then there will be 5 leftover
> part-
> > files from the previous set mixed in with the 15 newer files. This is
> > potentially dangerous.
> >
> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
> > saying it has, Pierre?
> >
> > On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
> > [pierre.borckm...@realimpactanalytics.com](mailto:
> pierre.borckm...@realimpactanalytics.com)
> > wrote:
> >
> >
> > Hi Michaël,
> >
> > Thanks for this. We could indeed do that.
> >
> > But I guess the question is more about the change of behaviour from 0.9.1
> > to
> > 1.0.0.
> > We never had to care about that in previous versions.
> >
> > Does that mean we have to manually remove existing files or is there a
> way
> > to "aumotically" overwrite when using saveAsTextFile?
> >
> >
> >
> > --
> > View this message in context:
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>


Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Aaron Davidson
You may have to do "sudo jps", because it should definitely list your
processes.

What does hivecluster2:8080 look like? My guess is it says there are 2
applications registered, and one has taken all the executors. There must be
two applications running, as those are the only things that keep open those
4040/4041 ports.


On Mon, Jun 2, 2014 at 11:32 AM, Russell Jurney 
wrote:

> If it matters, I have servers running at
> http://hivecluster2:4040/stages/ and http://hivecluster2:4041/stages/
>
> When I run rdd.first, I see an item at
> http://hivecluster2:4041/stages/ but no tasks are running. Stage ID 1,
> first at :46, Tasks: Succeeded/Total 0/16.
>
> On Mon, Jun 2, 2014 at 10:09 AM, Russell Jurney
>  wrote:
> > Looks like just worker and master processes are running:
> >
> > [hivedata@hivecluster2 ~]$ jps
> >
> > 10425 Jps
> >
> > [hivedata@hivecluster2 ~]$ ps aux|grep spark
> >
> > hivedata 10424  0.0  0.0 103248   820 pts/3S+   10:05   0:00 grep
> spark
> >
> > root 10918  0.5  1.4 4752880 230512 ?  Sl   May27  41:43 java -cp
> >
> :/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/conf:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib/jline.jar
> > -Dspark.akka.logLifecycleEvents=true
> >
> -Djava.library.path=/opt/cloudera/parcels/SPARK-0.9.0-1.cdh4.6.0.p0.98/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
> > -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip hivecluster2
> > --port 7077 --webui-port 18080
> >
> > root 12715  0.0  0.0 148028   656 ?SMay27   0:00 sudo
> > /opt/cloudera/parcels/SPARK/lib/spark/bin/spark-class
> > org.apache.spark.deploy.worker.Worker spark://hivecluster2:7077
> >
> > root 12716  0.3  1.1 4155884 191340 ?  Sl   May27  30:21 java -cp
> >
> :/opt/cloudera/parcels/SPARK/lib/spark/conf:/opt/cloudera/parcels/SPARK/lib/spark/core/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/repl/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/examples/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/bagel/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/mllib/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/streaming/lib/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-hdfs/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-yarn/*:/opt/cloudera/parcels/CDH/lib/hadoop/../hadoop-mapreduce/*:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-library.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/scala-compiler.jar:/opt/cloudera/parcels/SPARK/lib/spark/lib/jline.jar
> > -Dspark.akka.logLifecycleEvents=true
> >
> -Djava.library.path=/opt/cloudera/parcels/SPARK/lib/spark/lib:/opt/cloudera/parcels/CDH/lib/hadoop/lib/native
> > -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker
> > spark://hivecluster2:7077
> >
> >
> >
> >
> > On Sun, Jun 1, 2014 at 7:41 PM, Aaron Davidson 
> wrote:
> >>
> >> Sounds like you have two shells running, and the first one is taking
> all
> >> your resources. Do a "jps" and kill the other guy, then try again.
> >>
> >> By the way, you can look at http://localhost:8080 (replace localhost
> with
> >> the server your Spark Master is running on) to see what applications are
> >> currently started, and what resource allocations they have.
> >>
> >>
> >> On Sun, Jun 1, 2014 at 6:47 PM, Russell Jurney <
> russell.jur...@gmail.com>
> >> wrote:
> >>>
> >>> Thanks again. Run results here:
> >>> https://gist.github.com/rjurney/dc0efae486ba7d55b7d5
> >>>
> >>> This time I get a port already in use exception on 4040, but it isn't
> >>> fatal. Then when I run rdd.first, I get this over and over:
> >>>
> >>> 14/06/01 18:35:40 WARN scheduler.TaskSchedulerImpl: Initial job has not
> >>> accepted any resources; check your cluster UI to ensure that workers
> are
> >>> registered and have sufficient memory
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Sun, Jun 1, 2014 at 3:09 PM, Aaron Davidson 
> >>> wrote:
> 
>  You can avoid that by using the constructor that takes a SparkConf, a
> la
> 
>  val conf = new SparkConf()
> >>>
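
[The quoted snippet above is cut off in the archive; roughly, the SparkConf-based setup being referred to looks like the sketch below, with the master URL, jars and core cap taken as placeholders from elsewhere in this thread. Capping spark.cores.max is one way to keep a single application from holding every core on a standalone master, which matches the "Initial job has not accepted any resources" warning above:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: explicit SparkConf, with a core cap so a second application
// can still get executors on the same standalone master.
val conf = new SparkConf()
  .setAppName("Test Spark App")
  .setMaster("spark://hivecluster2:7077")
  .setJars(Seq("avro-1.7.6.jar", "avro-mapred-1.7.6.jar"))
  .set("spark.cores.max", "4")
val sc = new SparkContext(conf)]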

Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Nicholas Chammas
So in summary:

   - As of Spark 1.0.0, saveAsTextFile() will no longer clobber by default.
   - There is an open JIRA issue to add an option to allow clobbering.
   - Even when clobbering, part- files may be left over from previous
   saves, which is dangerous.

Is this correct?


On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:

> +1 please re-add this feature
>
>
> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell 
> wrote:
>
>> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
>> I accidentally assigned myself way back when I created it). This
>> should be an easy fix.
>>
>> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
>> > Hi, Patrick,
>> >
>> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
>> about
>> > the same thing?
>> >
>> > How about assigning it to me?
>> >
>> > I think I missed the configuration part in my previous commit, though I
>> > declared that in the PR description
>> >
>> > Best,
>> >
>> > --
>> > Nan Zhu
>> >
>> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
>> >
>> > Hey There,
>> >
>> > The issue was that the old behavior could cause users to silently
>> > overwrite data, which is pretty bad, so to be conservative we decided
>> > to enforce the same checks that Hadoop does.
>> >
>> > This was documented by this JIRA:
>> > https://issues.apache.org/jira/browse/SPARK-1100
>> >
>> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>> >
>> > However, it would be very easy to add an option that allows preserving
>> > the old behavior. Is anyone here interested in contributing that? I
>> > created a JIRA for it:
>> >
>> > https://issues.apache.org/jira/browse/SPARK-1993
>> >
>> > - Patrick
>> >
>> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>> >  wrote:
>> >
>> > Indeed, the behavior has changed for good or for bad. I mean, I agree
>> with
>> > the danger you mention but I'm not sure it's happening like that. Isn't
>> > there a mechanism for overwrite in Hadoop that automatically removes
>> part
>> > files, then writes a _temporary folder and then only the part files
>> along
>> > with the _success folder?
>> >
>> > In any case this change of behavior should be documented IMO.
>> >
>> > Cheers
>> > Pierre
>> >
>> > Message sent from a mobile device - excuse typos and abbreviations
>> >
>> > On 2 June 2014 at 17:42, Nicholas Chammas  wrote:
>> >
>> > What I've found using saveAsTextFile() against S3 (prior to Spark
>> 1.0.0) is
>> > that files get overwritten automatically. There is one danger to this,
>> though.
>> > If I save to a directory that already has 20 part- files, but this time
>> > around I'm only saving 15 part- files, then there will be 5 leftover
>> part-
>> > files from the previous set mixed in with the 15 newer files. This is
>> > potentially dangerous.
>> >
>> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
>> > saying it has, Pierre?
>> >
>> > On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
>> > [pierre.borckm...@realimpactanalytics.com](mailto:
>> pierre.borckm...@realimpactanalytics.com)
>> > wrote:
>> >
>> >
>> > Hi Michaël,
>> >
>> > Thanks for this. We could indeed do that.
>> >
>> > But I guess the question is more about the change of behaviour from
>> 0.9.1
>> > to
>> > 1.0.0.
>> > We never had to care about that in previous versions.
>> >
>> > Does that mean we have to manually remove existing files or is there a
>> way
>> > to "aumotically" overwrite when using saveAsTextFile?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> >
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> >
>>
>
>


EC2 Simple Cluster

2014-06-02 Thread Gianluca Privitera

Hi everyone,
I would like to set up a very simple Spark cluster on EC2 (specifically using 
only 2 micro instances) and make it run a simple Spark Streaming 
application I created.

Has anyone actually managed to do that?
Because after launching the scripts from this page: 
http://spark.apache.org/docs/0.9.1/ec2-scripts.html and logging into the 
master node, I cannot find the spark folder the page is talking about, 
so I suppose the launch didn't go well.


Thank you
Gianluca


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-02 Thread Aaron Davidson
Yes.


On Mon, Jun 2, 2014 at 1:23 PM, Nicholas Chammas  wrote:

> So in summary:
>
>- As of Spark 1.0.0, saveAsTextFile() will no longer clobber by
>default.
>- There is an open JIRA issue to add an option to allow clobbering.
>- Even when clobbering, part- files may be left over from previous
>saves, which is dangerous.
>
> Is this correct?
>
>
> On Mon, Jun 2, 2014 at 4:17 PM, Aaron Davidson  wrote:
>
>> +1 please re-add this feature
>>
>>
>> On Mon, Jun 2, 2014 at 12:44 PM, Patrick Wendell 
>> wrote:
>>
>>> Thanks for pointing that out. I've assigned you to SPARK-1677 (I think
>>> I accidentally assigned myself way back when I created it). This
>>> should be an easy fix.
>>>
>>> On Mon, Jun 2, 2014 at 12:19 PM, Nan Zhu  wrote:
>>> > Hi, Patrick,
>>> >
>>> > I think https://issues.apache.org/jira/browse/SPARK-1677 is talking
>>> about
>>> > the same thing?
>>> >
>>> > How about assigning it to me?
>>> >
>>> > I think I missed the configuration part in my previous commit, though I
>>> > declared that in the PR description
>>> >
>>> > Best,
>>> >
>>> > --
>>> > Nan Zhu
>>> >
>>> > On Monday, June 2, 2014 at 3:03 PM, Patrick Wendell wrote:
>>> >
>>> > Hey There,
>>> >
>>> > The issue was that the old behavior could cause users to silently
>>> > overwrite data, which is pretty bad, so to be conservative we decided
>>> > to enforce the same checks that Hadoop does.
>>> >
>>> > This was documented by this JIRA:
>>> > https://issues.apache.org/jira/browse/SPARK-1100
>>> >
>>> https://github.com/apache/spark/commit/3a8b698e961ac05d9d53e2bbf0c2844fcb1010d1
>>> >
>>> > However, it would be very easy to add an option that allows preserving
>>> > the old behavior. Is anyone here interested in contributing that? I
>>> > created a JIRA for it:
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-1993
>>> >
>>> > - Patrick
>>> >
>>> > On Mon, Jun 2, 2014 at 9:22 AM, Pierre Borckmans
>>> >  wrote:
>>> >
>>> > Indeed, the behavior has changed for good or for bad. I mean, I agree
>>> with
>>> > the danger you mention but I'm not sure it's happening like that. Isn't
>>> > there a mechanism for overwrite in Hadoop that automatically removes
>>> part
>>> > files, then writes a _temporary folder and then only the part files
>>> along
>>> > with the _success folder?
>>> >
>>> > In any case this change of behavior should be documented IMO.
>>> >
>>> > Cheers
>>> > Pierre
>>> >
>>> > Message sent from a mobile device - excuse typos and abbreviations
>>> >
>>> > On 2 June 2014 at 17:42, Nicholas Chammas 
>>> > wrote:
>>> >
>>> > What I've found using saveAsTextFile() against S3 (prior to Spark
>>> 1.0.0) is
>>> > that files get overwritten automatically. There is one danger to this,
>>> though.
>>> > If I save to a directory that already has 20 part- files, but this time
>>> > around I'm only saving 15 part- files, then there will be 5 leftover
>>> part-
>>> > files from the previous set mixed in with the 15 newer files. This is
>>> > potentially dangerous.
>>> >
>>> > I haven't checked to see if this behavior has changed in 1.0.0. Are you
>>> > saying it has, Pierre?
>>> >
>>> > On Mon, Jun 2, 2014 at 9:41 AM, Pierre B
>>> > [pierre.borckm...@realimpactanalytics.com](mailto:
>>> pierre.borckm...@realimpactanalytics.com)
>>> > wrote:
>>> >
>>> >
>>> > Hi Michaël,
>>> >
>>> > Thanks for this. We could indeed do that.
>>> >
>>> > But I guess the question is more about the change of behaviour from
>>> 0.9.1
>>> > to
>>> > 1.0.0.
>>> > We never had to care about that in previous versions.
>>> >
>>> > Does that mean we have to manually remove existing files or is there a
>>> way
>>> > to "aumotically" overwrite when using saveAsTextFile?
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> >
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-tp6696p6700.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> >
>>>
>>
>>
>


Re: hadoopRDD stalls reading entire directory

2014-06-02 Thread Russell Jurney
Nothing appears to be running on hivecluster2:8080.

'sudo jps' does show

[hivedata@hivecluster2 ~]$ sudo jps
9953 PepAgent
13797 JournalNode
7618 NameNode
6574 Jps
12716 Worker
16671 RunJar
18675 Main
18177 JobTracker
10918 Master
18139 TaskTracker
7674 DataNode


I kill all processes listed. I restart Spark Master on hivecluster2:

[hivedata@hivecluster2 ~]$ sudo
/opt/cloudera/parcels/SPARK/lib/spark/sbin/start-master.sh

starting org.apache.spark.deploy.master.Master, logging to
/var/log/spark/spark-root-org.apache.spark.deploy.master.Master-1-hivecluster2.out

I run the spark shell again:

[hivedata@hivecluster2 ~]$ spark-shell -usejavacp -classpath "*.jar"
14/06/02 13:52:13 INFO spark.HttpServer: Starting HTTP Server
14/06/02 13:52:13 INFO server.Server: jetty-7.6.8.v20121106
14/06/02 13:52:13 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:52814
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java
1.6.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
14/06/02 13:52:19 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/06/02 13:52:19 INFO Remoting: Starting remoting
14/06/02 13:52:19 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@hivecluster2:46033]
14/06/02 13:52:19 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@hivecluster2:46033]
14/06/02 13:52:19 INFO spark.SparkEnv: Registering BlockManagerMaster
14/06/02 13:52:19 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140602135219-bd8a
14/06/02 13:52:19 INFO storage.MemoryStore: MemoryStore started with
capacity 294.4 MB.
14/06/02 13:52:19 INFO network.ConnectionManager: Bound socket to port
50645 with id = ConnectionManagerId(hivecluster2,50645)
14/06/02 13:52:19 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/06/02 13:52:19 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Registering block manager hivecluster2:50645 with 294.4 MB RAM
14/06/02 13:52:19 INFO storage.BlockManagerMaster: Registered BlockManager
14/06/02 13:52:19 INFO spark.HttpServer: Starting HTTP Server
14/06/02 13:52:19 INFO server.Server: jetty-7.6.8.v20121106
14/06/02 13:52:19 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:36103
14/06/02 13:52:19 INFO broadcast.HttpBroadcast: Broadcast server started at
http://10.10.30.211:36103
14/06/02 13:52:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/06/02 13:52:19 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-ecce4c62-fef6-4369-a3d5-e3d7cbd1e00c
14/06/02 13:52:19 INFO spark.HttpServer: Starting HTTP Server
14/06/02 13:52:19 INFO server.Server: jetty-7.6.8.v20121106
14/06/02 13:52:19 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:37662
14/06/02 13:52:19 INFO server.Server: jetty-7.6.8.v20121106
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/stage,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/pool,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/environment,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/executors,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/metrics/json,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/static,null}
14/06/02 13:52:19 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/,null}
14/06/02 13:52:19 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/06/02 13:52:19 INFO ui.SparkUI: Started Spark Web UI at
http://hivecluster2:4040
14/06/02 13:52:19 INFO client.AppClient$ClientActor: Connecting to master
spark://hivecluster2:7077...
14/06/02 13:52:20 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20140602135220-
Created spark context..
Spark context available as sc.


Note that the Spark Web UI is running at hivecluster2:4040, I get the UI
when I go there. I verify again that nothing exists at hivecluster2:8080.

I try to run my code:

...

val sparkConf = new SparkConf()
sparkConf.setMaster("spark://hivecluster2:7077")
sparkConf.setAppName("Test Spark App")
sparkConf.setJars(Array("avro-1.7.6.jar", "avro-mapred-1.7.6.jar"))
val sc = new SparkContext(sparkConf)

This produces a new spark server(!) at port 4041:


14/06/02 13:55:31 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4041
14/06/02 13:55:31