Re: Persist streams to text files

2014-11-21 Thread Prannoy
Hi ,

You can use the FileUtil.copyMerge API and specify the path to the folder where
saveAsTextFile saves the part text files.

Suppose your directory is /a/b/c/

use FileUtil.copyMerge(source FileSystem, /a/b/c, destination FileSystem, path
to the merged file, say /a/b/c.txt, true (to delete the original dir), null).
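
For example, a rough Scala sketch of that call (the paths are placeholders, and
this assumes the Hadoop FileUtil.copyMerge signature):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)

// merge all part files under /a/b/c into a single /a/b/c.txt and
// delete the original directory afterwards
FileUtil.copyMerge(fs, new Path("/a/b/c"), fs, new Path("/a/b/c.txt"),
  true, hadoopConf, null)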

Thanks.

On Fri, Nov 21, 2014 at 11:31 AM, Jishnu Prathap [via Apache Spark User
List]  wrote:

>  Hi I am also having similar problem.. any fix suggested..
>
>
>
> *Originally Posted by GaganBM*
>
> Hi,
>
> I am trying to persist the DStreams to text files. When I use the inbuilt
> API 'saveAsTextFiles' as :
>
> stream.saveAsTextFiles(resultDirectory)
>
> this creates a number of subdirectories, for each batch, and within each
> sub directory, it creates bunch of text files for each RDD (I assume).
>
> I am wondering if I can have single text files for each batch. Is there
> any API for that ? Or else, a single output file for the entire stream ?
>
> I tried to manually write from each RDD stream to a text file as :
>
> stream.foreachRDD(rdd =>{
>   rdd.foreach(element => {
>   fileWriter.write(element)
>   })
>   })
>
> where 'fileWriter' simply makes use of a Java BufferedWriter to write
> strings to a file. However, this fails with exception :
>
> DStreamCheckpointData.writeObject used
> java.io.BufferedWriter
> java.io.NotSerializableException: java.io.BufferedWriter
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> .
>
> Any help on how to proceed with this ?
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Persist-streams-to-text-files-tp19449p19457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: beeline via spark thrift doesn't retain cache

2014-11-21 Thread Yanbo Liang
1) Make sure your beeline client is connected to the HiveServer2 of Spark SQL.
You can find the execution logs of HiveServer2 in the environment where
start-thriftserver.sh was run.
2) What is the scale of your data? If you cache a small dataset, more of the
time goes into scheduling the workload across the different executors.
Also check the configuration of the Spark execution environment: if there is
not enough memory for RDD storage, extra time is spent
serializing/deserializing data between memory and disk.

2014-11-21 11:06 GMT+08:00 Judy Nash :

>  Hi friends,
>
>
>
> I have successfully setup thrift server and execute beeline on top.
>
>
>
> Beeline can handle select queries just fine, but it cannot seem to do any
> kind of caching/RDD operations.
>
>
>
> i.e.
>
> 1)  Command “cache table” doesn’t work. See error:
>
> Error: Error while processing statement: FAILED: ParseException line 1:0
> cannot
>
> recognize input near 'cache' 'table' 'hivesampletable'
> (state=42000,code=4)
>
>
>
> 2)  Re-run SQL commands do not have any performance improvements.
>
>
>
> By comparison, Spark-SQL shell can execute “cache table” command and
> rerunning SQL command has a huge performance boost.
>
>
>
> Am I missing something or this is expected when execute through Spark
> thrift server?
>
>
>
> Thanks!
>
> Judy
>
>
>


RE: Persist streams to text files

2014-11-21 Thread jishnu.prathap
Hi,
Thank you ☺ Akhil, it worked like a charm.
I had used the file writer outside rdd.foreach; that might have been the reason for
the non-serializable exception.

Thanks & Regards
Jishnu Menath Prathap
From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 1:15 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

Here's a quick version to store (append) on your local machine:

val tweets = TwitterUtils.createStream(ssc, None)

val hashTags = tweets.flatMap(status => status.getText.split(" 
").filter(_.startsWith("#")))


hashTags.foreachRDD(rdds => {

  rdds.foreach(rdd => {
val fw = new FileWriter("/home/akhld/tags.txt", true)
println("HashTag => " + rdd)
fw.write(rdd + "\n")
fw.close()
  })

})
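
If opening a FileWriter per element gets expensive, a variation of the same idea
is to open it once per partition instead (same stream and local path as above;
as written this only makes sense in local mode, since the writer runs wherever
the partition is processed):

hashTags.foreachRDD(rdd => {
  rdd.foreachPartition(tags => {
    val fw = new FileWriter("/home/akhld/tags.txt", true)
    tags.foreach(tag => fw.write(tag + "\n"))
    fw.close()
  })
})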

Thanks
Best Regards

On Fri, Nov 21, 2014 at 12:12 PM, 
mailto:jishnu.prat...@wipro.com>> wrote:
Hi Akhil,
Thanks for the reply.
But it creates different directories. I tried using a FileWriter but it throws a
non-serializable error.
val stream = TwitterUtils.createStream(ssc, None) //, filters)

val statuses = stream.map(
  status => sentimentAnalyzer.findSentiment({
status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")

  })
  )

val line = statuses.foreachRDD(
  rdd => {
rdd.foreach(
  tweetWithSentiment => {
if (!tweetWithSentiment.getLine().isEmpty())
  println(tweetWithSentiment.getCssClass() + " for line :=> " +
    tweetWithSentiment.getLine()) // Currently I print to the console, but I need to
    // write it to a file on the local machine

  })
  })

Thanks & Regards
Jishnu Menath Prathap
From: Akhil Das 
[mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files


To have a single text file output for each batch you can repartition it to 1
and then call saveAsTextFiles:

stream.repartition(1).saveAsTextFiles(location)
On 21 Nov 2014 11:28, 
mailto:jishnu.prat...@wipro.com>> wrote:
Hi I am also having similar problem.. any fix suggested..

Originally Posted by GaganBM
Hi,

I am trying to persist the DStreams to text files. When I use the inbuilt API 
'saveAsTextFiles' as :

stream.saveAsTextFiles(resultDirectory)

this creates a number of subdirectories, for each batch, and within each sub 
directory, it creates bunch of text files for each RDD (I assume).

I am wondering if I can have single text files for each batch. Is there any API 
for that ? Or else, a single output file for the entire stream ?

I tried to manually write from each RDD stream to a text file as :

stream.foreachRDD(rdd =>{
  rdd.foreach(element => {
  fileWriter.write(element)
  })
  })

where 'fileWriter' simply makes use of a Java BufferedWriter to write strings 
to a file. However, this fails with exception :

DStreamCheckpointData.writeObject used
java.io.BufferedWriter
java.io.NotSerializableException: java.io.BufferedWriter
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
.

Any help on how to proceed with this ?



Re: processing files

2014-11-21 Thread phiroc
Hi Simon,

no, I don't need to run the tasks on multiple machines for now.

I will therefore stick to Makefile + shell or Java programs as Spark appears 
not to be the right tool for the tasks I am trying to accomplish.

Thank you for your input.

Philippe



- Mail original -
De: "Simon Hafner" 
À: "Philippe de Rochambeau" 
Envoyé: Vendredi 21 Novembre 2014 09:47:25
Objet: Re: processing files

2014-11-21 1:46 GMT-06:00 Philippe de Rochambeau :
> - reads xml files in thousands of directories, two levels down, from year x 
> to year y

You could try

sc.parallelize(new File(dirWithXML)).flatMap(sc.wholeTextFiles(_))

... not guaranteed to work.
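
Another sketch that might work, if the two directory levels can be expressed as
a Hadoop glob pattern (the path below is made up):

// each element is a (fileName, fileContent) pair
val xmlFiles = sc.wholeTextFiles("/data/xml/*/*/*.xml")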

> - extracts data from  tags in those files and stores them in a Sql or 
> NoSql database

From what I understand, Spark expects no side effects from the
functions you pass to map(). So that's probably not that good of an
idea if you don't want duplicated records.

> - generates ImageMagick commands based on the extracted data to generate 
> images

data transformation, easy. collect() and save.

> - generates curl commands to index the image files with Solr

same as imagemagick.

> Does Spark provide any tools/features to facilitate and automate ("batchify") 
> the above tasks?

Sure, but I wouldn't run the commands with spark. They might be run
twice or more.

> I can do all of the above with one or several Java programs, but I wondered 
> if using Spark would be of any use in such an endeavour.

Personally, I'd use a Makefile, xmlstarlet for the xml parsing, and
store the image paths to plaintext instead of a database, and get
parallelization via -j X. You could also run the imagemagick and curl
commands from there. But that naturally doesn't scale to multiple
machines.

Do you have more than one machine available to run this one? Do you
need to run it on more than one machine, because it takes too long on
just one? That's what spark excels at.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark serialization issues with third-party libraries

2014-11-21 Thread Sean Owen
You are probably casually sending UIMA objects from the driver to
executors in a closure. You'll have to design your program so that you
do not need to ship these objects to or from the remote task workers.

On Fri, Nov 21, 2014 at 8:39 AM, jatinpreet  wrote:
> Hi,
>
> I am planning to use UIMA library to process data in my RDDs. I have had bad
> experiences while using third party libraries inside worker tasks. The
> system gets plagued with Serialization issues. But as UIMA classes are not
> necessarily Serializable, I am not sure if it will work.
>
> Please explain which classes need to be Serializable and which of them can
> be left as it is? A clear understanding will help me a lot.
>
> Thanks,
> Jatin
>
>
>
> -
> Novice Big Data Programmer
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-serialization-issues-with-third-party-libraries-tp19454.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Is there any way to get the yarn application_id inside the program?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




spark code style

2014-11-21 Thread Kevin Jung
Hi all.
Here are two code snippets.
And they will produce the same result.

1.
rdd.map( function )

2.
rdd.map( function1 ).map( function2 ).map( function3 )

What are the pros and cons of these two methods?

Regards
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-code-style-tp19463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is there a way to turn on spark eventLog on the worker node?

2014-11-21 Thread Xuelin Cao

Hi,

I'm going to debug some Spark applications on our testing platform, and
it would be helpful if we could see the event log on the *worker* node.

I've tried to turn on *spark.eventLog.enabled* and set the
*spark.eventLog.dir* parameter on the worker node. However, it doesn't
work.

I do have event logs on my driver node, and I know how to turn them on.
However, the same settings don't work on the worker node.

Can anyone help me clarify whether the event log is only available on
the driver node?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-turn-on-spark-eventLog-on-the-worker-node-tp19464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Determine number of running executors

2014-11-21 Thread Yanbo Liang
You can get parameters such as spark.executor.memory, but you cannot get the
executor or core counts,
because executors and cores are parameters of the Spark deploy environment,
not of the Spark context.

val conf = new SparkConf().set("spark.executor.memory","2G")
val sc = new SparkContext(conf)

sc.getConf.get("spark.executor.memory")
conf.get("spark.executor.memory")

2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer :

> Hi,
>
> when running on YARN, is there a way for the Spark driver to know how many
> executors, cores per executor etc. there are? I want to know this so I can
> repartition to a good number.
>
> Thanks
> Tobias
>


short-circuit local reads cannot be used

2014-11-21 Thread Daniel Haviv
Hi,
Every time I start the spark-shell I encounter this message:
14/11/18 00:27:43 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

Any idea how to overcome it?
The short-circuit feature is a big performance boost I don't want to lose.

Thanks,
Daniel


Re: spark code style

2014-11-21 Thread Gerard Maas
I suppose that here function(x) = function3(function2(function1(x)))

In that case, the difference will be modularity and readability of your
program.
If function{1,2,3} are logically different steps and potentially reusable
somewhere else, I'd keep them separate.

A sequence of map transformations will be pipelined by Spark with little
overhead.
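
As a small illustration of the trade-off (the function names are made up, and
rdd is the one from the snippets above):

val f1 = (x: Int) => x + 1
val f2 = (x: Int) => x * 2
val f3 = (x: Int) => x - 3

// three separate, reusable steps; Spark pipelines them into a single pass
val chained = rdd.map(f1).map(f2).map(f3)

// the pre-composed equivalent of option 1
val composed = rdd.map(f1 andThen f2 andThen f3)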

-kr, Gerard.

On Fri, Nov 21, 2014 at 10:20 AM, Kevin Jung  wrote:

> Hi all.
> Here are two code snippets.
> And they will produce the same result.
>
> 1.
> rdd.map( function )
>
> 2.
> rdd.map( function1 ).map( function2 ).map( function3 )
>
> What are the pros and cons of these two methods?
>
> Regards
> Kevin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-code-style-tp19463.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get applicationId for yarn mode(both yarn-client and yarn-cluster mode)

2014-11-21 Thread Earthson
Finally, I've found two ways:

1. Search the output for something like "Submitted application
application_1416319392519_0115".
2. Use a specific AppName; we can then query YARN for the application ID.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-applicationId-for-yarn-mode-both-yarn-client-and-yarn-cluster-mode-tp19462p19466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Another accumulator question

2014-11-21 Thread Sean Owen
This sounds more like a use case for reduce? or fold? it sounds like
you're kind of cobbling together the same function on accumulators,
when reduce/fold are simpler and have the behavior you suggest.
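
For example, a minimal sketch of the reduce-based version for an array-of-counts
use case (numBins and bin(...) are hypothetical):

val combined = rdd
  .mapPartitions { it =>
    val partial = new Array[Long](numBins)   // one partial array per partition
    it.foreach(x => partial(bin(x)) += 1L)
    Iterator(partial)
  }
  .reduce { (a, b) =>
    // merge two partial arrays element-wise; partials are combined as tasks finish
    var i = 0
    while (i < a.length) { a(i) += b(i); i += 1 }
    a
  }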

On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
 wrote:
> I think I understand what is going on here, but I was hoping someone could
> confirm (or explain reality if I don't) what I'm seeing.
>
> We are collecting data using a rather sizable accumulator - essentially, an
> array of tens of thousands of entries.  All told, about 1.3m of data.
>
> If I understand things correctly, it looks to me like, when our job is done,
> a copy of this array is retrieved from each individual task, all at once,
> for combination on the client - which means, with 400 tasks to the job, each
> collection is using up half a gig of memory on the client.
>
> Is this true?  If so, does anyone know a way to get accumulators to
> accumulate as results collect, rather than all at once at the end, so we
> only have to hold a few in memory at a time, rather than all 400?
>
> Thanks,
>   -Nathan
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to deal with BigInt in my case class for RDD => SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Hi,

I got an error during rdd.registerTempTable(...) saying scala.MatchError:
scala.BigInt

Looks like BigInt cannot be used in SchemaRDD, is that correct?

So what would you recommend to deal with it?

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Spark Streaming Metrics

2014-11-21 Thread Gerard Maas
Looks like metrics are not a hot topic to discuss - yet so important to
sleep well when jobs are running in production.

I've created Spark-4537 
to track this issue.

-kr, Gerard.

On Thu, Nov 20, 2014 at 9:25 PM, Gerard Maas  wrote:

> As the Spark Streaming tuning guide indicates, the key indicators of a
> healthy streaming job are:
> - Processing Time
> - Total Delay
>
> The Spark UI page for the Streaming job [1] shows these two indicators but
> the metrics source for Spark Streaming (StreamingSource.scala)  [2] does
> not.
>
> Any reasons for that? I would like to monitor job performance through an
> external monitor (Ganglia in our case) and I've connected already the
> currently published metrics.
>
> -kr,  Gerard.
>
>
> [1]
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127
>
> [2]
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
>


Re: Why is ALS class serializable ?

2014-11-21 Thread Hao Ren
It makes sense.

Thx. =)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262p19472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark: Simple local test failed depending on memory settings

2014-11-21 Thread rzykov
Dear all,

We encountered problems of failed jobs with huge amount of data.

A simple local test was prepared for this question at
https://gist.github.com/copy-of-rezo/6a137e13a1e4f841e7eb
It generates 2 sets of key-value pairs, join them, selects distinct values
and counts data finally.

import org.apache.spark.{SparkConf, SparkContext}

object Spill {
  def generate = {
for{
  j <- 1 to 10
  i <- 1 to 200
} yield(j, i)
  }
 
  def main(args: Array[String]) {
val conf = new SparkConf().setAppName(getClass.getSimpleName)
conf.set("spark.shuffle.spill", "true")
conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
println(generate)
 
val dataA = sc.parallelize(generate)
val dataB = sc.parallelize(generate)
val dst = dataA.join(dataB).distinct().count()
println(dst)
  }
}

We compiled it locally and run 3 times with different settings of memory:
1) *--executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1*
It fails with "java.lang.OutOfMemoryError: GC overhead limit exceeded" at
.
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)

2) *--executor-memory 20M --driver-memory 20M --num-executors 1
--executor-cores 1*
It works OK

3)  *--executor-memory 10M --driver-memory 10M --num-executors 1
--executor-cores 1*, but let's generate less data: i from 200 down to 100. That
halves the input data and reduces the joined data by a factor of 4.

  def generate = {
for{
  j <- 1 to 10
  i <- 1 to 100   // previous value was 200 
} yield(j, i)
  }
This code works OK. 

We don't understand why 10M is not enough for such a simple operation on
approximately 32000 bytes of ints (2 * 10 * 200 * 2 * 4). 10M of RAM works
if we halve the data volume (2000 records of (int, int)).
Why doesn't spilling to disk cover this case?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Simple-local-test-failed-depending-on-memory-settings-tp19473.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



EC2 cluster with SSD ebs

2014-11-21 Thread Hao Ren
Hi, 

Is it possible to launch a Spark EC2 cluster with SSD EBS volumes?

In spark-ec2.py, we can only specify the EBS size; the EBS type is always
standard (magnetic).

I am using Spark-1.1.0.

Thank you.

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EC2-cluster-with-SSD-ebs-tp19474.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cores on Master

2014-11-21 Thread Prannoy
Hi,

You can also set the cores in the Spark application itself:

http://spark.apache.org/docs/1.0.1/spark-standalone.html

On Wed, Nov 19, 2014 at 6:11 AM, Pat Ferrel-2 [via Apache Spark User List] <
ml-node+s1001560n19238...@n3.nabble.com> wrote:

> OK hacking the start-slave.sh did it
>
> On Nov 18, 2014, at 4:12 PM, Pat Ferrel <[hidden email]
> > wrote:
>
> This seems to work only on a ‘worker’ not the master? So I’m back to
> having no way to control cores on the master?
>
> On Nov 18, 2014, at 3:24 PM, Pat Ferrel <[hidden email]
> > wrote:
>
> Looks like I can do this by not using start-all.sh but starting each
> worker separately passing in a '--cores n' to the master? No config/env
> way?
>
> On Nov 18, 2014, at 3:14 PM, Pat Ferrel <[hidden email]
> > wrote:
>
> I see the default and max cores settings but these seem to control total
> cores per cluster.
>
> My cobbled together home cluster needs the Master to not use all its cores
> or it may lock up (it does other things). Is there a way to control max
> cores used for a particular cluster machine in standalone mode?
> -
> To unsubscribe, e-mail: [hidden email]
> 
> For additional commands, e-mail: [hidden email]
> 
>
>
>
> -
> To unsubscribe, e-mail: [hidden email]
> 
> For additional commands, e-mail: [hidden email]
> 
>
>
>
> -
> To unsubscribe, e-mail: [hidden email]
> 
> For additional commands, e-mail: [hidden email]
> 
>
>
>
> -
> To unsubscribe, e-mail: [hidden email]
> 
> For additional commands, e-mail: [hidden email]
> 
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cores-on-Master-tp19230p19475.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Slow performance in spark streaming

2014-11-21 Thread Prannoy
Hi,

Spark runs slower in local mode than on a cluster. Cluster machines
usually have higher specifications, and the tasks are distributed across
workers to produce results faster. So you will always see a difference in
speed between running locally and running on a cluster. Try
running the same job on a cluster and evaluate the speed there.

Thanks

On Thu, Nov 20, 2014 at 6:52 PM, Blackeye [via Apache Spark User List] <
ml-node+s1001560n1937...@n3.nabble.com> wrote:

> I am using spark streaming 1.1.0 locally (not in a cluster). I created a
> simple app that parses the data (about 10.000 entries), stores it in a
> stream and then makes some transformations on it. Here is the code:
>
> def main(args : Array[String]) {
>   val master = "local[8]"
>   val conf = new SparkConf().setAppName("Tester").setMaster(master)
>   val sc = new StreamingContext(conf, Milliseconds(11))
>   val stream = sc.receiverStream(new MyReceiver("localhost", ))
>   val parsedStream = parse(stream)
>   parsedStream.foreachRDD(rdd =>
>     println(rdd.first() + "\nRULE STARTS " + System.currentTimeMillis()))
>   val result1 = parsedStream.filter(entry =>
>     entry.symbol.contains("walking") && entry.symbol.contains("true") &&
>     entry.symbol.contains("id0")).map(_.time)
>   val result2 = parsedStream.filter(entry => entry.symbol == "disappear" &&
>     entry.symbol.contains("id0")).map(_.time)
>   val result3 = result1
>     .transformWith(result2, (rdd1, rdd2: RDD[Int]) => rdd1.subtract(rdd2))
>   result3.foreachRDD(rdd =>
>     println(rdd.first() + "\nRULE ENDS " + System.currentTimeMillis()))
>   sc.start()
>   sc.awaitTermination()
> }
>
> def parse(stream: DStream[String]) = {
>   stream.flatMap { line =>
>     val entries = line.split("assert").filter(entry => !entry.isEmpty)
>     entries.map { tuple =>
>       val pattern = """\s*[(](.+)[,]\s*([0-9]+)+\s*[)]\s*[)]\s*[,|\.]\s*""".r
>       tuple match {
>         case pattern(symbol, time) => new Data(symbol, time.toInt)
>       }
>     }
>   }
> }
>
> case class Data(symbol: String, time: Int)
>
> I have a batch duration of 110.000 milliseconds in order to receive all
> the data in one batch. I believed that, even locally, the spark is very
> fast. In this case, it takes about 3.5sec to execute the rule (between
> "RULE STARTS" and "RULE ENDS"). Am I doing something wrong or this is the
> expected time? Any advise
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Slow-performance-in-spark-streaming-tp19371p19476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Parsing a large XML file using Spark

2014-11-21 Thread Prannoy
Hi,

Parallel processing of XML files can be an issue because of the tags in the XML
file. The file has to stay intact: the parser matches start and end entities,
and if the file is split into parts across workers, a worker may not find the
matching start and end tags within its split, which will raise
an exception.

Thanks.

On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] <
ml-node+s1001560n19239...@n3.nabble.com> wrote:

> If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump
> that all revision information also) that is stored in HDFS, is it possible
> to parse it in parallel/faster using Spark? Or do we have to use something
> like a PullParser or Iteratee?
>
> My current solution is to read the single XML file in the first pass -
> write it to HDFS and then read the small files in parallel on the Spark
> workers.
>
> Thanks
> -Soumya
>
>
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239p19477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How can I read this avro file using spark & scala?

2014-11-21 Thread thomas j
Thanks for the pointer Michael.

I've downloaded Spark 1.2.0 from
https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and cloned and
built the spark-avro repo you linked to.

When I run it against the example avro file linked to in the documentation
it works. However, when I try to load my avro file (linked to in my
original question) I receive the following error:

java.lang.RuntimeException: Unsupported type LONG
at scala.sys.package$.error(package.scala:27)
at com.databricks.spark.avro.AvroRelation.com
$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
at
com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
at
com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
...

If this is useful I'm happy to try loading the various different avro files
I have to try to battle-test spark-avro.

Thanks

On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust 
wrote:

> One option (starting with Spark 1.2, which is currently in preview) is to
> use the Avro library for Spark SQL.  This is very new, but we would love to
> get feedback: https://github.com/databricks/spark-avro
>
> On Thu, Nov 20, 2014 at 10:19 AM, al b  wrote:
>
>> I've read several posts of people struggling to read avro in spark. The
>> examples I've tried don't work. When I try this solution (
>> https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
>> I get errors:
>>
>> spark java.io.NotSerializableException:
>> org.apache.avro.mapred.AvroWrapper
>>
>> How can I read the following sample file in spark using scala?
>>
>> http://www.4shared.com/file/SxnYcdgJce/sample.html
>>
>> Thomas
>>
>
>


Re: How can I read this avro file using spark & scala?

2014-11-21 Thread thomas j
I've been able to load a different avro file based on GenericRecord with:

val person = sqlContext.avroFile("/tmp/person.avro")

When I try to call `first()` on it, I get "NotSerializableException"
exceptions again:

person.first()

...
14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
20)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
...

Apart from this I want to transform the records into pairs of (user_id,
record). I can do this by specifying the offset of the user_id column with
something like this:

person.map(r => (r.getInt(2), r)).take(4).collect()

Is there any way to be able to specify the column name ("user_id") instead
of needing to know/calculate the offset somehow?
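
One rough workaround, since avroFile returns a SchemaRDD: register it as a temp
table and select the column by name (the table name below is arbitrary):

person.registerTempTable("person")
val userIds = sqlContext.sql("SELECT user_id FROM person")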

Thanks again


On Fri, Nov 21, 2014 at 11:48 AM, thomas j  wrote:

> Thanks for the pointer Michael.
>
> I've downloaded spark 1.2.0 from
> https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
> built the spark-avro repo you linked to.
>
> When I run it against the example avro file linked to in the documentation
> it works. However, when I try to load my avro file (linked to in my
> original question) I receive the following error:
>
> java.lang.RuntimeException: Unsupported type LONG
> at scala.sys.package$.error(package.scala:27)
> at com.databricks.spark.avro.AvroRelation.com
> $databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
> at
> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
> at
> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> ...
>
> If this is useful I'm happy to try loading the various different avro
> files I have to try to battle-test spark-avro.
>
> Thanks
>
> On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust 
> wrote:
>
>> One option (starting with Spark 1.2, which is currently in preview) is to
>> use the Avro library for Spark SQL.  This is very new, but we would love to
>> get feedback: https://github.com/databricks/spark-avro
>>
>> On Thu, Nov 20, 2014 at 10:19 AM, al b  wrote:
>>
>>> I've read several posts of people struggling to read avro in spark. The
>>> examples I've tried don't work. When I try this solution (
>>> https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
>>> I get errors:
>>>
>>> spark java.io.NotSerializableException:
>>> org.apache.avro.mapred.AvroWrapper
>>>
>>> How can I read the following sample file in spark using scala?
>>>
>>> http://www.4shared.com/file/SxnYcdgJce/sample.html
>>>
>>> Thomas
>>>
>>
>>
>


RE: tableau spark sql cassandra

2014-11-21 Thread jererc
Hi!

Sure, I'll post the info I grabbed once the Cassandra table values appear
in Tableau.

Best,
Jerome



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Setup Remote HDFS for Spark

2014-11-21 Thread EH
Hi,

Is there any way that I can set up a remote HDFS for Spark (more specifically,
for Spark Streaming checkpoints)?  The reason I'm asking is that our Spark
and HDFS do not run on the same machines.  I've looked around but still have
no clue so far.

Thanks,
EH



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Remote-HDFS-for-Spark-tp19481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Execute Spark programs from local machine on Yarn-hadoop cluster

2014-11-21 Thread Naveen Kumar Pokala
Hi,

I am executing my Spark jobs on a YARN cluster by forming the conf object in the
following way.

SparkConf conf = new SparkConf().setAppName("NewJob").setMaster("yarn-cluster");

Now I want to execute Spark jobs from my local machine; how can I do that?

What I mean is: is there a way to give the IP address, port, and all the other
details needed to connect to a (YARN) master on another network from my local
Spark program?

-Naveen


Re: Setup Remote HDFS for Spark

2014-11-21 Thread Akhil Das
Having them on the same network will give you better performance. What
problems are you facing? Can you elaborate on the versions (Spark and
Hadoop)? One problem that could happen is the OS's OOM killer killing your
HDFS processes due to low memory. You can check
the logs to get a clearer picture of what is happening.

Thanks
Best Regards

On Fri, Nov 21, 2014 at 7:58 PM, EH  wrote:

> Hi,
>
> Are there any way that I can setup a remote HDFS for Spark (more specific,
> for Spark Streaming checkpoints)?  The reason I'm asking is that our Spark
> and HDFS do not run on the same machines.  I've been looked around but
> still
> no clue so far.
>
> Thanks,
> EH
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Remote-HDFS-for-Spark-tp19481.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Execute Spark programs from local machine on Yarn-hadoop cluster

2014-11-21 Thread Prannoy
Hi naveen,

I don't think this is possible. If you are setting the master with your
cluster details you cannot execute any job from your local machine. You
have to submit the jobs from inside your YARN cluster so that SparkConf is able
to connect with all the provided details.

If this is not the case, give a detailed explanation of what exactly
you are trying to do :)

Thanks.

On Fri, Nov 21, 2014 at 8:11 PM, Naveen Kumar Pokala [via Apache Spark User
List]  wrote:

> Hi,
>
>
>
> I am executing my spark jobs on yarn cluster by forming conf object in the
> following way.
>
>
>
> SparkConf conf = *new* SparkConf().setAppName("NewJob").setMaster(
> "yarn-cluster");
>
>
>
> Now I want to execute spark jobs from my local machine how to do that.
>
>
>
> What I mean is there a way to give IP address, port all the details to
> connect a master(YARN) on some other network from my local spark Program.
>
>
>
> -Naveen
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Execute-Spark-programs-from-local-machine-on-Yarn-hadoop-cluster-tp19482p19484.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Setup Remote HDFS for Spark

2014-11-21 Thread EH
Unfortunately, whether it is possible to have both Spark and HDFS running on
the same machine is not under our control.  :(  Right now we have Spark and
HDFS running on different machines.  In this case, is it still possible to
hook up a remote HDFS with Spark so that we can use Spark Streaming
checkpoints?  Thank you for your help.
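
In case it helps, my understanding is that the checkpoint directory is just a
fully-qualified URI, so pointing it at the remote cluster should look roughly
like this (host and port are made up):

ssc.checkpoint("hdfs://namenode-host:8020/spark/streaming-checkpoints")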

Best,
EH



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Remote-HDFS-for-Spark-tp19481p19485.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD data checkpoint cleaning

2014-11-21 Thread Luis Ángel Vicente Sánchez
I have seen the same behaviour while testing the latest spark 1.2.0
snapshot.

I'm trying the ReliableKafkaReceiver and it works quite well but the
checkpoints folder is always increasing in size. The receivedMetaData
folder remains almost constant in size but the receivedData folder is
always increasing in size even if I set spark.cleaner.ttl to 300 seconds.

Regards,

Luis

2014-09-23 22:47 GMT+01:00 RodrigoB :

> Just a follow-up.
>
> Just to make sure about the RDDs not being cleaned up, I just replayed the
> app both on the windows remote laptop and then on the linux machine and at
> the same time was observing the RDD folders in HDFS.
>
> Confirming the observed behavior: running on the laptop I could see the
> RDDs
> continuously increasing. When I ran on linux, only two RDD folders were
> there and continuously being recycled.
>
> Metadata checkpoints were being cleaned on both scenarios.
>
> tnks,
> Rod
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to deal with BigInt in my case class for RDD => SchemaRDD convertion

2014-11-21 Thread Yin Huai
Hello Jianshi,

The reason of that error is that we do not have a Spark SQL data type for
Scala BigInt. You can use Decimal for your case.
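
For example, a rough sketch (the case class, field names and rdd are made up),
converting the BigInt field to scala.math.BigDecimal before the implicit
SchemaRDD conversion:

case class Record(id: BigDecimal, name: String)

import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD in Spark SQL 1.x
val records = rdd.map(r => Record(BigDecimal(r.someBigInt), r.name))
records.registerTempTable("records")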

Thanks,

Yin

On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang 
wrote:

> Hi,
>
> I got an error during rdd.registerTempTable(...) saying scala.MatchError:
> scala.BigInt
>
> Looks like BigInt cannot be used in SchemaRDD, is that correct?
>
> So what would you recommend to deal with it?
>
> Thanks,
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>


Re: Spark Streaming Metrics

2014-11-21 Thread andy petrella
Yo,

I've discussed with some guys from Cloudera who are working (only oO) on
spark-core and streaming.
The streaming guy was telling me the same thing about the scheduling part.

Do you have some nice screenshots and info about stages running, task time,
Akka health and things like these? I told the guy that I might poke him
today with more materials.

Btw, how're you?

Tchuss man
andy

PS: did you tried the recent events thingy?


On Fri Nov 21 2014 at 11:17:17 AM Gerard Maas  wrote:

> Looks like metrics are not a hot topic to discuss - yet so important to
> sleep well when jobs are running in production.
>
> I've created Spark-4537 
> to track this issue.
>
> -kr, Gerard.
>
> On Thu, Nov 20, 2014 at 9:25 PM, Gerard Maas 
> wrote:
>
>> As the Spark Streaming tuning guide indicates, the key indicators of a
>> healthy streaming job are:
>> - Processing Time
>> - Total Delay
>>
>> The Spark UI page for the Streaming job [1] shows these two indicators
>> but the metrics source for Spark Streaming (StreamingSource.scala)  [2]
>> does not.
>>
>> Any reasons for that? I would like to monitor job performance through an
>> external monitor (Ganglia in our case) and I've connected already the
>> currently published metrics.
>>
>> -kr,  Gerard.
>>
>>
>> [1]
>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127
>>
>> [2]
>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
>>
>
>


Error: Unrecognized option '--conf' (trying to set auto.offset.reset)

2014-11-21 Thread YaoPau
I'm trying to configure my Spark Streaming + Kafka job so it always pulls
real-time data.  I think setting auto.offset.reset="largest" will do it, but
when I try to set that configuration at runtime I get an error:

*spark-submit --class com.autotrader.scalaspark.sbStreaming --master
yarn-client --driver-memory 10g --executor-memory 10g --conf
auto.offset.reset="largest"
sbStreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Error: Unrecognized option '--conf'.*

How can I configure my job to pull the latest data?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-Unrecognized-option-conf-trying-to-set-auto-offset-reset-tp19489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Lots of small input files

2014-11-21 Thread Pat Ferrel
I have a job that searches for input recursively and creates a string of 
pathnames to treat as one input. 

The files are part-x files and they are fairly small. The job seems to take 
a long time to complete considering the size of the total data (150m) and only 
runs on the master machine. The job only does rdd.map type operations.

1) Why doesn’t it use the other workers in the cluster?
2) Is there a downside to using a lot of small part files? Should I coalesce 
them into one input file?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How can I read this avro file using spark & scala?

2014-11-21 Thread Simone Franzini
I have also been struggling with reading avro. Very glad to hear that there
is a new avro library coming in Spark 1.2 (which by the way, seems to have
a lot of other very useful improvements).

In the meantime, I have been able to piece together several snippets/tips
that I found in various sources, and I am now able to read/write Avro
correctly. From my understanding, you basically need 3 pieces:
1. Use the kryo serializer.
2. Register your avro classes. I have done this using twitter chill 0.4.0.
3. Read/write avro with a snippet of code like the one you posted.

Here is relevant code (hopefully all of it).

// All of the following are needed in order to read/write AVRO files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic AVRO, I am using specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat,
AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }
import org.apache.spark.{ SparkConf, SparkContext }

object MyApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")
    val sc = new SparkContext(conf)

    // Read
    val readJob = new Job()
    AvroJob.setInputKeySchema(readJob, schema)
    val rdd = sc.newAPIHadoopFile(inputPath,
        classOf[AvroKeyInputFormat[MyAvroClass]],
        classOf[AvroKey[MyAvroClass]],
        classOf[NullWritable],
        readJob.getConfiguration)
      .map { t => t._1.datum }

    // Write
    val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
    val writeJob = new Job()
    AvroJob.setOutputKeySchema(writeJob, schema)
    writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
    rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
      classOf[AvroKey[MyAvroClass]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[MyAvroClass]],
      writeJob.getConfiguration)
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Put a line like the following for each of your Avro classes if you use specific Avro
    // If you use generic Avro, chill also has a function for that: GenericRecordSerializer
    kryo.register(classOf[MyAvroClass],
      AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
  }
}

Simone Franzini, PhD

http://www.linkedin.com/in/simonefranzini

On Fri, Nov 21, 2014 at 7:04 AM, thomas j  wrote:

> I've been able to load a different avro file based on GenericRecord with:
>
> val person = sqlContext.avroFile("/tmp/person.avro")
>
> When I try to call `first()` on it, I get "NotSerializableException"
> exceptions again:
>
> person.first()
>
> ...
> 14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 20)
> java.io.NotSerializableException:
> org.apache.avro.generic.GenericData$Record
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ...
>
> Apart from this I want to transform the records into pairs of (user_id,
> record). I can do this by specifying the offset of the user_id column with
> something like this:
>
> person.map(r => (r.getInt(2), r)).take(4).collect()
>
> Is there any way to be able to specify the column name ("user_id") instead
> of needing to know/calculate the offset somehow?
>
> Thanks again
>
>
> On Fri, Nov 21, 2014 at 11:48 AM, thomas j 
> wrote:
>
>> Thanks for the pointer Michael.
>>
>> I've downloaded spark 1.2.0 from
>> https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and clone and
>> built the spark-avro repo you linked to.
>>
>> When I run it against the example avro file linked to in the
>> documentation it works. However, when I try to load my avro file (linked to
>> in my original question) I receive the following error:
>>
>> java.lang.RuntimeException: Unsupported type LONG
>> at scala.sys.package$.error(package.scala:27)
>> at com.databricks.spark.avro.AvroRelation.com
>> $databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
>> at
>> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
>> at
>> com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
>> at
>> scala.co

Re: driver memory

2014-11-21 Thread Gen
Hi,

I am sorry for disturbing you, and thank you for your explanation.
However, I find that "spark.driver.memory" is also used for standalone mode (I set
this in spark/conf/spark-defaults.conf).

Cheers
Gen


Andrew Or-2 wrote
> Hi Maria,
> 
> SPARK_MEM is actually a deprecated because it was too general; the reason
> it worked was because SPARK_MEM applies to everything (drivers, executors,
> masters, workers, history servers...). In favor of more specific configs,
> we broke this down into SPARK_DRIVER_MEMORY and SPARK_EXECUTOR_MEMORY and
> other environment variables and configs. Note that while
> "spark.executor.memory" is an equivalent config, "spark.driver.memory" is
> only used for YARN.
> 
> If you are using Spark 1.0+, the recommended way of specifying driver
> memory is through the "--driver-memory" command line argument of
> spark-submit. The equivalent also holds for executor memory (i.e.
> "--executor-memory").  That way you don't have to wrangle with the
> millions
> of overlapping configs / environment variables for all the deploy modes.
> 
> -Andrew
> 
> 
> 2014-07-23 4:18 GMT-07:00 mrm <

> maria@

> >:
> 
>> Hi,
>>
>> I figured out my problem so I wanted to share my findings. I was
>> basically
>> trying to broadcast an array with 4 million elements, and a size of
>> approximatively 150 MB. Every time I was trying to broadcast, I got an
>> OutOfMemory error. I fixed my problem by increasing the driver memory
>> using:
>> export SPARK_MEM="2g"
>>
>> Using SPARK_DAEMON_MEM or spark.executor.memory did not help in this
>> case!
>> I
>> don't have a good understanding of all these settings and I have the
>> feeling
>> many people are in the same situation.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/driver-memory-tp10486p10489.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/driver-memory-tp10486p19493.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Nightly releases

2014-11-21 Thread Arun Ahuja
Great - what can we do to make this happen?  So should I file a JIRA to
track?

Thanks,

Arun

On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash  wrote:

> I can see this being valuable for users wanting to live on the cutting
> edge without building CI infrastructure themselves, myself included.  I
> think Patrick's recent work on the build scripts for 1.2.0 will make
> delivering nightly builds to a public maven repo easier.
>
> On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja  wrote:
>
>> Of course we can run this as well to get the lastest, but the build is
>> fairly long and this seems like a resource many would need.
>>
>> On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja  wrote:
>>
>>> Are nightly releases posted anywhere?  There are quite a few vital
>>> bugfixes and performance improvements being commited to Spark and using the
>>> latest commits is useful (or even necessary for some jobs).
>>>
>>> Is there a place to post them, it doesn't seem like it would diffcult to
>>> run make-dist nightly and place it somwhere?
>>>
>>> Is is possible extract this from Jenkins bulds?
>>>
>>> Thanks,
>>> Arun
>>>  ​
>>>
>>
>>
>


Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
We've done this with reduce - that definitely works.

I've reworked the logic to use accumulators because, when it works, it's
5-10x faster

On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen  wrote:

> This sounds more like a use case for reduce? or fold? it sounds like
> you're kind of cobbling together the same function on accumulators,
> when reduce/fold are simpler and have the behavior you suggest.
>
> On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
>  wrote:
> > I think I understand what is going on here, but I was hoping someone
> could
> > confirm (or explain reality if I don't) what I'm seeing.
> >
> > We are collecting data using a rather sizable accumulator - essentially,
> an
> > array of tens of thousands of entries.  All told, about 1.3m of data.
> >
> > If I understand things correctly, it looks to me like, when our job is
> done,
> > a copy of this array is retrieved from each individual task, all at once,
> > for combination on the client - which means, with 400 tasks to the job,
> each
> > collection is using up half a gig of memory on the client.
> >
> > Is this true?  If so, does anyone know a way to get accumulators to
> > accumulate as results collect, rather than all at once at the end, so we
> > only have to hold a few in memory at a time, rather than all 400?
> >
> > Thanks,
> >   -Nathan
> >
> >
> > --
> > Nathan Kronenfeld
> > Senior Visualization Developer
> > Oculus Info Inc
> > 2 Berkeley Street, Suite 600,
> > Toronto, Ontario M5A 4J5
> > Phone:  +1-416-203-3003 x 238
> > Email:  nkronenf...@oculusinfo.com
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


JVM Memory Woes

2014-11-21 Thread pthai
Hello!

I am debugging a reduceByKey job in Spark v0.9.1
The main issue is that an executor dies after the reduceByKey job succeeds.
This is detected by Spark, which then starts re-running all the tasks for
that stage.

I've been working at this for over a day now, so any help would be
fantastic!

*Let me provide some details about my job & cluster settings:*
8 workers each with 32 cores & 60GB ram
SPARK_MEM=48g
SPARK_WORKER_MEM=48g
-Dspark.shuffle.memoryFraction=0.2
-Dspark.storage.memoryFraction=0.2
-Dspark.default.parallelism=1024

*I can't figure out why the executor dies. Here is the stdout from that
executor that died:
*
OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x7fbb3928, 42991616, 0) failed; error='Cannot
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 42991616 bytes for
committing reserved memory.
# An error report file with more information is saved as:
# /tmp/spark/work/app-20141121172318-0005/1/hs_err_pid25755.log


*Here're the significant cuts of the output from the driver when the
executor dies after reduceByKey finishes:
*
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Finished TID 4482 in 21681
ms on ip-10-144-235-194.ec2.internal (progress: 4608/4608)
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3,
4331)
14/11/21 06:47:22 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0,
whose tasks have all completed, from pool 
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
routines.scala:278) finished in 185.384 s
14/11/21 06:47:22 INFO scheduler.DAGScheduler: looking for newly runnable
stages
14/11/21 06:47:22 INFO scheduler.DAGScheduler: running: Set()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: waiting: Set(Stage 2)
14/11/21 06:47:22 INFO scheduler.DAGScheduler: failed: Set()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Missing parents for Stage 2:
List()
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Submitting Stage 2
(FilteredRDD[18] at filter at routines.scala:279), which is now runnable
14/11/21 06:47:22 INFO scheduler.DAGScheduler: Submitting 1024 missing tasks
from Stage 2 (FilteredRDD[18] at filter at routines.scala:279)
14/11/21 06:47:22 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with
1024 tasks
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID
4756 on executor 2: ip-10-113-180-17.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:1 as TID
4757 on executor 5: ip-10-5-150-185.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:1 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:2 as TID
4758 on executor 1: ip-10-37-170-43.ec2.internal (PROCESS_LOCAL)
[ truncate ]
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:222 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Starting task 2.0:223 as
TID 4979 on executor 3: ip-10-101-204-143.ec2.internal (PROCESS_LOCAL)
14/11/21 06:47:22 INFO scheduler.TaskSetManager: Serialized task 2.0:223 as
2194 bytes in 0 ms
14/11/21 06:47:22 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-7-149-203.ec2.internal:54264
14/11/21 06:47:23 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 1585800 bytes
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-101-204-143.ec2.internal:36102
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-5-150-185.ec2.internal:50972
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-37-170-43.ec2.internal:36199
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-11-163-49.ec2.internal:51003
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-144-235-194.ec2.internal:40101
14/11/21 06:47:23 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@ip-10-113-180-17.ec2.internal:35881
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor 5
disconnected, so removing it
14/11/21 06:47:27 INFO client.AppClient$ClientActor: Executor updated:
app-20141121064325-0001/5 is now FAILED (Command exited with code 1)
14/11/21 06:47:27 ERROR scheduler.TaskSchedulerImpl: Lost executor 5 on
ip-10-5-150-185.ec2.internal: remote Akka client disassociated
14/11/21 06:47:27 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141121064325-0001/5 removed: Command exited with code 1
14/

Many retries for Python job

2014-11-21 Thread Brett Meyer
I'm running a Python script with spark-submit on top of YARN on an EMR
cluster with 30 nodes.  The script reads in approximately 3.9 TB of data
from S3, and then does some transformations and filtering, followed by some
aggregate counts.  During Stage 2 of the job, everything looks to complete
just fine with no executor failures or resubmissions, but when Stage 3
starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors.
Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can
successfully start.  The whole application eventually completes, but there
is an addition of about 1+ hour overhead for all of the retries.

I'm trying to determine why there were FetchFailure exceptions, since
anything computed in the job that could not fit in the available memory
cache should be by default spilled to disk for further retrieval.  I also
see some "java.net.ConnectException: Connection refused" and
"java.io.IOException: sendMessageReliably failed without being ACK'd" errors
in the logs after a CancelledKeyException followed by a
ClosedChannelException, but I have no idea why the nodes in the EMR cluster
would suddenly stop being able to communicate.

If anyone has ideas as to why the data needs to be rerun several times in
this job, please let me know as I am fairly bewildered about this behavior.




smime.p7s
Description: S/MIME cryptographic signature


Re: Nightly releases

2014-11-21 Thread Andrew Ash
Yes you should file a Jira and echo it out here so others can follow and
comment on it.  Thanks Arun!

On Fri, Nov 21, 2014 at 12:02 PM, Arun Ahuja  wrote:

> Great - what can we do to make this happen?  So should I file a JIRA to
> track?
>
> Thanks,
>
> Arun
>
> On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash  wrote:
>
>> I can see this being valuable for users wanting to live on the cutting
>> edge without building CI infrastructure themselves, myself included.  I
>> think Patrick's recent work on the build scripts for 1.2.0 will make
>> delivering nightly builds to a public maven repo easier.
>>
>> On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja  wrote:
>>
>>> Of course we can run this as well to get the latest, but the build is
>>> fairly long and this seems like a resource many would need.
>>>
>>> On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja  wrote:
>>>
 Are nightly releases posted anywhere?  There are quite a few vital
 bugfixes and performance improvements being committed to Spark, and using the
 latest commits is useful (or even necessary for some jobs).

 Is there a place to post them? It doesn't seem like it would be difficult
 to run make-dist nightly and place it somewhere.

 Is it possible to extract this from Jenkins builds?

 Thanks,
 Arun
  ​

>>>
>>>
>>
>


SparkSQL - can we add new column(s) to parquet files

2014-11-21 Thread Sadhan Sood
We create the table definition by reading the parquet file for schema and
store it in hive metastore. But if someone adds a new column to the schema,
and if we rescan the schema from the new parquet files and update the table
definition, would it still work if we run queries on the table ?

So, old table has -> Int a, Int b
new table -> Int a, Int b, String c

but older parquet files don't have String c, so on querying the table would
it return me null for column c  from older files and data from newer files
or fail?


Re: Nightly releases

2014-11-21 Thread Arun Ahuja
Great - posted here https://issues.apache.org/jira/browse/SPARK-4542

On Fri, Nov 21, 2014 at 1:03 PM, Andrew Ash  wrote:

> Yes you should file a Jira and echo it out here so others can follow and
> comment on it.  Thanks Arun!
>
> On Fri, Nov 21, 2014 at 12:02 PM, Arun Ahuja  wrote:
>
>> Great - what can we do to make this happen?  So should I file a JIRA to
>> track?
>>
>> Thanks,
>>
>> Arun
>>
>> On Tue, Nov 18, 2014 at 11:46 AM, Andrew Ash 
>> wrote:
>>
>>> I can see this being valuable for users wanting to live on the cutting
>>> edge without building CI infrastructure themselves, myself included.  I
>>> think Patrick's recent work on the build scripts for 1.2.0 will make
>>> delivering nightly builds to a public maven repo easier.
>>>
>>> On Tue, Nov 18, 2014 at 10:22 AM, Arun Ahuja  wrote:
>>>
 Of course we can run this as well to get the latest, but the build is
 fairly long and this seems like a resource many would need.

 On Tue, Nov 18, 2014 at 10:21 AM, Arun Ahuja 
 wrote:

> Are nightly releases posted anywhere?  There are quite a few vital
> bugfixes and performance improvements being committed to Spark, and using
> the
> latest commits is useful (or even necessary for some jobs).
>
> Is there a place to post them? It doesn't seem like it would be difficult
> to run make-dist nightly and place it somewhere.
>
> Is it possible to extract this from Jenkins builds?
>
> Thanks,
> Arun
>  ​
>


>>>
>>
>


SparkSQL Timestamp query failure

2014-11-21 Thread whitebread
Hi all,

I put some log files into sql tables through Spark and my schema looks like
this:

 |-- timestamp: timestamp (nullable = true)
 |-- c_ip: string (nullable = true)
 |-- cs_username: string (nullable = true)
 |-- s_ip: string (nullable = true)
 |-- s_port: string (nullable = true)
 |-- cs_method: string (nullable = true)
 |-- cs_uri_stem: string (nullable = true)
 |-- cs_query: string (nullable = true)
 |-- sc_status: integer (nullable = false)
 |-- sc_bytes: integer (nullable = false)
 |-- cs_bytes: integer (nullable = false)
 |-- time_taken: integer (nullable = false)
 |-- User_Agent: string (nullable = true)
 |-- Referrer: string (nullable = true)

As you can notice I created a timestamp field which I read is supported by
Spark (Date wouldn't work as far as I understood). I would love to use it for
queries like "where timestamp > (2012-10-08 16:10:36.0)", but when I run them I
keep getting errors.
I tried the two following syntax forms (errors shown below).
For the second one I parse a string, so I'm sure I'm actually passing it in a
timestamp format.
I use two functions: /parse/ and /date2timestamp/.

*Any hint on how I should handle timestamp values?* 

Thanks,

Alessandro

1)
scala> sqlContext.sql("SELECT * FROM Logs as l where l.timestamp=(2012-10-08
16:10:36.0)").collect
java.lang.RuntimeException: [1.55] failure: ``)'' expected but 16 found

SELECT * FROM Logs as l where l.timestamp=(2012-10-08 16:10:36.0)
  ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:21)
at $iwC$$iwC$$iwC$$iwC.(:26)
at $iwC$$iwC$$iwC.(:28)
at $iwC$$iwC.(:30)
at $iwC.(:32)
at (:34)
at .(:38)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

2)
sqlContext.sql("SELECT * FROM Logs as l where
l.timestamp="+date2timestamp(formatTime3.parse("2012-10-08
16:10:36.0"))).collect
java.lang.RuntimeException: [1.54] failure: ``UNION'' expected but 16 found

SELECT * FROM Logs as l where l.timestamp=2012-10-08 16:10:36.0
 ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
at $iwC$$
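The failure appears to come from the unquoted literal: the parser reads 2012-10-08 as
an arithmetic expression and then trips over the bare 16. A sketch of one form that
avoids this, assuming the table is registered as Logs and the column type really is
timestamp (not verified against every parser version): quote the literal and cast it
explicitly.

    val results = sqlContext.sql(
      "SELECT * FROM Logs AS l WHERE l.timestamp >= CAST('2012-10-08 16:10:36.0' AS TIMESTAMP)"
    ).collect()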

Re: Many retries for Python job

2014-11-21 Thread Sandy Ryza
Hi Brett,

Are you noticing executors dying?  Are you able to check the YARN
NodeManager logs and see whether YARN is killing them for exceeding memory
limits?

-Sandy

On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer 
wrote:

> I’m running a Python script with spark-submit on top of YARN on an EMR
> cluster with 30 nodes.  The script reads in approximately 3.9 TB of data
> from S3, and then does some transformations and filtering, followed by some
> aggregate counts.  During Stage 2 of the job, everything looks to complete
> just fine with no executor failures or resubmissions, but when Stage 3
> starts up, many Stage 2 tasks have to be rerun due to FetchFailure errors.
> Actually, I usually see at least 3-4 retries on Stage 2 before Stage 3 can
> successfully start.  The whole application eventually completes, but there
> is an addition of about 1+ hour overhead for all of the retries.
>
> I’m trying to determine why there were FetchFailure exceptions, since
> anything computed in the job that could not fit in the available memory
> cache should be by default spilled to disk for further retrieval.  I also
> see some "java.net.ConnectException: Connection refused” and
> "java.io.IOException: sendMessageReliably failed without being ACK’d"
> errors in the logs after a CancelledKeyException followed by
> a ClosedChannelException, but I have no idea why the nodes in the EMR
> cluster would suddenly stop being able to communicate.
>
> If anyone has ideas as to why the data needs to be rerun several times in
> this job, please let me know as I am fairly bewildered about this behavior.
>
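If the NodeManager logs do turn out to show containers being killed for exceeding
their memory limits, one setting commonly raised for PySpark jobs on YARN is the
executor memory overhead, since the Python worker processes live outside the executor
JVM heap. A sketch only; the value and script name are placeholders, and whether this
applies here is exactly what the log check above would confirm:

    spark-submit --master yarn-client \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      your_script.py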


Extracting values from a Collection

2014-11-21 Thread Sanjay Subramanian
hey guys
names.txt
=========
1,paul
2,john
3,george
4,ringo

songs.txt
=========
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==============
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
=======
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
======
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set ?
Thanks
sanjay


Re: Parsing a large XML file using Spark

2014-11-21 Thread Paul Brown
Unfortunately, unless you impose restrictions on the XML file (e.g., where
namespaces are declared, whether entity replacement is used, etc.), you
really can't parse only a piece of it even if you have start/end elements
grouped together.  If you want to deal effectively (and scalably) with
large XML files consisting of many records, the right thing to do is to
write them as one XML document per line just like the one JSON document per
line, at which point the data can be split effectively.  Something like
Woodstox and a little custom code should make an effective pre-processor.

Once you have the line-delimited XML, you can shred it however you want:
 JAXB, Jackson XML, etc.

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Fri, Nov 21, 2014 at 3:38 AM, Prannoy 
wrote:

> Hi,
>
> Parallel processing of XML files may be an issue due to the tags in the
> XML file. The XML file has to be intact because the parser matches the
> start and end entities, and if the file is distributed in parts to workers it
> may or may not find the start and end tags within the same worker, which will
> give an exception.
>
> Thanks.
>
> On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] 
> <[hidden
> email] > wrote:
>
>> If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump
>> that all revision information also) that is stored in HDFS, is it possible
>> to parse it in parallel/faster using Spark? Or do we have to use something
>> like a PullParser or Iteratee?
>>
>> My current solution is to read the single XML file in the first pass -
>> write it to HDFS and then read the small files in parallel on the Spark
>> workers.
>>
>> Thanks
>> -Soumya
>>
>>
>>
>>
>>
>> --
>>  If you reply to this email, your message will be added to the
>> discussion below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-tp19239.html
>>  To start a new topic under Apache Spark User List, email [hidden email]
>> 
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>> 
>>
>
>
> --
> View this message in context: Re: Parsing a large XML file using Spark
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Determine number of running executors

2014-11-21 Thread Sandy Ryza
Hi Tobias,

One way to find out the number of executors is through
SparkContext#getExecutorMemoryStatus.  You can find out the number of cores per
executor by asking the SparkConf for the "spark.executor.cores" property, which, if
not set, means 1 for YARN.

-Sandy


On Fri, Nov 21, 2014 at 1:30 AM, Yanbo Liang  wrote:

> You can get parameters such as spark.executor.memory, but you cannot get
> executor or core numbers,
> because executor and core counts are parameters of the Spark deploy environment, not
> of the Spark context.
>
> val conf = new SparkConf().set("spark.executor.memory","2G")
> val sc = new SparkContext(conf)
>
> sc.getConf.get("spark.executor.memory")
> conf.get("spark.executor.memory")
>
> 2014-11-21 15:35 GMT+08:00 Tobias Pfeiffer :
>
>> Hi,
>>
>> when running on YARN, is there a way for the Spark driver to know how
>> many executors, cores per executor etc. there are? I want to know this so I
>> can repartition to a good number.
>>
>> Thanks
>> Tobias
>>
>
>
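A rough sketch putting the two together on the driver (it assumes the driver's own
block manager shows up in the memory status map, hence the -1, treats the multiplier
as a heuristic, and uses rdd to stand for whatever RDD is being repartitioned):

    val executorCount = sc.getExecutorMemoryStatus.size - 1
    val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1)
    val targetPartitions = math.max(executorCount * coresPerExecutor * 2, 1)
    val repartitioned = rdd.repartition(targetPartitions)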


Re: Parsing a large XML file using Spark

2014-11-21 Thread andy petrella
Actually, it's a real

On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer  wrote:

> Hi,
>
> see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for
> one solution.
>
> One issue with those XML files is that they cannot be processed line by
> line in parallel; plus you inherently need shared/global state to parse XML
> or check for well-formedness, I think. (Same issue with multi-line JSON, by
> the way.)
>
> Tobias
>
>


Re: Extracting values from a Collection

2014-11-21 Thread Sanjay Subramanian
I am sorry, the last line in the code is

file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

so

My Code
=======
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file1Rdd.join(file2RddGrp.mapValues(names => names.toSet)).collect().foreach(println)

Result
======
(4,(ringo,Set(With a Little Help From My Friends, Octopus's Garden)))
(2,(john,Set(Julia, Nowhere Man)))
(3,(george,Set(While My Guitar Gently Weeps, Norwegian Wood)))
(1,(paul,Set(Yesterday, Michelle)))
Again the question is how do I extract values from the Set ?
thanks
sanjay  From: Sanjay Subramanian 
 To: Arun Ahuja ; Andrew Ash  
Cc: user  
 Sent: Friday, November 21, 2014 10:41 AM
 Subject: Extracting values from a Collection
   
hey guys
names.txt
=========
1,paul
2,john
3,george
4,ringo

songs.txt
=========
1,Yesterday
2,Julia
3,While My Guitar Gently Weeps
4,With a Little Help From My Friends
1,Michelle
2,Nowhere Man
3,Norwegian Wood
4,Octopus's Garden

What I want to do is real simple

Desired Output
==============
(4,(With a Little Help From My Friends, Octopus's Garden))
(2,(Julia, Nowhere Man))
(3,(While My Guitar Gently Weeps, Norwegian Wood))
(1,(Yesterday, Michelle))

My Code
=======
val file1Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2Rdd = sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x => (x.split(",")(0), x.split(",")(1)))
val file2RddGrp = file2Rdd.groupByKey()
file2Rdd.groupByKey().mapValues(names => names.toSet).collect().foreach(println)

Result
======
(4,Set(With a Little Help From My Friends, Octopus's Garden))
(2,Set(Julia, Nowhere Man))
(3,Set(While My Guitar Gently Weeps, Norwegian Wood))
(1,Set(Yesterday, Michelle))

How can I extract values from the Set ?


Thanks
sanjay
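To get at the individual values, a small sketch building on the RDDs above: either
render each Set as a single string, or flatten it into one record per song.

    val joined = file1Rdd.join(file2RddGrp.mapValues(names => names.toSet))

    // one line per key, with the songs joined into a comma-separated string
    joined.mapValues { case (name, songs) => (name, songs.mkString(", ")) }
      .collect()
      .foreach(println)

    // or one record per (id, name, song)
    joined.flatMap { case (id, (name, songs)) => songs.map(song => (id, name, song)) }
      .collect()
      .foreach(println)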


  

Re: Parsing a large XML file using Spark

2014-11-21 Thread andy petrella
(sorry about the previous spam... google inbox didn't allowed me to cancel
the miserable sent action :-/)

So what I was about to say: it's a real PAIN in the ass to parse the
Wikipedia articles in the dump due to these multiline articles...

However, there is a way to manage that "quite" easily, although I found it
rather slow.

*1/ use XML reader*
Use the "org.apache.hadoop" % "hadoop-streaming" % "1.0.4"

*2/ configure the hadoop job*
import org.apache.hadoop.streaming.StreamXmlRecordReader
import org.apache.hadoop.mapred.JobConf
val jobConf = new JobConf()
jobConf.set("stream.recordreader.class",
"org.apache.hadoop.streaming.StreamXmlRecordReader")
jobConf.set("stream.recordreader.begin", "")
org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf,
s"hdfs://$master:9000/data.xml")

// Load documents (one per line).
val documents = sparkContext.hadoopRDD(jobConf,
classOf[org.apache.hadoop.streaming.StreamInputFormat],
classOf[org.apache.hadoop.io.Text],
classOf[org.apache.hadoop.io.Text])


*3/ use the result as XML doc*
import scala.xml.XML
val texts = documents.map(_._1.toString)
 .map{ s =>
   val xml = XML.loadString(s)
   val id = (xml \ "id").text.toDouble
   val title = (xml \ "title").text
   val text = (xml \ "revision" \
"text").text.replaceAll("\\W", " ")
   val tknzed = text.split("\\W").filter(_.size >
3).toList
   (id, title, tknzed )
 }

HTH
andy
On Tue Nov 18 2014 at 2:52:00 AM Tobias Pfeiffer  wrote:

> Hi,
>
> see https://www.mail-archive.com/dev@spark.apache.org/msg03520.html for
> one solution.
>
> One issue with those XML files is that they cannot be processed line by
> line in parallel; plus you inherently need shared/global state to parse XML
> or check for well-formedness, I think. (Same issue with multi-line JSON, by
> the way.)
>
> Tobias
>
>


Re: JVM Memory Woes

2014-11-21 Thread Peter Thai
Quick update: 
It is a filter job that creates the error above, not the reduceByKey

Why would a filter cause an out of memory? 

Here is my code

val inputgsup
="hdfs://"+sparkmasterip+"/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*";
val gsupfile =
sc.newAPIHadoopFile[BytesWritable,BytesWritable,SequenceFileAsBinaryInputFormat](inputgsup)
val gsup = gsupfile.map(x => (GsupHandler.DeserializeKey( x._1.getBytes
),GsupHandler.DeserializeValue( x._2.getBytes ))).map(x =>
(x._1._1,x._1._2,x._2._1, x._2._2))
val gsup_results_geod = gsup.flatMap(x=> doQueryGSUP(has_expo_criteria,
has_fence_criteria, timerange_start_expo, timerange_end_expo,
timerange_start_fence, timerange_end_fence, expo_pois, fence_pois,x))
val gsup_results_reduced =
gsup_results_geod.reduceByKey((a,b)=>((a._1.toShort | b._1.toShort).toByte,
a._2+b._2))

*val gsup_results = gsup_results_reduced.filter(x=>(criteria_filter.value
contains x._2._1.toInt))*



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JVM-Memory-Woes-tp19496p19510.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Many retries for Python job

2014-11-21 Thread Brett Meyer
According to the web UI I don't see any executors dying during Stage 2.  I
looked over the YARN logs and didn't see anything suspicious, but I may not
have been looking closely enough.  Stage 2 seems to complete just fine, it's
just when it enters Stage 3 that the results from the previous stage seem to
be missing in many cases and result in FetchFailure errors.  I should
probably also mention that I have the spark.storage.memoryFraction set to
0.2.

From:  Sandy Ryza 
Date:  Friday, November 21, 2014 at 1:41 PM
To:  Brett Meyer 
Cc:  "user@spark.apache.org" 
Subject:  Re: Many retries for Python job

Hi Brett, 

Are you noticing executors dying?  Are you able to check the YARN
NodeManager logs and see whether YARN is killing them for exceeding memory
limits?

-Sandy

On Fri, Nov 21, 2014 at 9:47 AM, Brett Meyer 
wrote:
> I'm running a Python script with spark-submit on top of YARN on an EMR cluster
> with 30 nodes.  The script reads in approximately 3.9 TB of data from S3, and
> then does some transformations and filtering, followed by some aggregate
> counts.  During Stage 2 of the job, everything looks to complete just fine
> with no executor failures or resubmissions, but when Stage 3 starts up, many
> Stage 2 tasks have to be rerun due to FetchFailure errors.  Actually, I
> usually see at least 3-4 retries on Stage 2 before Stage 3 can successfully
> start.  The whole application eventually completes, but there is an addition
> of about 1+ hour overhead for all of the retries.
> 
> I'm trying to determine why there were FetchFailure exceptions, since anything
> computed in the job that could not fit in the available memory cache should be
> by default spilled to disk for further retrieval.  I also see some
> "java.net.ConnectException: Connection refused" and "java.io.IOException:
> sendMessageReliably failed without being ACK'd" errors in the logs after a
> CancelledKeyException followed by a ClosedChannelException, but I have no idea
> why the nodes in the EMR cluster would suddenly stop being able to
> communicate.
> 
> If anyone has ideas as to why the data needs to be rerun several times in this
> job, please let me know as I am fairly bewildered about this behavior.





smime.p7s
Description: S/MIME cryptographic signature


RE: tableau spark sql cassandra

2014-11-21 Thread Mohammed Guller
Thanks, Jerome.

BTW, have you tried the CalliopeServer2 from tuplejump? I was able to quickly 
connect from beeline/Squirrel to my Cassandra cluster using CalliopeServer2, 
which extends Spark SQL Thrift Server. It was very straightforward.

Next step is to connect from Tableau, but I can't find Tableau's Spark 
connector. Where did you download it from?

Mohammed

-Original Message-
From: jererc [mailto:jer...@gmail.com] 
Sent: Friday, November 21, 2014 5:27 AM
To: u...@spark.incubator.apache.org
Subject: RE: tableau spark sql cassandra

Hi!

Sure, I'll post the info I grabbed once the Cassandra table values appear in 
Tableau.

Best,
Jerome



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19480.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLLib: LinearRegressionWithSGD performance

2014-11-21 Thread Jayant Shekhar
Hi Sameer,

You can try increasing the number of executor-cores.

-Jayant





On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak  wrote:

> Hi All,
> I have been using MLLib's linear regression and I have some question
> regarding the performance. We have a cluster of 10 nodes -- each node has
> 24 cores and 148GB memory. I am running my app as follows:
>
> time spark-submit --class medslogistic.MedsLogistic --master yarn-client
> --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar
>
> I am also going to play with number of executors (reduce it) may be that
> will give us different results.
>
> The input is an 800MB sparse file in LibSVM format. Total number of
> features is 150K. It takes approximately 70 minutes for the regression to
> finish. The job imposes very little load on CPU, memory, network, and disk. 
> Total
> number of tasks is 104.  Total time gets divided fairly uniformly across
> these tasks. I was wondering, is it possible to reduce the
> execution time further?
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: MLLib: LinearRegressionWithSGD performance

2014-11-21 Thread Jayant Shekhar
Hi Sameer,

You can also use repartition to create a higher number of tasks.

-Jayant


On Fri, Nov 21, 2014 at 12:02 PM, Jayant Shekhar 
wrote:

> Hi Sameer,
>
> You can try increasing the number of executor-cores.
>
> -Jayant
>
>
>
>
>
> On Fri, Nov 21, 2014 at 11:18 AM, Sameer Tilak  wrote:
>
>> Hi All,
>> I have been using MLLib's linear regression and I have some question
>> regarding the performance. We have a cluster of 10 nodes -- each node has
>> 24 cores and 148GB memory. I am running my app as follows:
>>
>> time spark-submit --class medslogistic.MedsLogistic --master yarn-client
>> --executor-memory 6G --num-executors 10 /pathtomyapp/myapp.jar
>>
>> I am also going to play with number of executors (reduce it) may be that
>> will give us different results.
>>
>> The input is an 800MB sparse file in LibSVM format. Total number of
>> features is 150K. It takes approximately 70 minutes for the regression to
>> finish. The job imposes very little load on CPU, memory, network, and disk. 
>> Total
>> number of tasks is 104.  Total time gets divided fairly uniformly across
>> these tasks. I was wondering, is it possible to reduce the
>> execution time further?
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
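A sketch of both suggestions with placeholder numbers (8 cores per executor and 240
partitions are only illustrative, and inputPath stands for the LibSVM input file; the
right values depend on the cluster and data):

    time spark-submit --class medslogistic.MedsLogistic --master yarn-client \
      --executor-memory 6G --num-executors 10 --executor-cores 8 /pathtomyapp/myapp.jar

    // and inside the application, before training:
    import org.apache.spark.mllib.util.MLUtils
    val data = MLUtils.loadLibSVMFile(sc, inputPath).repartition(240).cache()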


Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alaa Ali
I want to run queries on Apache Phoenix which has a JDBC driver. The query
that I want to run is:

select ts,ename from random_data_date limit 10

But I'm having issues with the JdbcRDD upper and lowerBound parameters
(that I don't actually understand).

Here's what I have so far:

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}

val url="jdbc:phoenix:zookeeper"
val sql = "select ts,ename from random_data_date limit ?"
val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql, 5,
10, 2, r => r.getString("ts") + ", " + r.getString("ename"))

But this doesn't work because the sql expression that the JdbcRDD expects
has to have two ?s to represent the lower and upper bound.

How can I run my query through the JdbcRDD?

Regards,
Alaa Ali


Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Josh Mahonin
Hi Alaa Ali,

In order for Spark to split the JDBC query in parallel, it expects an upper
and lower bound for your input data, as well as a number of partitions so
that it can split the query across multiple tasks.

For example, depending on your data distribution, you could set an upper
and lower bound on your timestamp range, and spark should be able to create
new sub-queries to split up the data.

Another option is to load up the whole table using the PhoenixInputFormat
as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
functions, but it does let you load up whole tables as RDDs.

I've previously posted example code here:
http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E

There's also an example library implementation here, although I haven't had
a chance to test it yet:
https://github.com/simplymeasured/phoenix-spark

Josh

On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali  wrote:

> I want to run queries on Apache Phoenix which has a JDBC driver. The query
> that I want to run is:
>
> select ts,ename from random_data_date limit 10
>
> But I'm having issues with the JdbcRDD upper and lowerBound parameters
> (that I don't actually understand).
>
> Here's what I have so far:
>
> import org.apache.spark.rdd.JdbcRDD
> import java.sql.{Connection, DriverManager, ResultSet}
>
> val url="jdbc:phoenix:zookeeper"
> val sql = "select ts,ename from random_data_date limit ?"
> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>
> But this doesn't work because the sql expression that the JdbcRDD expects
> has to have two ?s to represent the lower and upper bound.
>
> How can I run my query through the JdbcRDD?
>
> Regards,
> Alaa Ali
>


Re: MongoDB Bulk Inserts

2014-11-21 Thread Benny Thompson
I tried using RDD#mapPartitions but my job completes prematurely and
without error as if nothing gets done.  What I have is fairly simple

sc
.textFile(inputFile)
.map(parser.parse)
.mapPartitions(bulkLoad)

But the Iterator[T] of mapPartitions is always empty, even though I know
map is generating records.


On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta 
wrote:

> On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson 
> wrote:
>
>> I'm trying to use MongoDB as a destination for an ETL I'm writing in
>> Spark.  It appears I'm gaining a lot of overhead in my system databases
>> (and possibly in the primary documents themselves);  I can only assume it's
>> because I'm left to using PairRDD.saveAsNewAPIHadoopFile.
>>
>> - Is there a way to batch some of the data together and use Casbah
>> natively so I can use bulk inserts?
>>
>
> Why cannot you write Mongo in a RDD#mapPartition ?
>
>
>>
>> - Is there maybe a less "hacky" way to load to MongoDB (instead of
>> using saveAsNewAPIHadoopFile)?
>>
>>
> If the latency (time by which all data should be in Mongo) is not a
> concern you can try a separate process that uses Akka/Casbah to write from
> HDFS into Mongo.
>
>
>
>
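For a side-effecting bulk write like this, foreachPartition is usually a better fit
than mapPartitions: it is an action, so nothing else has to consume the result, whereas
mapPartitions is lazy and its function must return an Iterator (consuming the input
without producing output yields the empty iterator seen above). A rough sketch, where
writeBatch stands in for whatever Casbah/Mongo bulk-insert call is used (a hypothetical
helper, not a real API):

    sc.textFile(inputFile)
      .map(parser.parse)
      .foreachPartition { records =>
        // open the connection on the worker, once per partition,
        // and write the partition out in fixed-size batches
        records.grouped(1000).foreach(batch => writeBatch(batch))
      }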


Running Spark application from Tomcat

2014-11-21 Thread Andreas Koch
I have a Spark java application that I run in local-mode. As such it runs
without any issues.

Now, I would like to run it as a webservice from Tomcat. The first issue I
had with this was that the spark-assembly jar contains javax.servlet, which
Tomcat does not allow. Therefore I removed javax.servlet from the jar file.

Now, I get an Exception like this:

java.lang.RuntimeException: class
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
org.apache.hadoop.security.GroupMappingServiceProvider
org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1921)
org.apache.hadoop.security.Groups.(Groups.java:64)

org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)

org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)

org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)

org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
org.apache.spark.SparkContext.(SparkContext.scala:228)

org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)


Any ideas?


Thanks,

Andreas


Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alaa Ali
Awesome, thanks Josh, I missed that previous post of yours! But your code
snippet shows a select statement, so what I can do is just run a simple
select with a where clause if I want to, and then run my data processing on
the RDD to mimic the aggregation I want to do with SQL, right? Also,
another question, I still haven't tried this out, but I'll actually be
using this with PySpark, so I'm guessing the PhoenixPigConfiguration and
newHadoopRDD can be defined in PySpark as well?

Regards,
Alaa Ali

On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin  wrote:

> Hi Alaa Ali,
>
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
>
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and spark should be able to create
> new sub-queries to split up the data.
>
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
>
> I've previously posted example code here:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E
>
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> https://github.com/simplymeasured/phoenix-spark
>
> Josh
>
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali  wrote:
>
>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>> query that I want to run is:
>>
>> select ts,ename from random_data_date limit 10
>>
>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>> (that I don't actually understand).
>>
>> Here's what I have so far:
>>
>> import org.apache.spark.rdd.JdbcRDD
>> import java.sql.{Connection, DriverManager, ResultSet}
>>
>> val url="jdbc:phoenix:zookeeper"
>> val sql = "select ts,ename from random_data_date limit ?"
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>>
>> But this doesn't work because the sql expression that the JdbcRDD expects
>> has to have two ?s to represent the lower and upper bound.
>>
>> How can I run my query through the JdbcRDD?
>>
>> Regards,
>> Alaa Ali
>>
>
>


Re: Spark SQL with Apache Phoenix lower and upper Bound

2014-11-21 Thread Alex Kamil
Ali,

just create a BIGINT column with numeric values in phoenix and use sequences
 to populate it automatically

I included the setup below in case someone starts from scratch

Prerequisites:
- export JAVA_HOME, SCALA_HOME and install sbt
- install hbase in standalone mode

- add phoenix jar  to hbase lib
directory
- start hbase and create a table in phoenix
 to verify
everything is working
- install spark in standalone mode, and verify that it works using spark
shell 

1. create a sequence  in phoenix:
$PHOENIX_HOME/hadoop1/bin/sqlline.py localhost

 > CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2.add a BIGINT column called e.g. "id" to your table in phoenix

> CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR);

3. add some values
>UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'foo');
>UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'bar');

4. create jdbc adapter (following SimpleApp setup in
Spark->GettingStarted->StandAlone
applications

):

//SparkToJDBC.scala

import java.sql.DriverManager
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import java.util.Date;

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
val sc = new SparkContext("local", "phoenix")
try{
val rdd = new JdbcRDD(sc,() => {

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()

DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
   },
   "SELECT id, name  FROM test.orders WHERE id >= ? AND id
<= ?",
1, 100, 3,
(r:ResultSet) => {
processResultSet(r)
}
).cache()

println("#");
println(rdd.count());
println("#");
 } catch {
  case _: Throwable => println("Could not connect to database")
 }
 sc.stop()
  }

def processResultSet(rs: ResultSet){

  val rsmd = rs.getMetaData()
  val numberOfColumns = rsmd.getColumnCount()

  var i = 1
  while (i <= numberOfColumns) {
val colName = rsmd.getColumnName(i)
val tableName = rsmd.getTableName(i)
val name = rsmd.getColumnTypeName(i)
val caseSen = rsmd.isCaseSensitive(i)
val writable = rsmd.isWritable(i)
println("Information for column " + colName)
println("Column is in table " + tableName)
println("column type is " + name)
println("")
i += 1
  }

  while (rs.next()) {
var i = 1
while (i <= numberOfColumns) {
  val s = rs.getString(i)
  System.out.print(s + "  ")
  i += 1
}
println("")
  }
   }

}

5. build SparkToJDBC.scala
sbt package

6. execute spark job:
note: don't forget to add phoenix jar using --jars option like this:

../spark-1.1.0/bin/spark-submit *--jars ../phoenix-3.1.0-bin/hadoop2/*
*phoenix-3.1.0-client-hadoop2.**jar *--class "SparkToJDBC" --master
local[4] target/scala-2.10/simple-project_2.10-1.0.jar

regards
Alex


On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin  wrote:

> Hi Alaa Ali,
>
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
>
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and spark should be able to create
> new sub-queries to split up the data.
>
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
>
> I've previously posted example code here:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=q...@mail.gmail.com%3E
>
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> https://github.com/simplymeasured/phoenix-spark
>
> Josh
>
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali  wrote:
>
>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>> query

Persist kafka streams to text file

2014-11-21 Thread Joanne Contact
Hello, I am trying to read a Kafka stream and persist it to a text file by running
Spark from my IDE (IntelliJ IDEA). The code is similar to a previous thread on
persisting streams to text files.

I am new to Spark and Scala. I believe Spark is in local mode, as the
console shows
14/11/21 14:17:11 INFO spark.SparkContext: Spark configuration:
spark.app.name=local-mode

 I got the following errors. They are related to Tachyon, but I don't know whether
I have Tachyon or not.

14/11/21 14:17:54 WARN storage.TachyonBlockManager: Attempt 1 to create
tachyon dir null failed
java.io.IOException: Failed to connect to master localhost/127.0.0.1:19998
after 5 attempts
at tachyon.client.TachyonFS.connect(TachyonFS.java:293)
at tachyon.client.TachyonFS.getFileId(TachyonFS.java:1011)
at tachyon.client.TachyonFS.exist(TachyonFS.java:633)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:117)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:106)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.storage.TachyonBlockManager.createTachyonDirs(TachyonBlockManager.scala:106)
at
org.apache.spark.storage.TachyonBlockManager.(TachyonBlockManager.scala:57)
at
org.apache.spark.storage.BlockManager.tachyonStore$lzycompute(BlockManager.scala:88)
at org.apache.spark.storage.BlockManager.tachyonStore(BlockManager.scala:82)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:729)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.TException: Failed to connect to
master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.master.MasterClient.connect(MasterClient.java:178)
at tachyon.client.TachyonFS.connect(TachyonFS.java:290)
... 28 more
Caused by: tachyon.org.apache.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at
tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at tachyon.master.MasterClient.connect(MasterClient.java:156)
... 29 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 31 more
14/11/21 14:17:54 ERROR storage.TachyonBlockManager: Failed 10 attempts to
create tachyon dir in
/tmp_spark_tachyon/spark-3dbec68b-f5b8-45e1-bb68-370439839d4a/

I looked at the code. It has the following part. Is that a problem?

.persist(StorageLevel.OFF_HEAP)

Any advice?

Thank you!

J
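The OFF_HEAP storage level is the likely trigger: in this Spark version it stores
blocks in Tachyon, so the job tries to reach a Tachyon master on localhost:19998 even
though Spark itself runs in local mode. If Tachyon isn't actually deployed, a sketch of
the usual alternative (stream standing for whatever DStream or RDD is being persisted):

    import org.apache.spark.storage.StorageLevel

    // serialized on-heap memory with spill to local disk; no Tachyon required
    stream.persist(StorageLevel.MEMORY_AND_DISK_SER)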


Persist kafka streams to text file, tachyon error?

2014-11-21 Thread Joanne Contact
use the right email list.
-- Forwarded message --
From: Joanne Contact 
Date: Fri, Nov 21, 2014 at 2:32 PM
Subject: Persist kafka streams to text file
To: u...@spark.incubator.apache.org


Hello, I am trying to read a Kafka stream and persist it to a text file by running
Spark from my IDE (IntelliJ IDEA). The code is similar to a previous thread on
persisting streams to text files.

I am new to Spark and Scala. I believe Spark is in local mode, as the
console shows
14/11/21 14:17:11 INFO spark.SparkContext: Spark configuration:
spark.app.name=local-mode

 I got the following errors. They are related to Tachyon, but I don't know whether
I have Tachyon or not.

14/11/21 14:17:54 WARN storage.TachyonBlockManager: Attempt 1 to create
tachyon dir null failed
java.io.IOException: Failed to connect to master localhost/127.0.0.1:19998
after 5 attempts
at tachyon.client.TachyonFS.connect(TachyonFS.java:293)
at tachyon.client.TachyonFS.getFileId(TachyonFS.java:1011)
at tachyon.client.TachyonFS.exist(TachyonFS.java:633)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:117)
at
org.apache.spark.storage.TachyonBlockManager$$anonfun$createTachyonDirs$2.apply(TachyonBlockManager.scala:106)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.storage.TachyonBlockManager.createTachyonDirs(TachyonBlockManager.scala:106)
at
org.apache.spark.storage.TachyonBlockManager.(TachyonBlockManager.scala:57)
at
org.apache.spark.storage.BlockManager.tachyonStore$lzycompute(BlockManager.scala:88)
at org.apache.spark.storage.BlockManager.tachyonStore(BlockManager.scala:82)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:729)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: tachyon.org.apache.thrift.TException: Failed to connect to
master localhost/127.0.0.1:19998 after 5 attempts
at tachyon.master.MasterClient.connect(MasterClient.java:178)
at tachyon.client.TachyonFS.connect(TachyonFS.java:290)
... 28 more
Caused by: tachyon.org.apache.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:185)
at
tachyon.org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
at tachyon.master.MasterClient.connect(MasterClient.java:156)
... 29 more
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at tachyon.org.apache.thrift.transport.TSocket.open(TSocket.java:180)
... 31 more
14/11/21 14:17:54 ERROR storage.TachyonBlockManager: Failed 10 attempts to
create tachyon dir in
/tmp_spark_tachyon/spark-3dbec68b-f5b8-45e1-bb68-370439839d4a/

I looked at the code. It has the following part. Is that a problem?

.persist(StorageLevel.OFF_HEAP)

Any advice?

Thank you!

J


Re: SparkSQL - can we add new column(s) to parquet files

2014-11-21 Thread Evan Chan
I would expect an SQL query on c would fail because c would not be known in
the schema of the older Parquet file.

What I'd be very interested in is how to add a new column as an incremental
new parquet file, and be able to somehow join the existing and new file, in
an efficient way; i.e., somehow guarantee that for every row in the old
parquet file, the corresponding rows in the new file would be stored
in the same node, so that joins are local.

On Fri, Nov 21, 2014 at 10:03 AM, Sadhan Sood  wrote:

> We create the table definition by reading the parquet file for schema and
> store it in hive metastore. But if someone adds a new column to the schema,
> and if we rescan the schema from the new parquet files and update the table
> definition, would it still work if we run queries on the table ?
>
> So, old table has -> Int a, Int b
> new table -> Int a, Int b, String c
>
> but older parquet files don't have String c, so on querying the table
> would it return me null for column c  from older files and data from newer
> files or fail?
>


Re: Another accumulator question

2014-11-21 Thread Andrew Ash
Hi Nathan,

It sounds like what you're asking for has already been filed as
https://issues.apache.org/jira/browse/SPARK-664  Does that ticket match
what you're proposing?

Andrew

On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> We've done this with reduce - that definitely works.
>
> I've reworked the logic to use accumulators because, when it works, it's
> 5-10x faster
>
> On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen  wrote:
>
>> This sounds more like a use case for reduce? or fold? it sounds like
>> you're kind of cobbling together the same function on accumulators,
>> when reduce/fold are simpler and have the behavior you suggest.
>>
>> On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
>>  wrote:
>> > I think I understand what is going on here, but I was hoping someone
>> could
>> > confirm (or explain reality if I don't) what I'm seeing.
>> >
>> > We are collecting data using a rather sizable accumulator -
>> essentially, an
>> > array of tens of thousands of entries.  All told, about 1.3m of data.
>> >
>> > If I understand things correctly, it looks to me like, when our job is
>> done,
>> > a copy of this array is retrieved from each individual task, all at
>> once,
>> > for combination on the client - which means, with 400 tasks to the job,
>> each
>> > collection is using up half a gig of memory on the client.
>> >
>> > Is this true?  If so, does anyone know a way to get accumulators to
>> > accumulate as results collect, rather than all at once at the end, so we
>> > only have to hold a few in memory at a time, rather than all 400?
>> >
>> > Thanks,
>> >   -Nathan
>> >
>> >
>> > --
>> > Nathan Kronenfeld
>> > Senior Visualization Developer
>> > Oculus Info Inc
>> > 2 Berkeley Street, Suite 600,
>> > Toronto, Ontario M5A 4J5
>> > Phone:  +1-416-203-3003 x 238
>> > Email:  nkronenf...@oculusinfo.com
>>
>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>


RE: Using TF-IDF from MLlib

2014-11-21 Thread Daniel, Ronald (ELS-SDG)
Thanks for the info Andy. A big help.

One thing - I think you can figure out which document is responsible for which 
vector without checking in more code.
Start with a PairRDD of [doc_id, doc_string] for each document and split that 
into one RDD for each column.
The values in the doc_string RDD get split and turned into a Seq and fed to 
TFIDF.
You can take the resulting RDD[Vector]s and zip them with the doc_id RDD. 
Presto!

Best regards,
Ron
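A minimal sketch of that keying approach (the two-document corpus is made up, sc is an
existing SparkContext, and it relies on transformations preserving per-partition order,
which is exactly what the follow-up below questions):

    import org.apache.spark.mllib.feature.{HashingTF, IDF}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    val corpus: RDD[(String, String)] = sc.parallelize(Seq(
      ("doc1", "spark makes distributed computing simple"),
      ("doc2", "tf idf weighs terms by document frequency")))

    val ids   = corpus.map(_._1)
    val terms = corpus.map(_._2.split(" ").toSeq)

    val tf: RDD[Vector] = new HashingTF().transform(terms)
    tf.cache()
    val tfidf: RDD[Vector] = new IDF().fit(tf).transform(tf)

    // re-attach the ids; zip assumes identical partitioning and element counts per partition
    val docVectors: RDD[(String, Vector)] = ids.zip(tfidf)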





Re: Using TF-IDF from MLlib

2014-11-21 Thread andy petrella
Yeah, I initially used zip but I was wondering how reliable it is. I mean,
is the order guaranteed? What if some node fails, and the data is pulled
from different nodes?
And even if it can work, I found this implicit semantic quite
uncomfortable, don't you?

My0.2c

Le ven 21 nov. 2014 15:26, Daniel, Ronald (ELS-SDG) 
a écrit :

> Thanks for the info Andy. A big help.
>
> One thing - I think you can figure out which document is responsible for
> which vector without checking in more code.
> Start with a PairRDD of [doc_id, doc_string] for each document and split
> that into one RDD for each column.
> The values in the doc_string RDD get split and turned into a Seq and fed
> to TFIDF.
> You can take the resulting RDD[Vector]s and zip them with the doc_id RDD.
> Presto!
>
> Best regards,
> Ron
>
>
>
>


Book: Data Analysis with SparkR

2014-11-21 Thread Emaasit
Is there a book on SparkR for the absolute & terrified beginner?
I use R for my daily analysis and I am interested in a detailed guide to
using SparkR for data analytics, like a book or online tutorials. If there
are any, please point me to them.

Thanks,
Daniel



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to deal with BigInt in my case class for RDD => SchemaRDD convertion

2014-11-21 Thread Jianshi Huang
Ah yes. I found it too in the manual. Thanks for the help anyway!

Since BigDecimal is just a wrapper around BigInt, let's also convert
BigInt to Decimal.

I created a ticket. https://issues.apache.org/jira/browse/SPARK-4549

Jianshi

On Fri, Nov 21, 2014 at 11:30 PM, Yin Huai  wrote:

> Hello Jianshi,
>
> The reason of that error is that we do not have a Spark SQL data type for
> Scala BigInt. You can use Decimal for your case.
>
> Thanks,
>
> Yin
>
> On Fri, Nov 21, 2014 at 5:11 AM, Jianshi Huang 
> wrote:
>
>> Hi,
>>
>> I got an error during rdd.registerTempTable(...) saying scala.MatchError:
>> scala.BigInt
>>
>> Looks like BigInt cannot be used in SchemaRDD, is that correct?
>>
>> So what would you recommend to deal with it?
>>
>> Thanks,
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
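
A minimal sketch of the workaround Yin suggests, assuming the Spark 1.1/1.2 case-class reflection path: map the BigInt field to BigDecimal (which Spark SQL maps to its Decimal type) before registering the table. The Account case class and field names are illustrative, not from the original code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// BigDecimal instead of BigInt, so reflection maps the column to DecimalType.
case class Account(id: Long, balance: BigDecimal)

object BigIntToDecimal {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bigint-to-decimal"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    val raw: Seq[(Long, BigInt)] = Seq((1L, BigInt("12345678901234567890")))

    // Convert BigInt -> BigDecimal before the RDD is turned into a SchemaRDD,
    // avoiding the scala.MatchError on scala.BigInt.
    val accounts = sc.parallelize(raw).map { case (id, b) => Account(id, BigDecimal(b)) }

    accounts.registerTempTable("accounts")
    sqlContext.sql("SELECT id, balance FROM accounts").collect().foreach(println)

    sc.stop()
  }
}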


Missing parents for stage (Spark Streaming)

2014-11-21 Thread YaoPau
When I submit a Spark Streaming job, I see these INFO logs printing
frequently:

14/11/21 18:53:17 INFO DAGScheduler: waiting: Set(Stage 216)
14/11/21 18:53:17 INFO DAGScheduler: failed: Set()
14/11/21 18:53:17 INFO DAGScheduler: Missing parents for Stage 216: List()
14/11/21 18:53:17 INFO DAGScheduler: Submitting Stage 216 (MappedRDD[1733]
at map at MappedDStream.scala:35), which is now runnable

I have a feeling this means there is some error with a Map I created as a
broadcast variable, but I'm not sure.  How can I figure out what this is
referring to?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Missing-parents-for-stage-Spark-Streaming-tp19530.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Book: Data Analysis with SparkR

2014-11-21 Thread Zongheng Yang
Hi Daniel,

Thanks for your email! We don't have a book (yet?) specifically on SparkR,
but here's a list of helpful tutorials / links you can check out (I am
listing them in roughly basic -> advanced order):

- AMPCamp5 SparkR exercises. This covers the basics of SparkR's API, performs
basic analytics, and visualizes the results.
- SparkR examples. We have K-means, logistic regression, MNIST solver, \pi
estimation, word count and other examples available.
- Running SparkR on EC2. This entry details the steps to run a SparkR program
on an EC2 cluster.

Finally, we have a talk at the AMPCamp today on SparkR, whose video & slides
will be available soon on the website -- it covers the basics of the interface
& what you can do with it. Additionally, you could direct any SparkR questions
to our sparkr-dev mailing list.

Let us know if you have further questions.

Zongheng

On Fri Nov 21 2014 at 3:48:53 PM Emaasit  wrote:

> Is there a book on SparkR for the absolute & terrified beginner?
> I use R for my daily analysis and I am interested in a detailed guide to
> using SparkR for data analytics, like a book or online tutorials. If there
> are any, please point me to them.
>
> Thanks,
> Daniel
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Book-Data-Analysis-with-SparkR-tp19529.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: MongoDB Bulk Inserts

2014-11-21 Thread Soumya Simanta
Does bulkLoad hold the connection to MongoDB?

On Fri, Nov 21, 2014 at 4:34 PM, Benny Thompson 
wrote:

> I tried using RDD#mapPartitions but my job completes prematurely and
> without error as if nothing gets done.  What I have is fairly simple
>
> sc
> .textFile(inputFile)
> .map(parser.parse)
> .mapPartitions(bulkLoad)
>
> But the Iterator[T] of mapPartitions is always empty, even though I know
> map is generating records.
>
>
> On Thu Nov 20 2014 at 9:25:54 PM Soumya Simanta 
> wrote:
>
>> On Thu, Nov 20, 2014 at 10:18 PM, Benny Thompson 
>> wrote:
>>
>>> I'm trying to use MongoDB as a destination for an ETL I'm writing in
>>> Spark.  It appears I'm gaining a lot of overhead in my system databases
>>> (and possibly in the primary documents themselves);  I can only assume it's
>>> because I'm left to using PairRDD.saveAsNewAPIHadoopFile.
>>>
>>> - Is there a way to batch some of the data together and use Casbah
>>> natively so I can use bulk inserts?
>>>
>>
>> Why can't you write to Mongo in an RDD#mapPartitions?
>>
>>
>>>
>>> - Is there maybe a less "hacky" way to load to MongoDB (instead of
>>> using saveAsNewAPIHadoopFile)?
>>>
>>>
>> If the latency (time by which all data should be in Mongo) is not a
>> concern you can try a separate process that uses Akka/Casbah to write from
>> HDFS into Mongo.
>>
>>
>>
>>
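
A minimal sketch of the pattern being discussed, with two caveats: mapPartitions is a lazy transformation, so nothing runs unless an action consumes its output, while foreachPartition is an action and is usually the better fit for writes. The MongoBatchWriter below is a stand-in for whatever client (e.g. Casbah) is actually used -- its name and methods are placeholders, not a real API.

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder for the real client; swap in Casbah / the Mongo Java driver here.
class MongoBatchWriter(uri: String, collection: String) {
  def insertBatch(docs: Seq[String]): Unit = ()  // stub
  def close(): Unit = ()                         // stub
}

object MongoBulkInsertSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mongo-bulk-insert-sketch"))

    // Stand-in for sc.textFile(inputFile).map(parser.parse) from the thread.
    val parsed = sc.textFile("hdfs:///path/to/input").map(_.trim)

    // foreachPartition is an action: one connection per partition, opened on
    // the executor (never serialized from the driver), inserting in batches.
    parsed.foreachPartition { records =>
      val writer = new MongoBatchWriter("mongodb://host:27017/db", "events")
      try {
        records.grouped(1000).foreach(batch => writer.insertBatch(batch))
      } finally {
        writer.close()
      }
    }

    sc.stop()
  }
}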


allocating different memory to different executor for same application

2014-11-21 Thread tridib
Hello Experts,
I have 5 worker machines with different amounts of RAM. Is there a way to
configure a different executor memory for each?

Currently I see that every worker spins up one executor with the same
amount of memory.

Thanks & Regards
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/allocating-different-memory-to-different-executor-for-same-application-tp19534.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Another accumulator question

2014-11-21 Thread Nathan Kronenfeld
I"m not sure if it's an exact match, or just very close :-)

I don't think our problem is the workload on the driver, I think it's just
memory - so while the solution proposed there would work, it would also be
sufficient for our purposes, I believe, simply to clear each block as soon
as it's added into the canonical version, and try to do so as soon as
possible - but I could be misunderstanding some of the timing, I'm still
investigating.

Though to combine on the worker before returning, as he suggests, would
probably be even better.

On Fri, Nov 21, 2014 at 6:08 PM, Andrew Ash  wrote:

> Hi Nathan,
>
> It sounds like what you're asking for has already been filed as
> https://issues.apache.org/jira/browse/SPARK-664  Does that ticket match
> what you're proposing?
>
> Andrew
>
> On Fri, Nov 21, 2014 at 12:29 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> We've done this with reduce - that definitely works.
>>
>> I've reworked the logic to use accumulators because, when it works, it's
>> 5-10x faster
>>
>> On Fri, Nov 21, 2014 at 4:44 AM, Sean Owen  wrote:
>>
>>> This sounds more like a use case for reduce? or fold? it sounds like
>>> you're kind of cobbling together the same function on accumulators,
>>> when reduce/fold are simpler and have the behavior you suggest.
>>>
>>> On Fri, Nov 21, 2014 at 5:46 AM, Nathan Kronenfeld
>>>  wrote:
>>> > I think I understand what is going on here, but I was hoping someone
>>> could
>>> > confirm (or explain reality if I don't) what I'm seeing.
>>> >
>>> > We are collecting data using a rather sizable accumulator -
>>> essentially, an
>>> > array of tens of thousands of entries.  All told, about 1.3m of data.
>>> >
>>> > If I understand things correctly, it looks to me like, when our job is
>>> done,
>>> > a copy of this array is retrieved from each individual task, all at
>>> once,
>>> > for combination on the client - which means, with 400 tasks to the
>>> job, each
>>> > collection is using up half a gig of memory on the client.
>>> >
>>> > Is this true?  If so, does anyone know a way to get accumulators to
>>> > accumulate as results collect, rather than all at once at the end, so
>>> we
>>> > only have to hold a few in memory at a time, rather than all 400?
>>> >
>>> > Thanks,
>>> >   -Nathan
>>> >
>>> >
>>> > --
>>> > Nathan Kronenfeld
>>> > Senior Visualization Developer
>>> > Oculus Info Inc
>>> > 2 Berkeley Street, Suite 600,
>>> > Toronto, Ontario M5A 4J5
>>> > Phone:  +1-416-203-3003 x 238
>>> > Email:  nkronenf...@oculusinfo.com
>>>
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


spark-sql broken

2014-11-21 Thread tridib
After taking today's build from the master branch I started getting this
error when I run spark-sql:

Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.

I used following command for building:
 ./make-distribution.sh --tgz -Pyarn -Dyarn.version=2.4.0 -Phadoop-2.4
-Dhadoop.version=2.4.0 -Phive  -Phive-thriftserver -DskipTests

Is there anything I am missing?

Thanks
Tridib




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-broken-tp19536.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

2014-11-21 Thread Judy Nash
Hi,

Thrift server is failing to start for me on latest spark 1.2 branch.

I got the error below when I start thrift server.
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
        at org.apache.hadoop.conf.Configuration$DeprecationDelta.(Configuration.java:314)

Here is my setup:

1)  Latest spark 1.2 branch build

2)  Used build command:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver 
-DskipTests clean package

3)  Added hive-site.xml to \conf

4)  Version on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong?

---
Full Stacktrace:
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
        at org.apache.hadoop.conf.Configuration$DeprecationDelta.(Configuration.java:314)
        at org.apache.hadoop.conf.Configuration$DeprecationDelta.(Configuration.java:327)
        at org.apache.hadoop.conf.Configuration.(Configuration.java:409)
        at org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:82)
        at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:202)
        at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
        at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784)
        at org.apache.spark.storage.BlockManager.(BlockManager.scala:105)
        at org.apache.spark.storage.BlockManager.(BlockManager.scala:180)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
        at org.apache.spark.SparkContext.(SparkContext.scala:230)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:38)
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56)
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


Spark streaming job failing after some time.

2014-11-21 Thread pankaj channe
I have seen similar posts on this issue but could not find a solution.
Apologies if this has been discussed here before.

I am running a spark streaming job with yarn on a 5 node cluster. I am
using following command to submit my streaming job.

spark-submit --class class_name --master yarn-cluster --num-executors 1
--driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar


After running for some time, the job stops. The application log shows the
following two errors:

14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve
SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
at
org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


and later...

Failed to list files for dir:
/data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20
at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686)
at
org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at
org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at
org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169)


Note: I am building my jar locally, with the Spark dependency added in
pom.xml, and running it on the cluster that runs Spark.


-Pankaj


Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava

2014-11-21 Thread Cheng Lian
Hi Judy, could you please provide the commit SHA1 of the version you're 
using? Thanks!


On 11/22/14 11:05 AM, Judy Nash wrote:


Hi,

Thrift server is failing to start for me on latest spark 1.2 branch.

I got the error below when I start thrift server.

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
        at org.apache.hadoop.conf.Configuration$DeprecationDelta.(Configuration.java:314)....

Here is my setup:

1) Latest spark 1.2 branch build

2) Used build command:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

3) Added hive-site.xml to \conf

4) Version on the box: Hive 0.13, Hadoop 2.4

Is this a real bug or am I doing something wrong?






Extracting values from a Collection

2014-11-21 Thread Jey Kottalam
Hi Sanjay,

These are instances of the standard Scala collection type "Set", and its
documentation can be found by googling the phrase "scala set".

Hope that helps,
-Jey

On Fri, Nov 21, 2014 at 10:41 AM, Sanjay Subramanian
 wrote:
> hey guys
>
> names.txt
> =
> 1,paul
> 2,john
> 3,george
> 4,ringo
>
>
> songs.txt
> =
> 1,Yesterday
> 2,Julia
> 3,While My Guitar Gently Weeps
> 4,With a Little Help From My Friends
> 1,Michelle
> 2,Nowhere Man
> 3,Norwegian Wood
> 4,Octopus's Garden
>
> What I want to do is real simple
>
> Desired Output
> ==
> (4,(With a Little Help From My Friends, Octopus's Garden))
> (2,(Julia, Nowhere Man))
> (3,(While My Guitar Gently Weeps, Norwegian Wood))
> (1,(Yesterday, Michelle))
>
>
> My Code
> ===
> val file1Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/names.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2Rdd =
> sc.textFile("/Users/sansub01/mycode/data/songs/songs.txt").map(x =>
> (x.split(",")(0), x.split(",")(1)))
> val file2RddGrp = file2Rdd.groupByKey()
> file2Rdd.groupByKey().mapValues(names =>
> names.toSet).collect().foreach(println)
>
> Result
> ===
> (4,Set(With a Little Help From My Friends, Octopus's Garden))
> (2,Set(Julia, Nowhere Man))
> (3,Set(While My Guitar Gently Weeps, Norwegian Wood))
> (1,Set(Yesterday, Michelle))
>
>
> How can I extract values from the Set ?
>
> Thanks
>
> sanjay
>
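
A minimal sketch of two ways to get at the grouped values, following Sanjay's example; the inlined sample data simply mirrors his files. The values produced by groupByKey (and the Set built from them) are ordinary Scala collections, so mkString, toList, iteration, or a join all work:

import org.apache.spark.{SparkConf, SparkContext}

object ExtractGroupedValues {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("extract-grouped-values"))

    val names = sc.parallelize(Seq("1,paul", "2,john", "3,george", "4,ringo"))
      .map(l => (l.split(",")(0), l.split(",")(1)))
    val songs = sc.parallelize(Seq(
        "1,Yesterday", "2,Julia", "3,While My Guitar Gently Weeps",
        "4,With a Little Help From My Friends", "1,Michelle", "2,Nowhere Man",
        "3,Norwegian Wood", "4,Octopus's Garden"))
      .map(l => (l.split(",")(0), l.split(",")(1)))

    // The grouped values are a plain Scala collection, so they can be joined
    // into one string per key, matching the desired "(song1, song2)" output.
    songs.groupByKey()
      .mapValues(titles => titles.mkString("(", ", ", ")"))
      .collect()
      .foreach(println)

    // Or keep individual elements, e.g. pair each song with the matching name.
    names.join(songs).collect().foreach {
      case (id, (name, song)) => println(s"$id: $name -> $song")
    }

    sc.stop()
  }
}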