Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Francis . Hu
Hi, all

 

I installed spark-0.9.1 and zeromq 4.0.1, and then ran the examples below:

 

./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar

./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo

 

No messages were received on the ZeroMQWordCount side.

Does anyone know what the issue is?

 

 

Thanks,

Francis

 



RE: questions about debugging a spark application

2014-04-29 Thread wxhsdp
Hi Liu,
Is this a feature of Spark 0.9.1?
My version is 0.9.0, and setting spark.eventLog.enabled has no effect.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891p5028.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Prashant Sharma
Unfortunately zeromq 4.0.1 is not supported.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63
says which version is required. You will need that version of zeromq to see it
work. Basically I have seen it working nicely with zeromq 2.2.0, and if you
have the jzmq libraries installed, performance is much better.

Prashant Sharma


On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu
wrote:

>  Hi, all
>
>
>
> I installed spark-0.9.1 and zeromq 4.0.1 , and then run below example:
>
>
>
> ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar
>
> ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo
>
>
>
> No any message was received in ZeroMQWordCount side.
>
>
>
> Does anyone know what the issue is ?
>
>
>
>
>
> Thanks,
>
> Francis
>
>
>


Re: File list read into single RDD

2014-04-29 Thread Christophe Préaud

Hi,

You can also use any path pattern as defined here: 
http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus%28org.apache.hadoop.fs.Path%29

e.g.:

sc.textFile("{/path/to/file1,/path/to/file2}")

Christophe.

On 29/04/2014 05:07, Nicholas Chammas wrote:
Not that I know of. We were discussing it on another thread and it came up.

I think if you look up the Hadoop FileInputFormat API (which Spark uses) you'll 
see it mentioned there in the docs.

http://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/mapred/FileInputFormat.html

But that's not obvious.

Nick

On Monday, April 28, 2014, Pat Ferrel (pat.fer...@gmail.com) wrote:
Perfect.

BTW just so I know where to look next time, was that in some docs?

On Apr 28, 2014, at 7:04 PM, Nicholas Chammas 
>
 wrote:


Yep, as I just found out, you can also provide sc.textFile() with a 
comma-delimited string of all the files you want to load.

For example:

sc.textFile("/path/to/file1,/path/to/file2")


So once you have your list of files, concatenate their paths like that and pass 
the single string to textFile().

Nick


On Mon, Apr 28, 2014 at 7:23 PM, Pat Ferrel 
> 
wrote:
sc.textFile(URI) supports reading multiple files in parallel but only with a 
wildcard. I need to walk a dir tree, match a regex to create a list of files, 
then I’d like to read them into a single RDD in parallel. I understand these 
could go into separate RDDs then a union RDD can be created. Is there a way to 
create a single RDD from a URI list?
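
For reference, a minimal Scala sketch of that approach, assuming an existing SparkContext sc; the directory-walking helper, the /data/logs path and the .csv regex are illustrative, not from the thread:

import java.io.File

// Hypothetical helper: walk a directory tree and keep the paths matching a regex.
def matchingFiles(dir: File, pattern: scala.util.matching.Regex): Seq[String] = {
  val entries = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
  val (dirs, files) = entries.partition(_.isDirectory)
  files.map(_.getPath).filter(p => pattern.findFirstIn(p).isDefined) ++
    dirs.flatMap(d => matchingFiles(d, pattern))
}

val paths = matchingFiles(new File("/data/logs"), """.*\.csv$""".r)
val filesRdd = sc.textFile(paths.mkString(","))   // one RDD over all matched files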





Kelkoo SAS
A simplified joint-stock company (Société par Actions Simplifiée)
Share capital: €4,168,964.30
Registered office: 8, rue du Sentier, 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively for
their addressees. If you are not the intended recipient, please delete this
message and notify the sender.


Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi,

By default a fraction of the executor memory (60%) is reserved for RDD
caching. So if there's no explicit caching in the code (e.g. rdd.cache()),
or if we persist an RDD with StorageLevel.DISK_ONLY, is this part of
memory wasted? Does Spark allocate the RDD cache memory dynamically, or
does Spark automatically cache RDDs when it can?

Thanks.
-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960
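
For context, the 60% figure corresponds to the spark.storage.memoryFraction setting (default 0.6). A minimal sketch of lowering it for a job that only persists to disk; the 0.1 value and the input path are illustrative only, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative: shrink the in-memory cache region when nothing is cached in memory.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("DiskOnlyJob")
  .set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///some/input")   // placeholder path
  .persist(StorageLevel.DISK_ONLY)             // partitions go to disk, not to the cache region
println(data.count())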


Re: Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Francis . Hu
Thanks, Prashant Sharma

 

 

It works now after downgrading zeromq from 4.0.1 to 2.2.

Do you know whether the next release of Spark will upgrade zeromq?

Many of our programs are using zeromq 4.0.1, so it would be better for us if the
next Spark Streaming release could ship with a newer zeromq.

 

 

Francis.

 

From: Prashant Sharma [mailto:scrapco...@gmail.com]
Sent: Tuesday, April 29, 2014 15:53
To: user@spark.apache.org
Subject: Re: Issue during Spark streaming with ZeroMQ source

 

Unfortunately zeromq 4.0.1 is not supported.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63
says which version is required. You will need that version of zeromq to see it work.
Basically I have seen it working nicely with zeromq 2.2.0, and if you have the jzmq
libraries installed, performance is much better.




Prashant Sharma

 

On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu  
wrote:

Hi, all

 

I installed spark-0.9.1 and zeromq 4.0.1, and then ran the examples below:

 

./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar

./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo

 

No messages were received on the ZeroMQWordCount side.

Does anyone know what the issue is?

 

 

Thanks,

Francis

 

 



Re: Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Prashant Sharma
Well that is not going to be easy, simply because we depend on akka-zeromq
for zeromq support. And since akka does not support the latest zeromq
library yet, I doubt if there is something simple that can be done to
support it.

Prashant Sharma


On Tue, Apr 29, 2014 at 2:44 PM, Francis.Hu wrote:

>  Thanks, Prashant Sharma
>
>
>
>
>
> It works right now after degrade zeromq from 4.0.1 to  2.2.
>
> Do you know the new release of spark  whether it will upgrade zeromq ?
>
> Many of our programs are using zeromq 4.0.1, so if in next release ,spark
> streaming can release with a newer zeromq  that would be better for us.
>
>
>
>
>
> Francis.
>
>
>
> From: Prashant Sharma [mailto:scrapco...@gmail.com]
> Sent: Tuesday, April 29, 2014 15:53
> To: user@spark.apache.org
> Subject: Re: Issue during Spark streaming with ZeroMQ source
>
>
>
> Unfortunately zeromq 4.0.1 is not supported.
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63
> says which version is required. You will need that version of zeromq to see it
> work. Basically I have seen it working nicely with zeromq 2.2.0 and if you
> have jzmq libraries installed performance is much better.
>
>
>   Prashant Sharma
>
>
>
> On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu 
> wrote:
>
> Hi, all
>
>
>
> I installed spark-0.9.1 and zeromq 4.0.1 , and then run below example:
>
>
>
> ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar
>
> ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo
>
>
>
> No any message was received in ZeroMQWordCount side.
>
>
>
> Does anyone know what the issue is ?
>
>
>
>
>
> Thanks,
>
> Francis
>
>
>
>
>


Re: launching concurrent jobs programmatically

2014-04-29 Thread ishaaq
Very interesting.

One of spark's attractive features is being able to do stuff interactively
via spark-shell. Is something like that still available via Ooyala's job
server?

Or do you use the spark-shell independently of that? If the latter then how
do you manage custom jars for spark-shell? Our app has a number of jars that
I don't particularly want to have to upload each time I want to run a small
ad-hoc spark-shell session.

Thanks,
Ishaaq



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/launching-concurrent-jobs-programmatically-tp4990p5033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Joining not-pair RDDs in Spark

2014-04-29 Thread jsantos
In the context of the telecom industry, let's suppose we have several existing
RDDs populated from some tables in Cassandra:

val callPrices: RDD[PriceRow]
val calls: RDD[CallRow]
val offersInCourse: RDD[OfferRow]

where types are defined as follows,

/** Represents the price per minute for a concrete hour */
case class PriceRow(
val year: Int,
val month: Int,
val day: Int,
val hour: Int,
val basePrice: Float)

/** Call registries*/
case class CallRow(
val customer: String,
val year: Int,
val month: Int,
val day: Int,
val minutes: Int)

/** Is there any discount that could be applicable here? */
case class OfferRow(
val offerName: String,
val hour: Int,//[0..23]
val discount: Float)//[0..1]

Assuming we cannot use `flatMap` to mix these three RDDs this way
(since RDD is not really 'monadic'):

/** 
 * The final bill at a concrete hour for a call 
 * is defined as {{{ 
 *def billPerHour(minutes: Int,basePrice:Float,discount:Float) = 
 *  minutes * basePrice * discount
 * }}}
 */
val bills: RDD[BillRow] = for{
price <- callPrices
call <- calls if call.hour==price.hour
offer <- offersInCourse if offer.hour==price.hour
} yield BillRow(
call.customer,
call.hour,
billPerHour(call.minutes,price.basePrice,offer.discount))

case class BillRow(
val customer: String,
val hour: DateTime,
val amount: Float)

what is the best practice for generating a new RDD that joins all
three and represents the bill for a concrete customer?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why Spark require this object to be serializerable?

2014-04-29 Thread Earthson
In the end, I'm saving the RDDs to files and then reloading them. It works fine,
because Gibbs sampling for LDA is really slow: it takes about 10 minutes to sample
10k wiki documents for 10 rounds (1 round/min).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5036.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Shuffle Spill Issue

2014-04-29 Thread Daniel Darabos
I have no idea why shuffle spill is so large. But this might make it
smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for
each partition, instead of one entry per word occurrence.


On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond  wrote:

> Hi  Patrick
>
> I am just doing simple word count , the data is generated by
> hadoop random text writer.
>
> This seems to me not quite related to compress , If I turn off
> compress on shuffle, the metrics is something like below for the smaller
> 240MB Dataset.
>
>
> Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 10          | sr437:48527 | 35 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 2.2 GB                 | 1291.2 KB
> 12          | sr437:46077 | 34 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1822.6 MB              | 1073.3 KB
> 13          | sr434:37896 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1099.2 MB              | 621.2 KB
> 15          | sr438:52819 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1898.8 MB              | 1072.6 KB
> 16          | sr434:37103 | 32 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1638.0 MB              | 1044.6 KB
>
>
> And the program pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more what your job is doing and what data types you are
> using? These numbers alone don't necessarily indicate something is wrong.
> The relationship between the in-memory and on-disk shuffle amount is
> definitely a bit strange, the data gets compressed when written to disk,
> but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
> to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond 
> wrote:
> Hi
>
>
> I am running a simple word count program on spark standalone
> cluster. The cluster is made up of 6 node, each run 4 worker and each
> worker own 10G memory and 16 core thus total 96 core and 240G memory. (
> well, also used to configed as 1 worker with 40G memory on each node )
>
> I run a very small data set (2.4GB on HDFS on total) to confirm
> the problem here as below:
>
> As you can read from part of the task metrics as below, I noticed
> that the shuffle spill part of metrics indicate that there are something
> wrong.
>
> Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0           | sr437:42139 | 29 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 23.6 GB                | 4.3 MB
> 1           | sr433:46935 | 1.1 min   | 4           | 0            | 4               | 0.0 B        | 4.2 MB        | 19.0 GB                | 3.4 MB
> 10          | sr436:53277 | 26 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.6 GB                | 4.6 MB
> 11          | sr437:58872 | 32 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.0 GB                | 4.4 MB
> 12          | sr435:48358 | 27 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.1 GB                | 4.4 MB
>
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> the actual shuffle data and Shuffle Spill (Disk), and it also seems to me
> that the spill should not trigger at all, since the memory is not used up.
>
> To verify that I further reduce the data size to 240MB on total
>
> And here is the result:
>
>
> Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0           | sr437:50895 | 15 s      | 4           | 0            | 4               | 0.0 B        | 703.0 KB      | 80.0 MB                | 43.2 KB
> 1           | sr433:50207 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.7 KB      | 389.5 MB               | 90.2 KB
> 10          | sr436:56352 | 16 s      | 4           | 0            | 4               | 0.0 B        | 700.9 KB      | 814.9 MB               | 181.6 KB
> 11          | sr437:53099 | 15 s      | 4           | 0            | 4               | 0.0 B        | 689.7 KB      | 0.0 B                  | 0.0 B
> 12          | sr435:48318 | 15 s      | 4           | 0            | 4               | 0.0 B        | 702.1 KB      | 427.4 MB               | 90.7 KB
> 13          | sr433:59294 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.8 KB      | 779.9 MB               | 180.3 KB
>
> Nothing prevents the spill from happening.
>
> It seems to me that there must be something wrong with the spill-trigger
> code.
>
> Has anyone encountered this issue? By the way, I am using the latest trunk code.
>
>
> Best Regards,
> Raymond Liu
>
>


User/Product Clustering with pySpark ALS

2014-04-29 Thread Laird, Benjamin
Hi all -

I’m using pySpark/MLlib ALS for user/item clustering and would like to directly
access the user/product RDDs (called userFeatures/productFeatures in class
MatrixFactorizationModel in mllib/recommendation/MatrixFactorizationModel.scala).

This doesn’t seem too complex, but it doesn’t seem like the functionality is
currently available. I think it requires accessing the underlying Java model
like so:
model = ALS.train(ratings,1,iterations=1,blocks=5)
userFeatures = RDD(model.javamodel.userFeatures, sc, ???)

However, I don’t know what to pass as the deserializer. I need these low 
dimensional vectors as an RDD to then use in Kmeans clustering. Has anyone done 
something similar?

Ben


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Fwd: Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi,

By default a fraction of the executor memory (60%) is reserved for RDD
caching. So if there's no explicit caching in the code (e.g. rdd.cache()),
or if we persist an RDD with StorageLevel.DISK_ONLY, is this part of
memory wasted? Does Spark allocate the RDD cache memory dynamically, or
does Spark automatically cache RDDs when it can?

Thanks.

-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960


Re: Joining not-pair RDDs in Spark

2014-04-29 Thread Daniel Darabos
Create a key and join on that.

val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).mapValues({ case (p, c) =>
  BillRow(c.customer, c.hour, c.minutes * p.basePrice) }).values

You should be able to expand this approach to three RDDs too.
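
A sketch of that expansion, keeping the assumption from the snippet above that CallRow carries an hour field, and keying the offers on hour alone (as OfferRow is defined); it is illustrative only and not tested against the original schema:

// Requires import org.apache.spark.SparkContext._ for the pair-RDD join methods.
val offersByHour = offersInCourse.map(o => (o.hour, o))

val bills = callPricesByHour.join(callsByHour)
  .map { case ((year, month, day, hour), (price, call)) => (hour, (price, call)) }
  .join(offersByHour)
  .map { case (hour, ((price, call), offer)) =>
    // BillRow used as in the snippet above: customer, hour, amount
    BillRow(call.customer, hour, call.minutes * price.basePrice * offer.discount)
  }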


On Tue, Apr 29, 2014 at 11:55 AM, jsantos  wrote:

> In the context of telecom industry, let's supose we have several existing
> RDDs populated from some tables in Cassandra:
>
> val callPrices: RDD[PriceRow]
> val calls: RDD[CallRow]
> val offersInCourse: RDD[OfferRow]
>
> where types are defined as follows,
>
> /** Represents the price per minute for a concrete hour */
> case class PriceRow(
> val year: Int,
> val month: Int,
> val day: Int,
> val hour: Int,
> val basePrice: Float)
>
> /** Call registries*/
> case class CallRow(
> val customer: String,
> val year: Int,
> val month: Int,
> val day: Int,
> val minutes: Int)
>
> /** Is there any discount that could be applicable here? */
> case class OfferRow(
> val offerName: String,
> val hour: Int,//[0..23]
> val discount: Float)//[0..1]
>
> Assuming we cannot use `flatMap` to mix these three RDDs like this way
> (since RDD is not really 'monadic'):
>
> /**
>  * The final bill at a concrete hour for a call
>  * is defined as {{{
>  *def billPerHour(minutes: Int,basePrice:Float,discount:Float)
> =
>  *  minutes * basePrice * discount
>  * }}}
>  */
> val bills: RDD[BillRow] = for{
> price <- callPrices
> call <- calls if call.hour==price.hour
> offer <- offersInCourse if offer.hour==price.hour
> } yield BillRow(
> call.customer,
> call.hour,
> billPerHour(call.minutes,price.basePrice,offer.discount))
>
> case class BillRow(
> val customer: String,
> val hour: DateTime,
> val amount: Float)
>
> which is the best practise for generating a new RDD that join all these
> three RDDs and represents the bill for a concrete customer?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: User/Product Clustering with pySpark ALS

2014-04-29 Thread Nick Pentreath
There's no easy way to do this currently. The pieces are there in the PySpark
code for regression, which should be adaptable.

But you'd have to roll your own solution.

This is something I also want, so I intend to put together a pull request for
this soon.
—
Sent from Mailbox

On Tue, Apr 29, 2014 at 4:28 PM, Laird, Benjamin
 wrote:

> Hi all -
> I’m using pySpark/MLLib ALS for user/item clustering and would like to 
> directly access the user/product RDDs (called userFeatures/productFeatures in 
> class MatrixFactorizationModel in 
> mllib/recommendation/MatrixFactorizationModel.scala
> This doesn’t seem to complex, but it doesn’t seem like the functionality is 
> currently available. I think it requires accessing the underlying java mode 
> like so:
> model = ALS.train(ratings,1,iterations=1,blocks=5)
> userFeatures = RDD(model.javamodel.userFeatures, sc, ???)
> However, I don’t know what to pass as the deserializer. I need these low 
> dimensional vectors as an RDD to then use in Kmeans clustering. Has anyone 
> done something similar?
> Ben
> 
> The information contained in this e-mail is confidential and/or proprietary 
> to Capital One and/or its affiliates. The information transmitted herewith is 
> intended only for use by the individual or entity to which it is addressed.  
> If the reader of this message is not the intended recipient, you are hereby 
> notified that any review, retransmission, dissemination, distribution, 
> copying or other use of, or taking of any action in reliance upon this 
> information is strictly prohibited. If you have received this communication 
> in error, please contact the sender and delete the material from your 
> computer.

Storage information about an RDD from the API

2014-04-29 Thread Andras Nemeth
Hi,

Is it possible to know from code about an RDD if it is cached, and more
precisely, how many of its partitions are cached in memory and how many are
cached on disk? I know I can get the storage level, but I also want to know
the current actual caching status. Knowing memory consumption would also be
awesome. :)

Basically what I'm looking for is the information on the storage tab of the
UI, but accessible from the API.

Thanks,
Andras


Re: Storage information about an RDD from the API

2014-04-29 Thread Koert Kuipers
SparkContext.getRDDStorageInfo
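
A small sketch of reading it from code, assuming an existing SparkContext sc and the RDDInfo fields of that era (numCachedPartitions, memSize, diskSize):

// Prints roughly what the storage tab shows, one line per RDD.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}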


On Tue, Apr 29, 2014 at 12:34 PM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> Hi,
>
> Is it possible to know from code about an RDD if it is cached, and more
> precisely, how many of its partitions are cached in memory and how many are
> cached on disk? I know I can get the storage level, but I also want to know
> the current actual caching status. Knowing memory consumption would also be
> awesome. :)
>
> Basically what I'm looking for is the information on the storage tab of
> the UI, but accessible from the API.
>
> Thanks,
> Andras
>


Re: How to declare Tuple return type for a function

2014-04-29 Thread Roger Hoover
The return type should be RDD[(Int, Int, Int)] because sc.textFile()
returns an RDD.  Try adding an import for the RDD type to get rid of the
compile error.

import org.apache.spark.rdd.RDD
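
A minimal sketch of the corrected trait and class; the input file name below is only a placeholder for INP_FILE:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

trait VectorSim {
  // the method builds an RDD of tuples, so declare that as the return type
  def input(s: String): RDD[(Int, Int, Int)]
}

class Sim extends VectorSim {
  override def input(master: String): RDD[(Int, Int, Int)] = {
    val sc = new SparkContext(master, "Test")
    sc.textFile("ratings.tsv")   // placeholder for INP_FILE
      .map { line =>
        val fields = line.split("\t")
        (fields(0).toInt, fields(1).toInt, fields(2).toInt)
      }
  }
}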


On Mon, Apr 28, 2014 at 6:22 PM, SK  wrote:

> Hi,
>
> I am a new user of Spark. I have a class that defines a function as
> follows.
> It returns a tuple : (Int, Int, Int).
>
> class Sim extends VectorSim {
>  override def  input(master:String): (Int,Int,Int) = {
> sc = new SparkContext(master, "Test")
> val ratings = sc.textFile(INP_FILE)
>   .map(line=> {
> val fields = line.split("\t")
> (fields(0).toInt, fields(1).toInt, fields(2).toInt)
>   })
> ratings
>   }
> }
>
> The class extends the trait VectorSim where the function  input() is
> declared as follows.
>
> trait VectorSim {
>   def input (s:String): (Int, Int, Int)
> }
>
> However, when I compile, I get a type mismatch saying input() returns
> RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but
> the compiler complains that there is no type called RDD. What is the right
> way to  declare when the return type of a function is  a tuple that is
> (Int,Int,Int).
>
> I am using spark 0.9.
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp4999.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Hi all:

Is it possible to develop Spark programs in Python and run them on YARN?
From the Python SparkContext class, it doesn't seem to have such an option.

Thank you,
- Guanhua

===
Guanhua Yan, Ph.D.
Information Sciences Group (CCS-3)
Los Alamos National Laboratory
Tel: +1-505-667-0176
Email: gh...@lanl.gov
Web: http://ghyan.weebly.com/
===




How to declare Tuple return type for a function

2014-04-29 Thread SK
Hi,

I am a new user of Spark. I have a class that defines a function as follows.
It returns a tuple : (Int, Int, Int).

class Sim extends VectorSim {
override def  input(master:String): (Int,Int,Int) = {
sc = new SparkContext(master, "Test")
val ratings = sc.textFile(INP_FILE)
  .map(line=> {
val fields = line.split("\t")
(fields(0).toInt, fields(1).toInt, fields(2).toInt)
  })
ratings
  }
}

The class extends the trait VectorSim where the function  input() is
declared as follows.

trait VectorSim {
  def input (s:String): (Int, Int, Int)
}

However, when I compile, I get a type mismatch saying input() returns
RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but
the compiler complains that there is no type called RDD. What is the right
way to declare the return type for a function that returns a tuple of
(Int, Int, Int)?

I am using spark 0.9.

thanks 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp5047.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


What is Seq[V] in updateStateByKey?

2014-04-29 Thread Adrian Mocanu
What is Seq[V] in updateStateByKey?
Does this store the collected tuples of the RDD in a collection?

Method signature:
def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => 
Option[S] ): DStream[(K, S)]

In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the 
moment I switched to a different type like Seq[(String, Double)] the code 
didn't compile.

-Adrian



packaging time

2014-04-29 Thread SK
Each time I run sbt/sbt assembly to compile my program, the packaging time
takes about 370 sec (about 6 min). How can I reduce this time? 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Delayed Scheduling - Setting spark.locality.wait.node parameter in interactive shell

2014-04-29 Thread Sai Prasanna
Hi All,

I have replication factor 3 in my HDFS.
I ran my experiments with 3 datanodes. Now I just added another node with no
data on it.
When I ran again, Spark launched non-local tasks on it, and the time taken was
more than on the 3-node cluster.

I think delayed scheduling fails here because of the parameter
spark.locality.wait.node, which is 3 seconds by default. Spark launches "ANY"
level tasks on the newly added datanode.

I want to increase this parameter in the interactive shell. How do I do that?
What variable should I set to pass it on to the SparkContext in the
interactive shell?

Thanks.


Spark: issues with running a sbt fat jar due to akka dependencies

2014-04-29 Thread Shivani Rao
Hello folks,

I was going to post this question to spark user group as well. If you have
any leads on how to solve this issue please let me know:

I am trying to build a basic spark project (spark depends on akka) and I am
trying to create a fatjar using sbt assembly. The goal is to run the fatjar
via commandline as follows:
 java -cp "path to my spark fatjar" mainclassname

I encountered deduplication errors in the following akka libraries during
sbt assembly
akka-remote_2.10-2.2.3.jar with akka-remote_2.10-2.2.3-shaded-protobuf.jar
 akka-actor_2.10-2.2.3.jar with akka-actor_2.10-2.2.3-shaded-protobuf.jar

I resolved them by using MergeStrategy.first, and that let the sbt assembly
command complete successfully. But then, for one or another akka configuration
parameter, it kept throwing the following message:

"Exception in thread "main" com.typesafe.config.ConfigException$Missing: No
configuration setting found for key"

I then used MergeStrategy.concat for "reference.conf" and I started getting
this repeated error

Exception in thread "main" com.typesafe.config.ConfigException$Missing: No
configuration setting found for key 'akka.version'.

I noticed that akka.version is only in the akka-actor jars and not in the
akka-remote. The resulting reference.conf (in my final fat jar) does not
contain akka.version either. So the strategy is not working.

There are several things I could try

a) Use the following dependency https://github.com/sbt/sbt-proguard
b) Write a build.scala to handle merging of reference.conf

https://spark-project.atlassian.net/browse/SPARK-395
http://letitcrash.com/post/21025950392/howto-sbt-assembly-vs-reference-conf

c) Create a reference.conf by merging all akka configurations and then
passing it in my java -cp command as shown below

java -cp  -DConfig.file=

The main issue is that if I run the spark jar as "sbt run" there are no
errors in accessing any of the akka configuration parameters. It is only
when I run it via command line (java -cp  classname) that I
encounter the error.

Which of these is a long term fix to akka issues? For now, I removed the
akka dependencies and that solved the problem, but I know that is not a
long term solution

Regards,
Shivani

-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: Spark: issues with running a sbt fat jar due to akka dependencies

2014-04-29 Thread Koert Kuipers
You need to merge the reference.conf files and then it's no longer an issue.

See the build file for Spark itself:
  case "reference.conf" => MergeStrategy.concat


On Tue, Apr 29, 2014 at 3:32 PM, Shivani Rao  wrote:

> Hello folks,
>
> I was going to post this question to spark user group as well. If you have
> any leads on how to solve this issue please let me know:
>
> I am trying to build a basic spark project (spark depends on akka) and I
> am trying to create a fatjar using sbt assembly. The goal is to run the
> fatjar via commandline as follows:
>  java -cp "path to my spark fatjar" mainclassname
>
> I encountered deduplication errors in the following akka libraries during
> sbt assembly
> akka-remote_2.10-2.2.3.jar with akka-remote_2.10-2.2.3-shaded-protobuf.jar
>  akka-actor_2.10-2.2.3.jar with akka-actor_2.10-2.2.3-shaded-protobuf.jar
>
> I resolved them by using MergeStrategy.first and that helped with a
> successful compilation of the sbt assembly command. But for some or the
> other configuration parameter in the akka kept throwing up with the
> following message
>
> "Exception in thread "main" com.typesafe.config.ConfigException$Missing:
> No configuration setting found for key"
>
> I then used MergeStrategy.concat for "reference.conf" and I started
> getting this repeated error
>
> Exception in thread "main" com.typesafe.config.ConfigException$Missing: No
> configuration setting found for key 'akka.version'.
>
> I noticed that akka.version is only in the akka-actor jars and not in the
> akka-remote. The resulting reference.conf (in my final fat jar) does not
> contain akka.version either. So the strategy is not working.
>
> There are several things I could try
>
> a) Use the following dependency https://github.com/sbt/sbt-proguard
> b) Write a build.scala to handle merging of reference.conf
>
> https://spark-project.atlassian.net/browse/SPARK-395
> http://letitcrash.com/post/21025950392/howto-sbt-assembly-vs-reference-conf
>
> c) Create a reference.conf by merging all akka configurations and then
> passing it in my java -cp command as shown below
>
> java -cp  -DConfig.file=
>
> The main issue is that if I run the spark jar as "sbt run" there are no
> errors in accessing any of the akka configuration parameters. It is only
> when I run it via command line (java -cp  classname) that I
> encounter the error.
>
> Which of these is a long term fix to akka issues? For now, I removed the
> akka dependencies and that solved the problem, but I know that is not a
> long term solution
>
> Regards,
> Shivani
>
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>


Re: packaging time

2014-04-29 Thread Daniel Darabos
Tips from my experience. Disable scaladoc:

sources in doc in Compile := List()

Do not package the source:

publishArtifact in packageSrc := false

And most importantly do not run "sbt assembly". It creates a fat jar. Use
"sbt package" or "sbt stage" (from sbt-native-packager). They create a
directory full of jars, and only need to update the one containing your
code.



On Tue, Apr 29, 2014 at 8:50 PM, SK  wrote:

> Each time I run sbt/sbt assembly to compile my program, the packaging time
> takes about 370 sec (about 6 min). How can I reduce this time?
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: packaging time

2014-04-29 Thread Mark Hamstra
Tip: read the wiki --
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools


On Tue, Apr 29, 2014 at 12:48 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Tips from my experience. Disable scaladoc:
>
> sources in doc in Compile := List()
>
> Do not package the source:
>
> publishArtifact in packageSrc := false
>
> And most importantly do not run "sbt assembly". It creates a fat jar. Use
> "sbt package" or "sbt stage" (from sbt-native-packager). They create a
> directory full of jars, and only need to update the one containing your
> code.
>
>
>
> On Tue, Apr 29, 2014 at 8:50 PM, SK  wrote:
>
>> Each time I run sbt/sbt assembly to compile my program, the packaging time
>> takes about 370 sec (about 6 min). How can I reduce this time?
>>
>> thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


java.lang.ClassCastException for groupByKey

2014-04-29 Thread amit karmakar
I am getting a ClassCastException, and I am clueless as to why it occurs.

I am transforming a non-pair RDD into a pair RDD and calling groupByKey.


org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times
(most recent failure: Exception failure: java.lang.ClassCastException:
java.lang.Double cannot be cast to scala.Product2)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
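
For reference, a minimal sketch of the pair-RDD shape that the Scala API's groupByKey expects, assuming an existing SparkContext sc; the types and values are illustrative:

import org.apache.spark.SparkContext._   // brings the pair-RDD functions into scope
import org.apache.spark.rdd.RDD

// groupByKey is only defined on an RDD whose elements are (key, value) tuples.
val doubles: RDD[Double] = sc.parallelize(Seq(1.0, 2.0, 2.0))
val pairs: RDD[(Double, Int)] = doubles.map(d => (d, 1))
val grouped = pairs.groupByKey()   // RDD[(Double, Seq[Int])] in Spark 0.9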


Re: What is Seq[V] in updateStateByKey?

2014-04-29 Thread Sean Owen
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].

If your RDD has value type V = Double, then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.
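
A minimal sketch for the running-count case (V = Int, S = Int), assuming an existing DStream[(String, Int)] called pairs, along the lines of the StatefulNetworkWordCount example:

// New counts arrive as Seq[Int] each batch; the state is the running total.
// (updateStateByKey also requires a checkpoint directory to be set on the context.)
val updateFunc = (newValues: Seq[Int], state: Option[Int]) => {
  Some(state.getOrElse(0) + newValues.sum)
}
val runningCounts = pairs.updateStateByKey[Int](updateFunc)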


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
 wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
>
>
> Method signature:
>
> def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> Option[S] ): DStream[(K, S)]
>
>
>
> In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
> moment I switched to a different type like Seq[(String, Double)] the code
> didn’t compile.
>
>
>
> -Adrian
>
>


Re: What is Seq[V] in updateStateByKey?

2014-04-29 Thread Tathagata Das
You may have already seen it, but I will mention it anyways. This example
may help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the
value type (i.e., V) is Int (count of a word in each batch) and the state
type (i.e., S) is also Int (the running count). The update function essentially
sums up the running count with the new count to generate a new running
count.

TD



On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen  wrote:

> The original DStream is of (K,V). This function creates a DStream of
> (K,S). Each time slice brings one or more new V for each K. The old
> state S (can be different from V!) for each K -- possibly non-existent
> -- is updated in some way by a bunch of new V, to produce a new state
> S -- which also might not exist anymore after update. That's why the
> function is from a Seq[V], and an Option[S], to an Option[S].
>
> If you RDD has value type V = Double then your function needs to
> update state based on a new Seq[Double] at each time slice, since
> Doubles are the new thing arriving for each key at each time slice.
>
>
> On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
>  wrote:
> > What is Seq[V] in updateStateByKey?
> >
> > Does this store the collected tuples of the RDD in a collection?
> >
> >
> >
> > Method signature:
> >
> > def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> > Option[S] ): DStream[(K, S)]
> >
> >
> >
> > In my case I used Seq[Double] assuming a sequence of Doubles in the RDD;
> the
> > moment I switched to a different type like Seq[(String, Double)] the code
> > didn’t compile.
> >
> >
> >
> > -Adrian
> >
> >
>


Re: Python Spark on YARN

2014-04-29 Thread Matei Zaharia
This will be possible in 1.0 after this pull request: 
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan  wrote:

> Hi all:
> 
> Is it possible to develop Spark programs in Python and run them on YARN? From 
> the Python SparkContext class, it doesn't seem to have such an option.
> 
> Thank you,
> - Guanhua
> 
> ===
> Guanhua Yan, Ph.D.
> Information Sciences Group (CCS-3)
> Los Alamos National Laboratory
> Tel: +1-505-667-0176
> Email: gh...@lanl.gov
> Web: http://ghyan.weebly.com/
> ===



Spark cluster standalone setup

2014-04-29 Thread pradeep_s
Hi,
I am configuring a standalone setup for spark cluster using
spark-0.9.1-bin-hadoop2 binary.
Started the master and slave(localhost) using start-master and start-slaves
sh.I can see the master and worker started in web ui.
Now I am running a sample POC Java jar which connects to the master
URL, but the application is failing with the log given below:
14/04/29 14:15:12 INFO scheduler.DAGScheduler: Submitting Stage 0
(FilteredRDD[2] at filter at SparkPOC.java:16), which has no missing parents
14/04/29 14:15:12 INFO client.AppClient$ClientActor: Executor updated:
app-20140429141512-/2 is now FAILED (class java.io.IOException: Cannot
run program "java" (in directory
"/u01/app/spark-0.9.1-bin-hadoop2/work/app-20140429141512-/2"): error=2,
No such file or directory)


I have attached the full log . 

I run the application using the java -jar command, and the dependencies are
taken from a relative classpath folder which has all the required jars.
spark-logs.txt
 
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-cluster-standalone-setup-tp5060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark's behavior

2014-04-29 Thread Eduardo Costa Alfaia
Hi TD,

In my tests with Spark Streaming, I'm using (modified) JavaNetworkWordCount code
and a program that I wrote that sends words to the Spark worker; I use TCP as
transport. I verified that after starting, Spark connects to my source, which then
starts sending, but the first word count is reported approximately 30 seconds
after the context creation. So I'm wondering where the 30 seconds of data already
sent by the source is stored. Is this normal Spark behaviour? I saw the same
behaviour with the shipped JavaNetworkWordCount application.

Many thanks.
-- 
Privacy notice: http://www.unibs.it/node/8155


rdd ordering gets scrambled

2014-04-29 Thread Mohit Jaggi
Hi,
I started with a text file (CSV) of data sorted by the first column, and parsed it
into Scala objects using a map operation. Then I used more maps to add some
extra info to the data and saved it as a text file.
The final text file is not sorted. What do I need to do to keep the order
from the original input intact?

My code looks like:

val csvFile = sc.textFile(..)                        // file is CSV and ordered by first column
val splitRdd = csvFile.map { line => line.split(",", -1) }
val parsedRdd = splitRdd.map { parts =>
  val key = parts(0)                                 // use first column as key
  val value = new MyObject(parts(0), parts(1))       // parse into Scala objects
  (key, value)
}

val augmentedRdd = parsedRdd.map { case (key, value) =>
  (key, value)                                       // add extra fields to the value here
}
augmentedRdd.saveAsTextFile(...)                     // this output is not sorted

Mohit.


Re: Spark's behavior

2014-04-29 Thread Tathagata Das
Is you batch size 30 seconds by any chance?

Assuming not, please check whether you are creating the streaming context
with master "local[n]" where n > 2. With "local" or "local[1]", the system
only has one processing slot, which is occupied by the receiver leaving no
room for processing the received data. It could be that after 30 seconds,
the server disconnects, the receiver terminates, releasing the single slot
for the processing to proceed.

TD


On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia <
e.costaalf...@unibs.it> wrote:

> Hi TD,
>
> In my tests with spark streaming, I'm using JavaNetworkWordCount(modified)
> code and a program that I wrote that sends words to the Spark worker, I use
> TCP as transport. I verified that after starting Spark, it connects to my
> source which actually starts sending, but the first word count is
> advertised approximately 30 seconds after the context creation. So I'm
> wondering where is stored the 30 seconds data already sent by the source.
> Is this a normal spark’s behaviour? I saw the same behaviour using the
> shipped JavaNetworkWordCount application.
>
> Many thanks.
> --
> Privacy notice: http://www.unibs.it/node/8155
>


Re: Spark cluster standalone setup memory issue

2014-04-29 Thread pradeep_s
Also seeing logs related to memory towards the end.
14/04/29 15:07:54 INFO MemoryStore: ensureFreeSpace(138763) called with
curMem=0, maxMem=1116418867
14/04/29 15:07:54 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 135.5 KB, free 1064.6 MB)
14/04/29 15:07:54 INFO FileInputFormat: Total input paths to process : 1
14/04/29 15:07:54 INFO SparkContext: Starting job: count at SparkPOC.java:18
14/04/29 15:07:54 INFO DAGScheduler: Got job 0 (count at SparkPOC.java:18)
with 2 output partitions (allowLocal=false)
14/04/29 15:07:54 INFO DAGScheduler: Final stage: Stage 0 (count at
SparkPOC.java:18)
14/04/29 15:07:54 INFO DAGScheduler: Parents of final stage: List()
14/04/29 15:07:54 INFO DAGScheduler: Missing parents: List()
14/04/29 15:07:54 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at
filter at SparkPOC.java:16), which has no missing parents
14/04/29 15:07:54 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0
(FilteredRDD[2] at filter at SparkPOC.java:16)
14/04/29 15:07:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/04/29 15:08:09 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-cluster-standalone-setup-tp5060p5065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark's behavior

2014-04-29 Thread Eduardo Costa Alfaia
Hi TD,
We are not using stream context with master local, we have 1 Master and 8 
Workers and 1 word source. The command line that we are using is:
bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount 
spark://192.168.0.13:7077
 
On Apr 30, 2014, at 0:09, Tathagata Das  wrote:

> Is you batch size 30 seconds by any chance? 
> 
> Assuming not, please check whether you are creating the streaming context 
> with master "local[n]" where n > 2. With "local" or "local[1]", the system 
> only has one processing slot, which is occupied by the receiver leaving no 
> room for processing the received data. It could be that after 30 seconds, the 
> server disconnects, the receiver terminates, releasing the single slot for 
> the processing to proceed. 
> 
> TD
> 
> 
> On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia 
>  wrote:
> Hi TD,
> 
> In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) 
> code and a program that I wrote that sends words to the Spark worker, I use 
> TCP as transport. I verified that after starting Spark, it connects to my 
> source which actually starts sending, but the first word count is advertised 
> approximately 30 seconds after the context creation. So I'm wondering where 
> is stored the 30 seconds data already sent by the source. Is this a normal 
> spark’s behaviour? I saw the same behaviour using the shipped 
> JavaNetworkWordCount application.
> 
> Many thanks.
> --
> Privacy notice: http://www.unibs.it/node/8155
> 


-- 
Privacy notice: http://www.unibs.it/node/8155


Re: Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Thanks, Matei. Will take a look at it.

Best regards,
Guanhua

From:  Matei Zaharia 
Reply-To:  
Date:  Tue, 29 Apr 2014 14:19:30 -0700
To:  
Subject:  Re: Python Spark on YARN

This will be possible in 1.0 after this pull request:
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan  wrote:

> Hi all:
> 
> Is it possible to develop Spark programs in Python and run them on YARN? From
> the Python SparkContext class, it doesn't seem to have such an option.
> 
> Thank you,
> - Guanhua
> 
> ===
> Guanhua Yan, Ph.D.
> Information Sciences Group (CCS-3)
> Los Alamos National Laboratory
> Tel: +1-505-667-0176
> Email: gh...@lanl.gov
> Web: http://ghyan.weebly.com  /
> ===





Re: Spark's behavior

2014-04-29 Thread Tathagata Das
Strange! Can you just do lines.print() to print the raw data instead of
doing the word count? Beyond that we can do two things.

1. Can you check the Spark stage UI to see whether there are stages running
during the 30 second period you referred to?
2. If you upgrade to the Spark master branch (or Spark 1.0 RC3, see the
different thread by Patrick), it has a streaming UI, which shows the number
of records received, the state of the receiver, etc. That may be more
useful in debugging what's going on.

TD


On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia <
e.costaalf...@unibs.it> wrote:

> Hi TD,
> We are not using stream context with master local, we have 1 Master and 8
> Workers and 1 word source. The command line that we are using is:
> bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount
> spark://192.168.0.13:7077
>
> On Apr 30, 2014, at 0:09, Tathagata Das 
> wrote:
>
> Is you batch size 30 seconds by any chance?
>
> Assuming not, please check whether you are creating the streaming context
> with master "local[n]" where n > 2. With "local" or "local[1]", the system
> only has one processing slot, which is occupied by the receiver leaving no
> room for processing the received data. It could be that after 30 seconds,
> the server disconnects, the receiver terminates, releasing the single slot
> for the processing to proceed.
>
> TD
>
>
> On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia <
> e.costaalf...@unibs.it> wrote:
>
>> Hi TD,
>>
>> In my tests with spark streaming, I'm using
>> JavaNetworkWordCount(modified) code and a program that I wrote that sends
>> words to the Spark worker, I use TCP as transport. I verified that after
>> starting Spark, it connects to my source which actually starts sending, but
>> the first word count is advertised approximately 30 seconds after the
>> context creation. So I'm wondering where is stored the 30 seconds data
>> already sent by the source. Is this a normal spark’s behaviour? I saw the
>> same behaviour using the shipped JavaNetworkWordCount application.
>>
>> Many thanks.
>> --
>> Privacy notice: http://www.unibs.it/node/8155
>>
>
>
>
> Privacy notice: http://www.unibs.it/node/8155
>


Re: Spark's behavior

2014-04-29 Thread Eduardo Alfaia
Hi TD, I am GMT+8 from you; tomorrow I will get the information that you
asked me for.

Thanks

- Original message -
From: "Tathagata Das"
Sent: 30/04/2014 00:57
To: "user@spark.apache.org"
Subject: Re: Spark's behavior

Strange! Can you just do lines.print() to print the raw data instead of doing 
word count. Beyond that we can do two things. 


1. Can see the Spark stage UI to see whether there are stages running during 
the 30 second period you referred to?
2. If you upgrade to using Spark master branch (or Spark 1.0 RC3, see different 
thread by Patrick), it has a streaming UI, which shows the number of records 
received, the state of the receiver, etc. That may be more useful in debugging 
whats going on .


TD 



On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia  
wrote:

Hi TD,
We are not using stream context with master local, we have 1 Master and 8 
Workers and 1 word source. The command line that we are using is:
bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount 
spark://192.168.0.13:7077
 

On Apr 30, 2014, at 0:09, Tathagata Das  wrote:


Is you batch size 30 seconds by any chance? 


Assuming not, please check whether you are creating the streaming context with 
master "local[n]" where n > 2. With "local" or "local[1]", the system only has 
one processing slot, which is occupied by the receiver leaving no room for 
processing the received data. It could be that after 30 seconds, the server 
disconnects, the receiver terminates, releasing the single slot for the 
processing to proceed. 


TD



On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia  
wrote:

Hi TD,

In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) code 
and a program that I wrote that sends words to the Spark worker, I use TCP as 
transport. I verified that after starting Spark, it connects to my source which 
actually starts sending, but the first word count is advertised approximately 
30 seconds after the context creation. So I'm wondering where is stored the 30 
seconds data already sent by the source. Is this a normal spark’s behaviour? I 
saw the same behaviour using the shipped JavaNetworkWordCount application.

Many thanks.
--
Privacy notice: http://www.unibs.it/node/8155






Privacy notice: http://www.unibs.it/node/8155
-- 
Privacy notice: http://www.unibs.it/node/8155


Fwd: MultipleOutputs IdentityReducer

2014-04-29 Thread Andre Kuhnen
Hello,

I am trying to write multiple files with Spark, but I can not find a way to
do it.

Here is the idea.

val rddKeyValue: RDD[(String, String)] = rddlines.map(line => createKeyValue(line))

Now I would like to save this with the key as the file name, and all of that
key's values inside the file.

I tried to use this after the map, but it would overwrite the file, so I
would get only one value in each file.

With groupByKey I get an OutOfMemoryError, so I wonder if there is a way to
append the next line to the text file with the same key.
On Hadoop we can use an IdentityReducer and KeyBasedOutput [1].

I tried to this:

rddKeyValue.saveAsHadoopFile("hdfs://test-platform-analytics-master/tmp/dump/product",
classOf[String], classOf[String], classOf[KeyBasedOutput[String, String]])

[1]
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {

  /**
   * Use they key as part of the path for the final output file.
   */

  override protected def generateFileNameForKeyValue(key: T, value: V,
leaf: String) = {
key.toString()
  }

  /**
   * When actually writing the data, discard the key since it is already in
   * the file path.
   */

  override protected def generateActualKey(key: T, value: V) = {
null
  }
}

Thanks a lot


RE: Shuffle Spill Issue

2014-04-29 Thread Liu, Raymond
Hi Daniel

Thanks for your reply. I think reduceByKey will also do a map-side combine,
so the result is the same: one entry per distinct word for each partition. In
my case, with the Java serializer, the 240MB dataset yields around 70MB of
shuffle data. Only the Shuffle Spill (Memory) metric is abnormal, and it
seems to me it should not trigger at all. And, by the way, this behavior only
occurs on the map output side; on the reduce / shuffle fetch side this
strange behavior doesn't happen.

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] 

I have no idea why shuffle spill is so large. But this might make it smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each 
partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond  wrote:
Hi  Patrick

        I am just doing simple word count , the data is generated by hadoop 
random text writer.

        This seems to me not quite related to compress , If I turn off compress 
on shuffle, the metrics is something like below for the smaller 240MB Dataset.


Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10          | sr437:48527 | 35 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 2.2 GB                 | 1291.2 KB
12          | sr437:46077 | 34 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1822.6 MB              | 1073.3 KB
13          | sr434:37896 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1099.2 MB              | 621.2 KB
15          | sr438:52819 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1898.8 MB              | 1072.6 KB
16          | sr434:37103 | 32 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1638.0 MB              | 1044.6 KB


        And the program pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amount is definitely a 
bit strange, the data gets compressed when written to disk, but unless you have 
a weird dataset (E.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond  wrote:
Hi


        I am running a simple word count program on spark standalone cluster. 
The cluster is made up of 6 node, each run 4 worker and each worker own 10G 
memory and 16 core thus total 96 core and 240G memory. ( well, also used to 
configed as 1 worker with 40G memory on each node )

        I run a very small data set (2.4GB on HDFS on total) to confirm the 
problem here as below:

        As you can read from part of the task metrics as below, I noticed that 
the shuffle spill part of metrics indicate that there are something wrong.

Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0           | sr437:42139 | 29 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 23.6 GB                | 4.3 MB
1           | sr433:46935 | 1.1 min   | 4           | 0            | 4               | 0.0 B        | 4.2 MB        | 19.0 GB                | 3.4 MB
10          | sr436:53277 | 26 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.6 GB                | 4.6 MB
11          | sr437:58872 | 32 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.0 GB                | 4.4 MB
12          | sr435:48358 | 27 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.1 GB                | 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the
actual shuffle data and Shuffle Spill (Disk), and it also seems to me that the
spill should not trigger at all, since the memory is not used up.

To verify that I further reduce the data size to 240MB on total

And here is the result:


Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0            sr437:50895  15 s       4            0             4                0.0 B         703.0 KB       80.0 MB                 43.2 KB
1            sr433:50207  17 s       4            0             4                0.0 B         704.7 KB       389.5 MB                90.2 KB
10           sr436:56352  16 s       4            0             4                0.0 B         700.9 KB       814.9 MB                181.6 KB
11           sr437:53099  15 s       4            0             4                0.0 B         689.7 KB       0.0 B                   0.0 B
12           sr435:48318  15 s       4            0             4                0.0 B         702.1 KB       427.4 MB                90.7 KB
13           sr433:59294  17 s       4            0             4                0.0 B         704.8 KB       779.9 MB                180.3 KB

Nothing seems to prevent the spill from happening.
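
For anyone reproducing this, the spill behaviour in 0.9.x is governed by two properties; 
below is a sketch for experimentation only (the master URL and values are assumptions, 
not tuning advice):

import org.apache.spark.{SparkConf, SparkContext}

// Both properties exist in Spark 0.9.x: spark.shuffle.spill toggles spilling of
// in-memory maps during shuffles, and spark.shuffle.memoryFraction (default 0.3)
// controls how much of the heap those maps may use before spilling kicks in.
val conf = new SparkConf()
  .setMaster("spark://master:7077")             // assumed master URL
  .setAppName("WordCount-spill-test")
  .set("spark.shuffle.spill", "false")          // disable spilling while investigating
  .set("spark.shuffle.memoryFraction", "0.5")   // or give the shuffle maps more memory instead
val sc = new SparkContext(conf)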

About pluggable storage roadmap?

2014-04-29 Thread Liu, Raymond
Hi

I noticed that at the Spark 1.0 meetup, the roadmap for 1.1 and beyond mentioned 
support for pluggable storage strategies. We are also planning something similar, 
to enable the block manager to store data on more kinds of storage media.

So is there any existing plan, design, or rough idea on this already? If yes, 
could it be shared, so that we could see how to fit our plan in?

Or, if not, is there any idea of what such a strategy should cover, other than 
the block manager / shuffle manager, so that we could help implement this framework?
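
To make the question concrete, here is a purely hypothetical sketch of the seam such a 
strategy might expose; none of these names exist in Spark, they only illustrate what a 
pluggable block store would minimally have to cover:

import java.nio.ByteBuffer

// Hypothetical interface only -- not Spark API. A real design would also need to
// address capacity/eviction, replication, and how shuffle writes map onto it.
trait PluggableBlockStore {
  def putBytes(blockId: String, bytes: ByteBuffer): Unit
  def getBytes(blockId: String): Option[ByteBuffer]
  def contains(blockId: String): Boolean
  def remove(blockId: String): Boolean
  def getSize(blockId: String): Long
}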


Best Regards,
Raymond Liu



sparkR - is it possible to run sparkR on yarn?

2014-04-29 Thread phoenix bai
Hi all,

I searched around, but failed to find anything about running sparkR
on YARN.

So, is it possible to run sparkR with YARN, either in yarn-standalone or
yarn-client mode?
If so, is there any document that could guide me through the build & setup
processes?

I am desperate for some answers, so please help!


JavaSparkConf

2014-04-29 Thread Soren Macbeth
There is a JavaSparkContext, but no JavaSparkConf object. I know SparkConf
is new in 0.9.x.

Is there a plan to add something like this to the java api?

It's rather a bother to have things like setAll take a Scala
Traversable[(String, String)] when using SparkConf from the Java API.

At a minimum, adding method signatures that accept Java collections where there
are currently Scala collections would be a good start.

TIA


Re: parallelize for a large Seq is extreamly slow.

2014-04-29 Thread Earthson
I think the real problem is "spark.akka.frameSize". It is too small for
passing the data: every executor failed, and with no executors left, the
task hangs.
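
If that is the cause, raising the limit is a one-line configuration change; a minimal 
sketch (the 128 MB value and the master are assumptions, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// spark.akka.frameSize is in MB (default 10 in 0.9.x); it bounds the size of
// serialized tasks and task results shipped over Akka, which is what a very
// large parallelize() runs into.
val conf = new SparkConf()
  .setMaster("local[2]")                 // assumed master
  .setAppName("large-parallelize")
  .set("spark.akka.frameSize", "128")    // assumed value; size it to the data being shipped
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10000000)  // stand-in for the large local Seq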



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p5075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NoSuchMethodError from Spark Java

2014-04-29 Thread wxhsdp
I met the same issue when updating to Spark 0.9.1
(svn checkout https://github.com/apache/spark/)

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.SparkContext$.jarOfClass(Ljava/lang/Class;)Lscala/collection/Seq;
at org.apache.spark.examples.GroupByTest$.main(GroupByTest.scala:38)
at org.apache.spark.examples.GroupByTest.main(GroupByTest.scala)

sbt build:
name := "GroupByTest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

Is there something I need to modify?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5076.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
Hi

I am running a WordCount program which counts words from HDFS, and I noticed 
that the serializer part of the code takes a lot of CPU time. On a 
16-core/32-thread node, the total throughput is around 50MB/s with JavaSerializer, 
and if I switch to KryoSerializer, it doubles to around 100-150MB/s. (I have 12 
disks per node and files scattered across disks, so HDFS bandwidth is not a problem.)

I also notice that, in this case, the objects being written are (String, Int); 
if I try a case with (Int, Int), the throughput is a further 2-3x faster.

So, in my WordCount case, the bottleneck is CPU (because with shuffle compression 
on, the 150MB/s of data bandwidth on the input side usually leads to around 
50MB/s of shuffle data).

This serialization bandwidth looks somehow too low, so I am wondering: what 
bandwidth do you observe in your case? Does this throughput sound reasonable to 
you? If not, is there anything that might need to be examined in my case?
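
For reference, a minimal sketch of how the serializer is switched in this generation of 
Spark; the serializer class is the real Spark one, while the registrator class and what 
it registers are assumptions:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: registering the concrete classes that get shuffled
// can speed Kryo up further. In real use the class name passed in the config
// would normally need to be fully qualified.
class WordCountRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[(String, Int)])
  }
}

val conf = new SparkConf()
  .setMaster("local[4]")   // assumed master
  .setAppName("WordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "WordCountRegistrator")
val sc = new SparkContext(conf)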



Best Regards,
Raymond Liu




Re: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Patrick Wendell
Is this the serialization throughput per task or the serialization
throughput for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
> Hi
>
> I am running a WordCount program which count words from HDFS, and I 
> noticed that the serializer part of code takes a lot of CPU time. On a 
> 16core/32thread node, the total throughput is around 50MB/s by 
> JavaSerializer, and if I switching to KryoSerializer, it doubles to around 
> 100-150MB/s. ( I have 12 disks per node and files scatter across disks, so 
> HDFS BW is not a problem)
>
> And I also notice that, in this case, the object to write is (String, 
> Int), if I try some case with (int, int), the throughput will be 2-3x faster 
> further.
>
> So, in my Wordcount case, the bottleneck is CPU ( cause if with 
> shuffle compress on, the 150MB/s data bandwidth in input side, will usually 
> lead to around 50MB/s shuffle data)
>
> This serialize BW looks somehow too low , so I am wondering, what's 
> BW you observe in your case? Does this throughput sounds reasonable to you? 
> If not, anything might possible need to be examined in my case?
>
>
>
> Best Regards,
> Raymond Liu
>
>


Re: NoSuchMethodError from Spark Java

2014-04-29 Thread Patrick Wendell
The signature of this function was changed in spark 1.0... is there
any chance that somehow you are actually running against a newer
version of Spark?

On Tue, Apr 29, 2014 at 8:58 PM, wxhsdp  wrote:
> i met with the same question when update to spark 0.9.1
> (svn checkout https://github.com/apache/spark/)
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.spark.SparkContext$.jarOfClass(Ljava/lang/Class;)Lscala/collection/Seq;
> at org.apache.spark.examples.GroupByTest$.main(GroupByTest.scala:38)
> at org.apache.spark.examples.GroupByTest.main(GroupByTest.scala)
>
> sbt.buid:
> name := "GroupByTest"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
>
> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>
> is there something need to modify?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5076.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
For all the tasks, say 32 task on total

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
> Hi
>
> I am running a WordCount program which count words from HDFS, 
> and I noticed that the serializer part of code takes a lot of CPU 
> time. On a 16core/32thread node, the total throughput is around 50MB/s 
> by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
> around 100-150MB/s. ( I have 12 disks per node and files scatter 
> across disks, so HDFS BW is not a problem)
>
> And I also notice that, in this case, the object to write is (String, 
> Int), if I try some case with (int, int), the throughput will be 2-3x faster 
> further.
>
> So, in my Wordcount case, the bottleneck is CPU ( cause if 
> with shuffle compress on, the 150MB/s data bandwidth in input side, 
> will usually lead to around 50MB/s shuffle data)
>
> This serialize BW looks somehow too low , so I am wondering, what's 
> BW you observe in your case? Does this throughput sounds reasonable to you? 
> If not, anything might possible need to be examined in my case?
>
>
>
> Best Regards,
> Raymond Liu
>
>


Re: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Patrick Wendell
Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If
it's the first one, this is in the range of what I'd expect, but I'm no
expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond  wrote:
> For all the tasks, say 32 task on total
>
> Best Regards,
> Raymond Liu
>
>
> -Original Message-
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Is this the serialization throughput per task or the serialization throughput 
> for all the tasks?
>
> On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
>> Hi
>>
>> I am running a WordCount program which count words from HDFS,
>> and I noticed that the serializer part of code takes a lot of CPU
>> time. On a 16core/32thread node, the total throughput is around 50MB/s
>> by JavaSerializer, and if I switching to KryoSerializer, it doubles to
>> around 100-150MB/s. ( I have 12 disks per node and files scatter
>> across disks, so HDFS BW is not a problem)
>>
>> And I also notice that, in this case, the object to write is 
>> (String, Int), if I try some case with (int, int), the throughput will be 
>> 2-3x faster further.
>>
>> So, in my Wordcount case, the bottleneck is CPU ( cause if
>> with shuffle compress on, the 150MB/s data bandwidth in input side,
>> will usually lead to around 50MB/s shuffle data)
>>
>> This serialize BW looks somehow too low , so I am wondering, what's 
>> BW you observe in your case? Does this throughput sounds reasonable to you? 
>> If not, anything might possible need to be examined in my case?
>>
>>
>>
>> Best Regards,
>> Raymond Liu
>>
>>


Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Hi Patrick,

I'm a little confused about your comment that RDDs are not ordered. As far
as I know, RDDs keep a list of partitions that are ordered, and this is why I
can call RDD.take() and get the same first k rows every time I call it, and
RDD.take() returns the same entries as RDD.map(…).take() because map
preserves the partition order. RDD order is also what allows me to get the
top k out of an RDD by doing RDD.sort().take().

Am I misunderstanding it? Or is it just when an RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu




On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:

>Ah somehow after all this time I've never seen that!
>
>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia 
>wrote:
>>
>>
>>
>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
>> wrote:
>>>
>>> What is the ++ operator here? Is this something you defined?
>>
>>
>> No, it's an alias for union defined in RDD.scala:
>>
>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>
>>>
>>>
>>> Another issue is that RDD's are not ordered, so when you union two
>>> together it doesn't have a well defined ordering.
>>>
>>> If you do want to do this you could coalesce into one partition, then
>>> call MapPartitions and return an iterator that first adds your header
>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>> this will only work if you coalesce into a single partition.
>>
>>
>> Thanks! I'll give this a try.
>>
>>>
>>>
>>> myRdd.coalesce(1)
>>> .map(_.mkString(",")))
>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>> .saveAsTextFile("out.csv")
>>>
>>> - Patrick
>>>
>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>  wrote:
>>> > Hi,
>>> >
>>> > I'm trying to find a way to create a csv header when using
>>> > saveAsTextFile,
>>> > and I came up with this:
>>> >
>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>> >   .saveAsTextFile("out.csv")
>>> >
>>> > But it only saves the header part. Why is that the union method does
>>>not
>>> > return both RDD's?
>>
>>




RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
By the way, to be clear, I run repartition first to make all of the data go through 
the shuffle, instead of running reduceByKey etc. directly (which reduces the data that 
needs to be shuffled and serialized); thus all of the 50MB/s of data from HDFS goes to 
the serializer. (In fact, I also tried generating the data in memory directly instead 
of reading it from HDFS, with a similar throughput result.)

Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 

For all the tasks, say 32 task on total

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
> Hi
>
> I am running a WordCount program which count words from HDFS, 
> and I noticed that the serializer part of code takes a lot of CPU 
> time. On a 16core/32thread node, the total throughput is around 50MB/s 
> by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
> around 100-150MB/s. ( I have 12 disks per node and files scatter 
> across disks, so HDFS BW is not a problem)
>
> And I also notice that, in this case, the object to write is (String, 
> Int), if I try some case with (int, int), the throughput will be 2-3x faster 
> further.
>
> So, in my Wordcount case, the bottleneck is CPU ( cause if 
> with shuffle compress on, the 150MB/s data bandwidth in input side, 
> will usually lead to around 50MB/s shuffle data)
>
> This serialize BW looks somehow too low , so I am wondering, what's 
> BW you observe in your case? Does this throughput sounds reasonable to you? 
> If not, anything might possible need to be examined in my case?
>
>
>
> Best Regards,
> Raymond Liu
>
>


Re: JavaSparkConf

2014-04-29 Thread Patrick Wendell
This class was made to be "java friendly" so that we wouldn't have to
use two versions. The class itself is simple. But I agree adding java
setters would be nice.

On Tue, Apr 29, 2014 at 8:32 PM, Soren Macbeth  wrote:
> There is a JavaSparkContext, but no JavaSparkConf object. I know SparkConf
> is new in 0.9.x.
>
> Is there a plan to add something like this to the java api?
>
> It's rather a bother to have things like setAll take a scala
> Traverable[String String] when using SparkConf from the java api.
>
> At a minimum adding methods signatures for java collections where there are
> currently scala collection would be a good start.
>
> TIA


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
The latter case: total throughput aggregated across all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the 
first one this in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond  wrote:
> For all the tasks, say 32 task on total
>
> Best Regards,
> Raymond Liu
>
>
> -Original Message-
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Is this the serialization throughput per task or the serialization throughput 
> for all the tasks?
>
> On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
>> Hi
>>
>> I am running a WordCount program which count words from HDFS, 
>> and I noticed that the serializer part of code takes a lot of CPU 
>> time. On a 16core/32thread node, the total throughput is around 
>> 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it 
>> doubles to around 100-150MB/s. ( I have 12 disks per node and files 
>> scatter across disks, so HDFS BW is not a problem)
>>
>> And I also notice that, in this case, the object to write is 
>> (String, Int), if I try some case with (int, int), the throughput will be 
>> 2-3x faster further.
>>
>> So, in my Wordcount case, the bottleneck is CPU ( cause if 
>> with shuffle compress on, the 150MB/s data bandwidth in input side, 
>> will usually lead to around 50MB/s shuffle data)
>>
>> This serialize BW looks somehow too low , so I am wondering, what's 
>> BW you observe in your case? Does this throughput sounds reasonable to you? 
>> If not, anything might possible need to be examined in my case?
>>
>>
>>
>> Best Regards,
>> Raymond Liu
>>
>>


Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Patrick Wendell
You are right, once you sort() the RDD, then yes it has a well defined ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
> Hi Patrick,
>
> I¹m a little confused about your comment that RDDs are not ordered. As far
> as I know, RDDs keep list of partitions that are ordered and this is why I
> can call RDD.take() and get the same first k rows every time I call it and
> RDD.take() returns the same entries as RDD.map(Š).take() because map
> preserves the partition order. RDD order is also what allows me to get the
> top k out of RDD by doing RDD.sort().take().
>
> Am I misunderstanding it? Or, is it just when RDD is written to disk that
> the order is not well preserved? Thanks in advance!
>
> Mingyu
>
>
>
>
> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>
>>Ah somehow after all this time I've never seen that!
>>
>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia 
>>wrote:
>>>
>>>
>>>
>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
>>> wrote:

 What is the ++ operator here? Is this something you defined?
>>>
>>>
>>> No, it's an alias for union defined in RDD.scala:
>>>
>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>


 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition, then
 call MapPartitions and return an iterator that first adds your header
 and then the rest of the file, then call saveAsTextFile. Keep in mind
 this will only work if you coalesce into a single partition.
>>>
>>>
>>> Thanks! I'll give this a try.
>>>


 myRdd.coalesce(1)
 .map(_.mkString(",")))
 .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
 .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
  wrote:
 > Hi,
 >
 > I'm trying to find a way to create a csv header when using
 > saveAsTextFile,
 > and I came up with this:
 >
 > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
 > myRdd.coalesce(1).map(_.mkString(",")))
 >   .saveAsTextFile("out.csv")
 >
 > But it only saves the header part. Why is that the union method does
not
 > return both RDD's?
>>>
>>>


Re: JavaSparkConf

2014-04-29 Thread Soren Macbeth
My implication is that it isn't "java friendly" enough. The following methods
return Scala objects:

getAkkaConf
getAll
getExecutorEnv

and the following methods require Scala objects as their params:

setAll
setExecutorEnv (both of the bulk methods)

So, while it is usable from Java, I wouldn't call it friendly. All of the
bulk setters and getters take and return Scala objects (the exception being
setJars, luckily).
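
For illustration, the kind of Java-friendly overload being asked for could look roughly
like the helper below (hypothetical, not part of the 0.9.x API):

import scala.collection.JavaConverters._
import org.apache.spark.SparkConf

// Hypothetical helper: lets Java callers pass a java.util.Map instead of a
// Scala Traversable[(String, String)].
object JavaConfUtil {
  def setAll(conf: SparkConf, settings: java.util.Map[String, String]): SparkConf =
    conf.setAll(settings.asScala)
}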


On Tue, Apr 29, 2014 at 10:23 PM, Patrick Wendell wrote:

> This class was made to be "java friendly" so that we wouldn't have to
> use two versions. The class itself is simple. But I agree adding java
> setters would be nice.
>
> On Tue, Apr 29, 2014 at 8:32 PM, Soren Macbeth  wrote:
> > There is a JavaSparkContext, but no JavaSparkConf object. I know
> SparkConf
> > is new in 0.9.x.
> >
> > Is there a plan to add something like this to the java api?
> >
> > It's rather a bother to have things like setAll take a scala
> > Traverable[String String] when using SparkConf from the java api.
> >
> > At a minimum adding methods signatures for java collections where there
> are
> > currently scala collection would be a good start.
> >
> > TIA
>


Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Thanks for the quick response!

To better understand it, the reason a sorted RDD has a well-defined ordering
is that sortedRDD.getPartitions() returns the partitions in the right
order and each partition internally is properly sorted. So, if you have

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

Since mapValues doesn’t change the order of partitions, nor the
order of rows within the partitions, I think “mapped” should have the
exact same order as “sorted”. Sure, if a transform involves shuffling, the
order will change. Am I mistaken? Is there an extra detail in sortedRDD
that guarantees a well-defined ordering?

If it’s true that the order of partitions returned by RDD.getPartitions()
and the row order within the partitions determine the overall row order, I’m not
sure why union doesn’t respect the order, because the union operation simply
concatenates the two lists of partitions from the two RDDs.

Mingyu




On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:

>You are right, once you sort() the RDD, then yes it has a well defined
>ordering.
>
>But that ordering is lost as soon as you transform the RDD, including
>if you union it with another RDD.
>
>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
>> Hi Patrick,
>>
>> I¹m a little confused about your comment that RDDs are not ordered. As
>>far
>> as I know, RDDs keep list of partitions that are ordered and this is
>>why I
>> can call RDD.take() and get the same first k rows every time I call it
>>and
>> RDD.take() returns the same entries as RDD.map(Š).take() because map
>> preserves the partition order. RDD order is also what allows me to get
>>the
>> top k out of RDD by doing RDD.sort().take().
>>
>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>that
>> the order is not well preserved? Thanks in advance!
>>
>> Mingyu
>>
>>
>>
>>
>> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>>
>>>Ah somehow after all this time I've never seen that!
>>>
>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>
>>>wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
 wrote:
>
> What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)

>
>
> Another issue is that RDD's are not ordered, so when you union two
> together it doesn't have a well defined ordering.
>
> If you do want to do this you could coalesce into one partition, then
> call MapPartitions and return an iterator that first adds your header
> and then the rest of the file, then call saveAsTextFile. Keep in mind
> this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.

>
>
> myRdd.coalesce(1)
> .map(_.mkString(",")))
> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
> .saveAsTextFile("out.csv")
>
> - Patrick
>
> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>  wrote:
> > Hi,
> >
> > I'm trying to find a way to create a csv header when using
> > saveAsTextFile,
> > and I came up with this:
> >
> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
> > myRdd.coalesce(1).map(_.mkString(",")))
> >   .saveAsTextFile("out.csv")
> >
> > But it only saves the header part. Why is that the union method
>does
>not
> > return both RDD's?






Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Patrick Wendell
If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDD's together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]
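
Spelled out against the real API (a runnable sketch; sortByKey on a pair RDD plays the
role of sort() in the pseudocode above):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._               // pair-RDD functions (sortByKey, mapValues)

val sc = new SparkContext("local", "ordering-demo")  // assumed local master

val rdd    = sc.parallelize(Seq(2, 1, 3))
val sorted = rdd.map(x => (x, x)).sortByKey()        // keys in order: 1, 2, 3
val mapped = sorted.mapValues(x => 3 - x)            // values: 2, 1, 0 -- order kept, but no longer sorted

val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(1, 4, 5))
println(rdd1.union(rdd2).collect().mkString(","))    // 1,2,3,1,4,5 -- a concatenation, not a merge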

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
> Thanks for the quick response!
>
> To better understand it, the reason sorted RDD has a well-defined ordering
> is because sortedRDD.getPartitions() returns the partitions in the right
> order and each partition internally is properly sorted. So, if you have
>
> var rdd = sc.parallelize([2, 1, 3]);
> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>
> Since mapValues doesn’t change the order of partitions not change the
> order of rows within the partitions, I think “mapped” should have the
> exact same order as “sorted”. Sure, if a transform involves shuffling, the
> order will change. Am I mistaken? Is there an extra detail in sortedRDD
> that guarantees a well-defined ordering?
>
> If it’s true that the order of partitions returned by RDD.getPartitions()
> and the row orders within the partitions determine the row order, I’m not
> sure why union doesn’t respect the order because union operation simply
> concatenates the two lists of partitions from the two RDDs.
>
> Mingyu
>
>
>
>
> On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
>
>>You are right, once you sort() the RDD, then yes it has a well defined
>>ordering.
>>
>>But that ordering is lost as soon as you transform the RDD, including
>>if you union it with another RDD.
>>
>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
>>> Hi Patrick,
>>>
>>> I¹m a little confused about your comment that RDDs are not ordered. As
>>>far
>>> as I know, RDDs keep list of partitions that are ordered and this is
>>>why I
>>> can call RDD.take() and get the same first k rows every time I call it
>>>and
>>> RDD.take() returns the same entries as RDD.map(Š).take() because map
>>> preserves the partition order. RDD order is also what allows me to get
>>>the
>>> top k out of RDD by doing RDD.sort().take().
>>>
>>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>>that
>>> the order is not well preserved? Thanks in advance!
>>>
>>> Mingyu
>>>
>>>
>>>
>>>
>>> On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:
>>>
Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia

wrote:
>
>
>
> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell 
> wrote:
>>
>> What is the ++ operator here? Is this something you defined?
>
>
> No, it's an alias for union defined in RDD.scala:
>
> def ++(other: RDD[T]): RDD[T] = this.union(other)
>
>>
>>
>> Another issue is that RDD's are not ordered, so when you union two
>> together it doesn't have a well defined ordering.
>>
>> If you do want to do this you could coalesce into one partition, then
>> call MapPartitions and return an iterator that first adds your header
>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>> this will only work if you coalesce into a single partition.
>
>
> Thanks! I'll give this a try.
>
>>
>>
>> myRdd.coalesce(1)
>> .map(_.mkString(",")))
>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>> .saveAsTextFile("out.csv")
>>
>> - Patrick
>>
>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>  wrote:
>> > Hi,
>> >
>> > I'm trying to find a way to create a csv header when using
>> > saveAsTextFile,
>> > and I came up with this:
>> >
>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>> > myRdd.coalesce(1).map(_.mkString(",")))
>> >   .saveAsTextFile("out.csv")
>> >
>> > But it only saves the header part. Why is that the union method
>>does
>>not
>> > return both RDD's?
>
>


Re: sparkR - is it possible to run sparkR on yarn?

2014-04-29 Thread Shivaram Venkataraman
We don't have any documentation on running SparkR on YARN and I think there
might be some issues that need to be fixed (The recent PySpark on YARN PRs
are an example).
SparkR has only been tested to work with Spark standalone mode so far.

Thanks
Shivaram



On Tue, Apr 29, 2014 at 7:56 PM, phoenix bai  wrote:

> Hi all,
>
> I searched around, but fail to find anything that says about running
> sparkR on YARN.
>
> so, is it possible to run sparkR with yarn ? either with yarn-standalone
> or yarn-client mode.
> if so, is there any document that could guide me through the build & setup
> processes?
>
> I am desparate for some answers, so please help!
>


Setting spark.locality.wait.node parameter in interactive shell

2014-04-29 Thread Sai Prasanna
Hi, any suggestions on the following issue?

I have a replication factor of 3 in my HDFS.
With 3 datanodes, I ran my experiments. Then I added another node to the cluster
with no data on it.
When I ran again, Spark launched non-local tasks on it, and the time taken was more
than it took on the 3-node cluster.

Here delay scheduling fails, I think, because of the parameter
spark.locality.wait.node, which is 3 seconds by default. It launches "ANY"-level
tasks on the added datanode.

*How do I set the spark.locality.wait.node parameter in the environment for the
interactive shell's sc?*

Thanks !
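
For the 0.9.x shell, the usual route is to set the property as a system property before
the shell builds its sc, e.g. via SPARK_JAVA_OPTS (the 30-second value and master URL
below are assumptions):

SPARK_JAVA_OPTS="-Dspark.locality.wait.node=30000" MASTER=spark://master:7077 ./bin/spark-shell

SparkConf reads system properties when the shell creates sc, so the pre-built sc should
pick the setting up without any code changes.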


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
I just tried using the serializer to write objects directly in local mode with this code:

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
import org.apache.spark.SparkEnv

val datasize = args(1).toInt
val dataset = (0 until datasize).map(i => ("asmallstring", i))

// one core writing to one local file through a 100KB buffer
val out: OutputStream =
  new BufferedOutputStream(new FileOutputStream(args(2)), 1024 * 100)

val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)

dataset.foreach(value => serOut.writeObject(value))
serOut.flush()
serOut.close()

Thus one core on one disk. When using JavaSerializer, the throughput is 10~12MB/s, 
and Kryo doubles it. So it seems to me that, when running the full code path in my 
previous case, 32 cores with 50MB/s of total throughput might be reasonable?


Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 


Later case, total throughput aggregated from all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the 
first one this in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond  wrote:
> For all the tasks, say 32 task on total
>
> Best Regards,
> Raymond Liu
>
>
> -Original Message-
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Is this the serialization throughput per task or the serialization throughput 
> for all the tasks?
>
> On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
>> Hi
>>
>> I am running a WordCount program which count words from HDFS, 
>> and I noticed that the serializer part of code takes a lot of CPU 
>> time. On a 16core/32thread node, the total throughput is around 
>> 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it 
>> doubles to around 100-150MB/s. ( I have 12 disks per node and files 
>> scatter across disks, so HDFS BW is not a problem)
>>
>> And I also notice that, in this case, the object to write is 
>> (String, Int), if I try some case with (int, int), the throughput will be 
>> 2-3x faster further.
>>
>> So, in my Wordcount case, the bottleneck is CPU ( cause if 
>> with shuffle compress on, the 150MB/s data bandwidth in input side, 
>> will usually lead to around 50MB/s shuffle data)
>>
>> This serialize BW looks somehow too low , so I am wondering, what's 
>> BW you observe in your case? Does this throughput sounds reasonable to you? 
>> If not, anything might possible need to be examined in my case?
>>
>>
>>
>> Best Regards,
>> Raymond Liu
>>
>>


Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
but the order of rows is semantically kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then: why is it important to coalesce to a
single partition? When you union two RDDs, for example rdd1 = [“a, b,
c”] and rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should have resulted in a file with three
lines “a, b, c”, “1, 2, 3”, and “4, 5, 6”, because the partitions from the
two RDDs are concatenated.

Mingyu




On 4/29/14, 10:55 PM, "Patrick Wendell"  wrote:

>If you call map() on an RDD it will retain the ordering it had before,
>but that is not necessarily a correct sort order for the new RDD.
>
>var rdd = sc.parallelize([2, 1, 3]);
>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>
>Note that mapped is no longer sorted.
>
>When you union two RDD's together it will effectively concatenate the
>two orderings, which is also not a valid sorted order on the new RDD:
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5]
>
>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim  wrote:
>> Thanks for the quick response!
>>
>> To better understand it, the reason sorted RDD has a well-defined
>>ordering
>> is because sortedRDD.getPartitions() returns the partitions in the right
>> order and each partition internally is properly sorted. So, if you have
>>
>> var rdd = sc.parallelize([2, 1, 3]);
>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>
>> Since mapValues doesn’t change the order of partitions not change the
>> order of rows within the partitions, I think “mapped” should have the
>> exact same order as “sorted”. Sure, if a transform involves shuffling,
>>the
>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>> that guarantees a well-defined ordering?
>>
>> If it’s true that the order of partitions returned by
>>RDD.getPartitions()
>> and the row orders within the partitions determine the row order, I’m
>>not
>> sure why union doesn’t respect the order because union operation simply
>> concatenates the two lists of partitions from the two RDDs.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:25 PM, "Patrick Wendell"  wrote:
>>
>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>ordering.
>>>
>>>But that ordering is lost as soon as you transform the RDD, including
>>>if you union it with another RDD.
>>>
>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim  wrote:
 Hi Patrick,

 I¹m a little confused about your comment that RDDs are not ordered. As
far
 as I know, RDDs keep list of partitions that are ordered and this is
why I
 can call RDD.take() and get the same first k rows every time I call it
and
 RDD.take() returns the same entries as RDD.map(Š).take() because map
 preserves the partition order. RDD order is also what allows me to get
the
 top k out of RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or, is it just when RDD is written to disk
that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, "Patrick Wendell"  wrote:

>Ah somehow after all this time I've never seen that!
>
>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>
>wrote:
>>
>>
>>
>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
>>
>> wrote:
>>>
>>> What is the ++ operator here? Is this something you defined?
>>
>>
>> No, it's an alias for union defined in RDD.scala:
>>
>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>
>>>
>>>
>>> Another issue is that RDD's are not ordered, so when you union two
>>> together it doesn't have a well defined ordering.
>>>
>>> If you do want to do this you could coalesce into one partition,
>>>then
>>> call MapPartitions and return an iterator that first adds your
>>>header
>>> and then the rest of the file, then call saveAsTextFile. Keep in
>>>mind
>>> this will only work if you coalesce into a single partition.
>>
>>
>> Thanks! I'll give this a try.
>>
>>>
>>>
>>> myRdd.coalesce(1)
>>> .map(_.mkString(",")))
>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>> .saveAsTextFile("out.csv")
>>>
>>> - Patrick
>>>
>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>  wrote:
>>> > Hi,
>>> >
>>> > I'm trying to find a way to create a csv header when using
>>> > saveAsTextFile,
>>> > and I came up with this:
>>> >
>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>> >   .saveAsTextFile("out.csv")
>>> >

Re: NoSuchMethodError from Spark Java

2014-04-29 Thread wxhsdp
Hi, Patrick

I checked out https://github.com/apache/spark/ this morning and built
/spark/trunk
with ./sbt/sbt assembly.

Is it Spark 1.0?

So how can I update my sbt file? The latest version in
http://repo1.maven.org/maven2/org/apache/spark/
is 0.9.1.

Thank you for your help.
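
If the goal really is to compile against the freshly built master, one sketch is to run
./sbt/sbt publish-local in the Spark checkout and then point the project's build at that
artifact (the 1.0.0-SNAPSHOT version string is an assumption; verify it against the
checkout's build files):

name := "GroupByTest"

version := "1.0"

scalaVersion := "2.10.4"

// assumes ./sbt/sbt publish-local has been run in the Spark checkout
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT"

Otherwise, sticking with the released 0.9.1 artifact means running the 0.9.1 examples
rather than the ones from master, since the jarOfClass signature changed in between.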



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.