number of "Cached Partitions" v.s. "Total Partitions"

2014-07-22 Thread Haopu Wang
Hi, I'm using local mode and read a text file as RDD using
JavaSparkContext.textFile() API.

And then call "cache()" method on the result RDD.

 

I look at the Storage information and find the RDD has 3 partitions but
2 of them have been cached.

Is this normal behavior? I assumed either all of the partitions would be cached
or none of them.

If I'm wrong, what are the cases in which the number of "cached" partitions is
less than the total number of partitions?

 

 



RE: number of "Cached Partitions" v.s. "Total Partitions"

2014-07-22 Thread Shao, Saisai
Yes, it's normal behavior: there is not enough memory to hold the third partition, as you
can see in your attached picture.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, July 22, 2014 3:09 PM
To: user@spark.apache.org
Subject: number of "Cached Partitions" v.s. "Total Partitions"


Hi, I'm using local mode and read a text file as RDD using 
JavaSparkContext.textFile() API.

And then call "cache()" method on the result RDD.



I look at the Storage information and find the RDD has 3 partitions but 2 of 
them have been cached.

Is this a normal behavior? I assume all of partitions should be cached or none 
of them.

If I'm wrong, what are the cases when number of "cached" partitions is less 
than the total number of partitions?



[attached image: Storage tab screenshot showing 2 of 3 partitions cached]
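For readers who hit the same partial-caching behavior: cache() is shorthand for
persist(StorageLevel.MEMORY_ONLY), which silently skips partitions that do not fit in
memory. A minimal sketch (illustrative paths, not from the thread) of requesting a
disk-backed level instead, so that every partition is retained:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local"))
val rdd = sc.textFile("/path/to/input.txt")   // placeholder path

// MEMORY_ONLY (what cache() uses) drops partitions that do not fit; MEMORY_AND_DISK
// spills them to local disk, so the Storage tab should report all partitions cached.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()   // force evaluation so the partitions are actually materialized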


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-22 Thread Victor Sheng
Hi, Yin Huai
I tested again with your snippet code.
It works well in Spark 1.0.1.

Here is my code:
 
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 case class Record(data_date: String, mobile: String, create_time: String)
 val mobile = Record("2014-07-20","1234567","2014-07-19")
 val lm = List(mobile)
 val mobileRDD = sc.makeRDD(lm)
 val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
 mobileSchemaRDD.registerAsTable("mobile")
 sqlContext.sql("select count(1) from mobile").collect()
 
The Result is like below:
14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at
SparkPlan.scala:52, took 0.296864832 s
res9: Array[org.apache.spark.sql.Row] = Array([1])

   
   But what is the main cause of this exception? And how did you find it out by
looking at obscure identifiers like $line11.$read$
$line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ?

Thanks,
Victor




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Job aborted due to stage failure: TID x failed for unknown reasons

2014-07-22 Thread Alessandro Lulli
Hi All,

Can someone help on this?

I'm encountering exactly the same issue in a very similar scenario with the
same spark version.

Thanks
Alessandro


On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn  wrote:

>  Hi all,
>
> I'm dealing with some strange error messages that I *think* comes down to
> a memory issue, but I'm having a hard time pinning it down and could use
> some guidance from the experts.
>
> I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one
> has 16GB memory, the other 32GB (which is the master). My application
> involves computing pairwise pixel affinities in images, though the images
> I've tested so far only get as big as 1920x1200, and as small as 16x16.
>
> I did have to change a few memory and parallelism settings, otherwise I
> was getting explicit OutOfMemoryExceptions. In spark-default.conf:
>
> spark.executor.memory14g
> spark.default.parallelism32
> spark.akka.frameSize1000
>
> In spark-env.sh:
>
> SPARK_DRIVER_MEMORY=10G
>
> With those settings, however, I get a bunch of WARN statements about "Lost
> TIDs" (no task is successfully completed) in addition to lost Executors,
> which are repeated 4 times until I finally get the following error message
> and crash:
>
> ---
>
> 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
> 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at
> /home/user/Programming/PySpark-Affinities/affinity.py:243
> Traceback (most recent call last):
>   File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243,
> in 
> lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py",
> line 583, in collect
> bytesInJava = self._jrdd.collect().iterator()
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0:13 failed 4 times, most recent failure: *TID 32 on host
> master.host.univ.edu  failed for unknown
> reason*
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
> 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor
> 4 from BlockManagerMaster.
> 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in
> removeExecutor
> user@master:~/Programming/PySpark-Affinities$
>
> ---
>
> If I run the really small image instead (16x16), it *appears* to run to
> completion (gives me the output I expect without any exceptions being
> thrown). However, in the stderr logs for the app that was run, it lists the
> state as "KILLED" with the final message a "ERROR
> CoarseGrainedExecutorBackend: Driver Disassociated". If I run any larger
> images, I get the exception I pasted above.
>
> Furthermore, if I just do a spark-submit with master=local[*], aside from
>

Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
I've just hit the same problem.

I'm using


$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client $JOBJAR
--class $JOBCLASS


It's really strange, because the log shows that 


14/07/22 16:16:58 INFO ui.SparkUI: Started SparkUI at
http://k1227.mzhen.cn:4040
14/07/22 16:16:58 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/07/22 16:16:58 INFO spark.SparkContext: Added JAR
/home/workspace/ci-demo/target/scala-2.10/SemiData-CIDemo-Library-assembly-0.1.jar
at http://192.168.7.37:53050/jars/SemiData-CIDemo-Library-assembly-0.1.jar
with timestamp 1406017018666
14/07/22 16:16:58 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
14/07/22 16:16:58 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook
for context org.apache.spark.SparkContext@41ecfc8c


Why did cluster.YarnClusterScheduler start? Where's the Client?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10392.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: gain access to persisted rdd

2014-07-22 Thread mrm
Ok, thanks for the answers. Unfortunately, there is no sc.getPersistentRDDs
for pyspark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313p10393.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Why spark-submit command hangs?

2014-07-22 Thread Earthson
That's what my problem is:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: saveAsSequenceFile for DStream

2014-07-22 Thread Sean Owen
What about simply:

dstream.foreachRDD(_.saveAsSequenceFile(...))

?

On Tue, Jul 22, 2014 at 2:06 AM, Barnaby  wrote:
> First of all, I do not know Scala, but learning.
>
> I'm doing a proof of concept by streaming content from a socket, counting
> the words and write it to a Tachyon disk. A different script will read the
> file stream and print out the results.
>
>  val lines = ssc.socketTextStream(args(0), args(1).toInt,
> StorageLevel.MEMORY_AND_DISK_SER)
>  val words = lines.flatMap(_.split(" "))
>  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>  wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
>  ssc.start()
>  ssc.awaitTermination()
>
> I already did a proof of concept to write and read sequence files but there
> doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the
> best way to write out an RDD to a stream so that the timestamps are in the
> filenames and so there is minimal overhead in reading the data back in as
> "objects", see below.
>
> My simple successful proof was the following:
> val rdd =  sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
> rdd.saveAsSequenceFile("tachyon://.../123.sf2")
> val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")
>
> How can I do something similar with streaming?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
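For the timestamp-in-the-filename part of the question above, one option (a sketch,
not tested against the original setup) is the two-argument foreachRDD variant, which
hands you the batch time so you can build the output path yourself:

// assumes wordCounts: DStream[(String, Int)] as in the quoted snippet;
// in a compiled app you also need: import org.apache.spark.SparkContext._
wordCounts.foreachRDD { (rdd, time) =>
  // embed the batch timestamp in the directory name, like saveAsObjectFiles does
  val path = s"tachyon://localhost:19998/files/WordCounts-${time.milliseconds}"
  rdd.saveAsSequenceFile(path)
}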


RE: Executor metrics in spark application

2014-07-22 Thread Denes
Hi Jerry,

I know that way of registering a metric, but it seems to defeat the whole
purpose. I'd like to define a source whose value is set from within the application,
for example the number of parsed messages.
If I register it in metrics.properties, how can I obtain the instance
(or instances)?
How can I set the property? Is there a way to read accumulator values
from a Source?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark over graphviz (SPARK-1015, SPARK-975)

2014-07-22 Thread jay vyas
Hi spark.

I see there has been some work around graphviz visualization for spark jobs.

1) I'm wondering if anyone is actively maintaining this stuff, and if so, what
the best docs are for it; or else, whether there is interest in an upstream
JIRA for updating the graphviz APIs.

2) Also, I am curious about utilities for visualizing/optimizing the flow of
data through an RDD at runtime, and where those live in the existing codebase.

Any thoughts around pipeline visualization for Spark would be appreciated.
I see some conversations about it in JIRAs but am not sure what the future is
for this; possibly I could lend a hand if there are any loose ends needing
to be tied.

-- 
jay vyas


RE: Executor metrics in spark application

2014-07-22 Thread Shao, Saisai
Yeah, I'm starting to understand your purpose. The original design of customized
metrics sources focused on self-contained sources; since you need to rely on an
outer variable, the way you mentioned may be the only way to register it.

Besides, as you cannot see the source in Ganglia, I think you can enable the
console sink to verify the outputs. Also, it seems you want to register this source
on the driver, so you need to enable the Ganglia sink on the driver side and make sure
the Ganglia client can connect to your driver.
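For the "enable the console sink to verify" step, a metrics.properties fragment along
these lines (values illustrative; the keys mirror Spark's bundled
metrics.properties.template) prints all registered metrics every 10 seconds:

# enable the console sink on all instances (driver, executors, master, worker)
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

Point spark.metrics.conf at this file (or drop it into conf/) and the sink output
shows whether the custom source's values are being collected at all.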

Thanks
Jerry

-Original Message-
From: Denes [mailto:te...@outlook.com] 
Sent: Tuesday, July 22, 2014 6:38 PM
To: u...@spark.incubator.apache.org
Subject: RE: Executor metrics in spark application

Hi Jerry,

I know that way of registering a metrics, but it seems defeat the whole 
purpose. I'd like to define a source that is set within the application, for 
example number of parsed messages. 
If I register it in the metrics.properties, how can I obtain the instance?
(or instances?)
How can I set the property? Is there a way to read an accumulator values from a 
Source?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


collect() on small group of Avro files causes plain NullPointerException

2014-07-22 Thread Sparky
Running a simple collect method on a group of Avro objects causes a plain
NullPointerException.  Does anyone know what may be wrong?

>files.collect()

Press ENTER or type command to continue
Exception in thread "Executor task launch worker-0"
java.lang.NullPointerException
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
at scala.Option.flatMap(Option.scala:170)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: collect() on small list causes NullPointerException

2014-07-22 Thread Sparky
For those curious: I was using a KryoRegistrator, and it was causing the null
pointer exception. I removed the code and the problem went away.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-list-causes-NullPointerException-tp10400p10402.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: collect() on small group of Avro files causes plain NullPointerException

2014-07-22 Thread Eugen Cepoi
Do you have a list/array in your Avro record? If yes, this could cause the
problem. I experienced this kind of problem and solved it by providing
custom Kryo ser/de for Avro lists. Also be careful: Spark reuses records,
so if you just read them and don't copy/transform them, you would end up
with the records all having the same values.
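A small sketch of that "copy before you collect" advice (the record type and field
access below are illustrative; adapt them to your Avro schema and input format):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD

// assumes files: RDD[(AvroKey[GenericRecord], NullWritable)] read via newAPIHadoopFile
def collectSafely(files: RDD[(AvroKey[GenericRecord], NullWritable)]): Array[String] =
  files.map { case (avroKey, _) =>
    // materialize the fields you need (here just toString) so you don't hold a
    // reference to a record object that Spark/Avro may reuse
    avroKey.datum().toString
  }.collect()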


2014-07-22 15:01 GMT+02:00 Sparky :

> Running a simple collect method on a group of Avro objects causes a plain
> NullPointerException.  Does anyone know what may be wrong?
>
> >files.collect()
>
> Press ENTER or type command to continue
> Exception in thread "Executor task launch worker-0"
> java.lang.NullPointerException
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
> at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$2.apply(Executor.scala:254)
> at scala.Option.flatMap(Option.scala:170)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-small-group-of-Avro-files-causes-plain-NullPointerException-tp10400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


hadoop version

2014-07-22 Thread mrm
Hi,

Where can I find the version of Hadoop my cluster is using? I launched my
ec2 cluster using the spark-ec2 script with the "--hadoop-major-version=2"
option. However, the folder "hadoop-native/lib" in the master node only
contains files that end in 1.0.0. Does that mean that I have Hadoop version
1?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


the implications of some items in webUI

2014-07-22 Thread Yifan LI
Hi,

I am analysing application processing on Spark (GraphX), but I am a little
confused by some items in the web UI.

1) What is the difference between "Duration" (Stages -> Completed Stages) and
"Task Time" (Executors)?
For instance, "43s" vs. "5.6 m".
Is "Task Time" approximately "Duration" multiplied by "Total Tasks"?

2) What are the exact meanings of "Shuffle Read" and "Shuffle Write"?


Best,
Yifan LI




Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

A minimal example:

case class P(name:String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
[Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
(P(bob),1), (P(abe),1), (P(charly),1))

In contrast to the expected behavior, that should be equivalent to:
sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

Any ideas why this doesn't work?

-kr, Gerard.


Re: Is there anyone who use streaming join to filter spam as guide mentioned?

2014-07-22 Thread hawkwang

Hi TD,

Eventually I found that I had made a mistake - the RDD I used for the join did
not contain any content.

Now it works.

Thanks,
Hawk
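For anyone else trying the same thing, a minimal sketch of the join-with-a-static-RDD
pattern from the guide (all names, paths and the record layout here are illustrative,
not taken from the original code):

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("spam-join-sketch"), Seconds(10))

// blacklist loaded once as a static RDD; one spammer id per line
val spamRDD = ssc.sparkContext.textFile("/path/to/spam-users").map(u => (u, true))

// incoming lines of the form "user<TAB>payload"
val events = ssc.socketTextStream("localhost", 9999).map { line =>
  val Array(user, payload) = line.split("\t", 2)
  (user, payload)
}

// keep only records whose key is absent from the blacklist
val clean = events.transform { rdd =>
  rdd.leftOuterJoin(spamRDD).collect { case (user, (payload, None)) => (user, payload) }
}
clean.print()

ssc.start()
ssc.awaitTermination()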

On July 21, 2014 at 17:58, Tathagata Das wrote:

Could you share your code snippet so that we can take a look?

TD



On Mon, Jul 21, 2014 at 7:23 AM, hawkwang > wrote:


Hello guys,

I'm just trying to use spark streaming features.
I noticed that there is a join example for filtering spam, so I just
wanted to try it.
But nothing happens after the join; the output JavaPairDStream
content is the same as before.
So, are there any examples that I can refer to?
So, is there any examples that I can refer to?

Thanks for any suggestions.

Regards,
Hawk






Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Just to narrow down the issue, it looks like it is in 'reduceByKey'
and derivatives like 'distinct'.

groupByKey() seems to work

sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
(abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas  wrote:

> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>
> A minimal example:
>
> case class P(name:String)
> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
> (P(bob),1), (P(abe),1), (P(charly),1))
>
> In contrast to the expected behavior, that should be equivalent to:
> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
>
> Any ideas why this doesn't work?
>
> -kr, Gerard.
>


Spark Streaming - How to save all items in batchs from beginning to a single stream rdd?

2014-07-22 Thread hawkwang

Hi guys,

Is it possible to generate a single stream RDD which can be updated with each
new batch RDD's content?


I know that we can use updateStateByKey for aggregation,
but here I just want to keep tracking all of the historical original content.

I also noticed that we can save to Redis or another storage system,
but can we make it happen using only the Spark Streaming mechanism?

Thanks for any suggestion.

Regards,
Hawk



Re: Problem running Spark shell (1.0.0) on EMR

2014-07-22 Thread Martin Goodson
I am also having exactly the same problem, calling it from pyspark. Has
anyone managed to get this script to work?


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Wed, Jul 16, 2014 at 2:10 PM, Ian Wilkinson  wrote:

> Hi,
>
> I’m trying to run the Spark (1.0.0) shell on EMR and encountering a
> classpath issue.
> I suspect I’m missing something gloriously obviously, but so far it is
> eluding me.
>
> I launch the EMR Cluster (using the aws cli) with:
>
> aws emr create-cluster --name "Test Cluster"  \
> --ami-version 3.0.3 \
> --no-auto-terminate \
> --ec2-attributes KeyName=<...> \
> --bootstrap-actions
> Path=s3://elasticmapreduce/samples/spark/1.0.0/install-spark-shark.rb \
> --instance-groups
> InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium  \
> InstanceGroupType=CORE,InstanceCount=1,InstanceType=m1.medium
> --region eu-west-1
>
> then,
>
> $ aws emr ssh --cluster-id <...> --key-pair-file <...> --region eu-west-1
>
> On the master node, I then launch the shell with:
>
> [hadoop@ip-... spark]$ ./bin/spark-shell
>
> and try performing:
>
> scala> val logs = sc.textFile("s3n://.../")
>
> this produces:
>
> 14/07/16 12:40:35 WARN storage.BlockManager: Putting block broadcast_0
> failed
> java.lang.NoSuchMethodError:
> com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
>
>
> Any help mighty welcome,
> ian
>
>


Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Daniel Siegmann
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
(P(charly),ArrayBuffer(1)))


On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas  wrote:

> Just to narrow down the issue, it looks like the issue is in 'reduceByKey'
> and derivates like 'distinct'.
>
> groupByKey() seems to work
>
> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
> (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>
>
>
> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas 
> wrote:
>
>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>
>> A minimal example:
>>
>> case class P(name:String)
>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>> (P(bob),1), (P(abe),1), (P(charly),1))
>>
>> In contrast to the expected behavior, that should be equivalent to:
>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
>> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
>>
>> Any ideas why this doesn't work?
>>
>> -kr, Gerard.
>>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
Hi All,
  I am getting events from flume using following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host,
port);

Each event is a delimited record. I'd like to use some of the transformation
functions like map and reduce on this. Do I need to convert the
JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these
functions directly on it?

I need to do following kind of operations

 AA
YDelta
TAA
 Southwest
 AA

Unique tickets are  , Y, , , .
Count is  2,  1, T 1 and so on...
AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket, Southwest - 1 
ticket.

I have to do transformations like this. Right now I am able to receive
records, but I am struggling to transform them using Spark transformation
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create one?

I loop through  the events like below

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
     String logRecord = null;
     List<SparkFlumeEvent> events = eventsData.collect();
     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
     long t1 = System.currentTimeMillis();
     AvroFlumeEvent avroEvent = null;
     ByteBuffer bytePayload = null;
     // All the user-level data is carried as payload in the Flume Event
     while (batchedEvents.hasNext()) {
        SparkFlumeEvent flumeEvent = batchedEvents.next();
        avroEvent = flumeEvent.event();
        bytePayload = avroEvent.getBody();
        logRecord = new String(bytePayload.array());
        System.out.println("LOG RECORD = " + logRecord);
     }

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create this JavaRDD<String>?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.

Thanks,
Muthu
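A common approach is to transform the DStream itself rather than calling collect()
inside foreach. Below is a Scala sketch of the idea (the Java API has the same shape
via JavaDStream.map and JavaPairDStream operations); the host, port, record layout
and counting logic are illustrative, not taken from the original application:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(new SparkConf().setAppName("flume-transform"), Seconds(30))
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)

// decode each event's body into a String record instead of collect()-ing events
val records = flumeStream.map(e => new String(e.event.getBody.array()))

// e.g. count occurrences of each record per batch; duplicates collapse onto one key
val counts = records.map(r => (r.trim, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()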




Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
Yes, right. 'sc.parallelize(ps).map(x=> (**x.name**,1)).groupByKey().collect
'
An oversight from my side.

Thanks!,  Gerard.


On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann 
wrote:

> I can confirm this bug. The behavior for groupByKey is the same as
> reduceByKey - your example is actually grouping on just the name. Try
> this:
>
> sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
> res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
> (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
> (P(charly),ArrayBuffer(1)))
>
>
> On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas 
> wrote:
>
>> Just to narrow down the issue, it looks like the issue is in
>> 'reduceByKey' and derivates like 'distinct'.
>>
>> groupByKey() seems to work
>>
>> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
>> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
>> (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>>
>>
>>
>> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas 
>> wrote:
>>
>>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>>
>>> A minimal example:
>>>
>>> case class P(name:String)
>>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>>> (P(bob),1), (P(abe),1), (P(charly),1))
>>>
>>> In contrast to the expected behavior, that should be equivalent to:
>>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
>>> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
>>>
>>> Any ideas why this doesn't work?
>>>
>>> -kr, Gerard.
>>>
>>
>>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>


Re: saveAsSequenceFile for DStream

2014-07-22 Thread Barnaby Falls
Thanks Sean! I got that working last night similar to how you solved it. Any 
ideas about how to monitor that same folder in another script by creating a 
stream? I can use sc.sequenceFile() to read in the RDD, but how do I get the 
name of the file that got added since there is no sequenceFileStream() method? 
Thanks again for your help.
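On the follow-up question about reading those directories back as a stream: there is
indeed no sequenceFileStream(), but StreamingContext.fileStream lets you monitor a
directory with an arbitrary input format. A rough sketch (untested here; the directory
and batch interval are placeholders):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("read-seqfiles"), Seconds(10))
val counts = ssc
  .fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
    "tachyon://localhost:19998/files/")
  .map { case (k, v) => (k.toString, v.get) }   // copy out of the reused Writables
counts.print()
ssc.start()
ssc.awaitTermination()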

> On Jul 22, 2014, at 1:57, "Sean Owen"  wrote:
> 
> What about simply:
> 
> dstream.foreachRDD(_.saveAsSequenceFile(...))
> 
> ?
> 
>> On Tue, Jul 22, 2014 at 2:06 AM, Barnaby  wrote:
>> First of all, I do not know Scala, but learning.
>> 
>> I'm doing a proof of concept by streaming content from a socket, counting
>> the words and write it to a Tachyon disk. A different script will read the
>> file stream and print out the results.
>> 
>> val lines = ssc.socketTextStream(args(0), args(1).toInt,
>> StorageLevel.MEMORY_AND_DISK_SER)
>> val words = lines.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>> wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
>> ssc.start()
>> ssc.awaitTermination()
>> 
>> I already did a proof of concept to write and read sequence files but there
>> doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the
>> best way to write out an RDD to a stream so that the timestamps are in the
>> filenames and so there is minimal overhead in reading the data back in as
>> "objects", see below.
>> 
>> My simple successful proof was the following:
>> val rdd =  sc.parallelize(Array(("a",2), ("b",3), ("c",1)))
>> rdd.saveAsSequenceFile("tachyon://.../123.sf2")
>> val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")
>> 
>> How can I do something similar with streaming?
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Gerard Maas
I created https://issues.apache.org/jira/browse/SPARK-2620 to track this.

Maybe useful to know: this is a regression in Spark 1.0.0. I tested the
same sample code on 0.9.1 and it worked (we have several jobs using case
classes as aggregation keys, so it had better work).

-kr, Gerard.


On Tue, Jul 22, 2014 at 5:37 PM, Gerard Maas  wrote:

> Yes, right. 'sc.parallelize(ps).map(x=> (**x.name**,1)).groupByKey().
> collect'
> An oversight from my side.
>
> Thanks!,  Gerard.
>
>
> On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann  > wrote:
>
>> I can confirm this bug. The behavior for groupByKey is the same as
>> reduceByKey - your example is actually grouping on just the name. Try
>> this:
>>
>> sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
>> res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
>> (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
>> (P(charly),ArrayBuffer(1)))
>>
>>
>> On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas 
>> wrote:
>>
>>> Just to narrow down the issue, it looks like the issue is in
>>> 'reduceByKey' and derivates like 'distinct'.
>>>
>>> groupByKey() seems to work
>>>
>>> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
>>> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
>>> (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>>>
>>>
>>>
>>> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas 
>>> wrote:
>>>
 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
 sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) =>
 x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.

>>>
>>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>
>


Re: Spark Streaming source from Amazon Kinesis

2014-07-22 Thread Chris Fregly
i took this over from parviz.

i recently submitted a new PR for Kinesis Spark Streaming support:
https://github.com/apache/spark/pull/1434

others have tested it with good success, so give it a whirl!

waiting for it to be reviewed/merged.  please put any feedback into the PR
directly.

thanks!

-chris


On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia 
wrote:

> No worries, looking forward to it!
>
> Matei
>
> On Apr 21, 2014, at 1:59 PM, Parviz Deyhim  wrote:
>
> sorry Matei. Will definitely start working on making the changes soon :)
>
>
> On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia 
> wrote:
>
>> There was a patch posted a few weeks ago (
>> https://github.com/apache/spark/pull/223), but it needs a few changes in
>> packaging because it uses a license that isn’t fully compatible with
>> Apache. I’d like to get this merged when the changes are made though — it
>> would be a good input source to support.
>>
>> Matei
>>
>>
>> On Apr 21, 2014, at 1:00 PM, Nicholas Chammas 
>> wrote:
>>
>> I'm looking to start experimenting with Spark Streaming, and I'd like to
>> use Amazon Kinesis  as my data source.
>> Looking at the list of supported Spark Streaming sources
>> ,
>> I don't see any mention of Kinesis.
>>
>> Is it possible to use Spark Streaming with Amazon Kinesis? If not, are
>> there plans to add such support in the future?
>>
>> Nick
>>
>>
>> --
>> View this message in context: Spark Streaming source from Amazon Kinesis
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com
>> .
>>
>>
>>
>
>


Re: data locality

2014-07-22 Thread Sandy Ryza
On standalone there is still special handling for assigning tasks within
executors.  There just isn't special handling for where to place executors,
because standalone generally places an executor on every node.


On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang  wrote:

>Sandy,
>
>
>
> I just tried the standalone cluster and didn't have chance to try Yarn yet.
>
> So if I understand correctly, there are **no** special handling of task
> assignment according to the HDFS block's location when Spark is running as
> a **standalone** cluster.
>
> Please correct me if I'm wrong. Thank you for your patience!
>
>
>  --
>
> *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
> *Sent:* July 22, 2014 9:47
>
> *To:* user@spark.apache.org
> *Subject:* Re: data locality
>
>
>
> This currently only works for YARN.  The standalone default is to place an
> executor on every node for every job.
>
>
>
> The total number of executors is specified by the user.
>
>
>
> -Sandy
>
>
>
> On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang  wrote:
>
> Sandy,
>
>
>
> Do you mean the “preferred location” is working for standalone cluster
> also? Because I check the code of SparkContext and see comments as below:
>
>
>
>   // This is used only by YARN for now, but should be relevant to other
> cluster types (*Mesos*,
>
>   // etc) too. This is typically generated from
> InputFormatInfo.computePreferredLocations. It
>
>   // contains a map from *hostname* to a list of input format splits on
> the host.
>
>   *private*[spark] *var* preferredNodeLocationData: Map[String,
> Set[SplitInfo]] = Map()
>
>
>
> BTW, even with the preferred hosts, how does Spark decide how many total
> executors to use for this application?
>
>
>
> Thanks again!
>
>
>  --
>
> *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
> *Sent:* Friday, July 18, 2014 3:44 PM
> *To:* user@spark.apache.org
> *Subject:* Re: data locality
>
>
>
> Hi Haopu,
>
>
>
> Spark will ask HDFS for file block locations and try to assign tasks based
> on these.
>
>
>
> There is a snag.  Spark schedules its tasks inside of "executor" processes
> that stick around for the lifetime of a Spark application.  Spark requests
> executors before it runs any jobs, i.e. before it has any information about
> where the input data for the jobs is located.  If the executors occupy
> significantly fewer nodes than exist in the cluster, it can be difficult
> for Spark to achieve data locality.  The workaround for this is an API that
> allows passing in a set of preferred locations when instantiating a Spark
> context.  This API is currently broken in Spark 1.0, and will likely
> changed to be something a little simpler in a future release.
>
>
>
> val locData = InputFormatInfo.computePreferredLocations(
>   Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
>
>
>
> val sc = new SparkContext(conf, locData)
>
>
>
> -Sandy
>
>
>
>
>
> On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang  wrote:
>
> I have a standalone spark cluster and a HDFS cluster which share some of
> nodes.
>
>
>
> When reading HDFS file, how does spark assign tasks to nodes? Will it ask
> HDFS the location for each file block in order to get a right worker node?
>
>
>
> How about a spark cluster on Yarn?
>
>
>
> Thank you very much!
>
>
>
>
>
>
>


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-22 Thread Andre Schumacher

Hi,

I don't think anybody has been testing importing of Impala tables
directly. Is there any chance to export these first, say as
unpartitioned Hive tables and import these? Just an idea..

Andre

On 07/21/2014 11:46 PM, chutium wrote:
> no, something like this
> 
> 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2
> on 02.xxx: remote Akka client disassociated
> 
> ...
> ...
> 
> 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186)
> 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to
> java.io.IOException
> java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
> at
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
> at
> parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
> at
> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
> at
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
> at
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
> at
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 
> 
> ulimit is increased
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Spark sql with hive table running on Yarn-cluster mode

2014-07-22 Thread Jenny Zhao
Hi,

For running Spark SQL, the datanucleus*.jar files are automatically added to the
classpath. This works fine for Spark standalone mode and yarn-client mode;
however, for yarn-cluster mode I have to explicitly pass these jars using the
--jars option when submitting the job, otherwise the job fails. Why doesn't it
work for yarn-cluster mode?

Thank you for your help!

Jenny


Spark app vs SparkSQL app

2014-07-22 Thread buntu
I could possibly use the Spark API and write a batch app to provide some per-web-page
stats such as views, uniques, etc. The same can be achieved using
SparkSQL, so I wanted to check:

* What are the best practices and pros/cons of either approach?
* Does SparkSQL require registerAsTable for every batch, or does the created table
persist?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-app-vs-SparkSQL-app-tp10422.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark sql with hive table running on Yarn-cluster mode

2014-07-22 Thread Andrew Or
Hi Jenny,

It won't work for yarn-cluster simply because this part of the code is
missing. For now, you will have to manually add them using `--jars`. Thanks
for letting us know; we will fix this by 1.1.

Andrew
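Until that fix lands, an illustrative submit command for this situation (the jar
paths, class name and hive-site.xml location below are placeholders; point --jars at
wherever your Spark build keeps the DataNucleus jars):

spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyHiveQLJob \
  --jars /path/to/datanucleus-core.jar,/path/to/datanucleus-api-jdo.jar,/path/to/datanucleus-rdbms.jar \
  --files /path/to/hive-site.xml \
  my-app-assembly.jar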


2014-07-22 10:19 GMT-07:00 Jenny Zhao :

> Hi,
>
> For running spark sql, the dataneuclus*.jar are automatically added in
> classpath, this works fine for spark standalone mode and yarn-client mode,
> however, for Yarn-cluster mode, I have to explicitly put these jars using
> --jars option when submitting this job, otherwise, the job will fail, why
> it won't  work for yarn-cluster mode?
>
> Thank you for your help!
>
> Jenny
>


Re: Why spark-submit command hangs?

2014-07-22 Thread Andrew Or
Hi Earthson,

Is your problem resolved? The way you submit your application looks alright
to me; spark-submit should be able to parse the combination of --master and
--deploy-mode correctly. I suspect you might have hard-coded "yarn-cluster"
or something in your application.

Andrew
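One quick thing to check along these lines (a sketch with illustrative names): leave
the master out of the application code so that spark-submit's --master and
--deploy-mode flags actually take effect:

import org.apache.spark.{SparkConf, SparkContext}

// no setMaster("yarn-cluster") here -- let spark-submit decide, e.g.
//   spark-submit --master yarn --deploy-mode client --class $JOBCLASS $JOBJAR
val conf = new SparkConf().setAppName("ci-demo")
val sc = new SparkContext(conf)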


2014-07-22 1:25 GMT-07:00 Earthson :

> That's what my problem is:)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-spark-submit-command-hangs-tp10308p10394.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: hadoop version

2014-07-22 Thread Andrew Or
Hi Maria,

Having files that end with 1.0.0 means you're on Spark 1.0, not Hadoop 1.0.
You can check your hadoop version by running "$HADOOP_HOME/bin/hadoop
version", where HADOOP_HOME is set to your installation of hadoop. On the
clusters started by the Spark ec2 scripts, this should be
"/root/ephemeral-hdfs".

Andrew


2014-07-22 7:07 GMT-07:00 mrm :

> Hi,
>
> Where can I find the version of Hadoop my cluster is using? I launched my
> ec2 cluster using the spark-ec2 script with the "--hadoop-major-version=2"
> option. However, the folder "hadoop-native/lib" in the master node only
> contains files that end in 1.0.0. Does that mean that I have Hadoop version
> 1?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: spark streaming rate limiting from kafka

2014-07-22 Thread Bill Jay
Hi Tobias,

I tried using 10 as numPartitions. The number of executors allocated is the
number of DStreams. Therefore, it seems the parameter does not spread data
into many partitions. In order to do that, it seems we have to call
repartition. If numPartitions did distribute the data to multiple
executors/partitions, then I would be able to save the running time incurred
by repartition.

Bill
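For reference, the "multiple Kafka streams" pattern TD mentions further down usually
looks something like the sketch below (the ZooKeeper quorum, group, topic and counts
are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-parallel"), Seconds(10))

// one receiver per createStream call, so several calls spread the receiving
// work over several executors
val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
}

// union them back into one DStream, then repartition for downstream parallelism
val unified = ssc.union(kafkaStreams).repartition(16)
unified.count().print()
ssc.start()
ssc.awaitTermination()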




On Mon, Jul 21, 2014 at 6:43 PM, Tobias Pfeiffer  wrote:

> Bill,
>
> numPartitions means the number of Spark partitions that the data received
> from Kafka will be split to. It has nothing to do with Kafka partitions, as
> far as I know.
>
> If you create multiple Kafka consumers, it doesn't seem like you can
> specify which consumer will consume which Kafka partitions. Instead, Kafka
> (at least with the interface that is exposed by the Spark Streaming API)
> will do something called rebalance and assign Kafka partitions to consumers
> evenly, you can see this in the client logs.
>
> When using multiple Kafka consumers with auto.offset.reset = true, please
> expect to run into this one:
> https://issues.apache.org/jira/browse/SPARK-2383
>
> Tobias
>
>
> On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> I am currently creating multiple DStreams to consume from different topics.
>> How can I let each consumer consume from different partitions? I found the
>> following parameters in the Spark API:
>>
>> createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
>>     jssc: JavaStreamingContext,
>>     keyTypeClass: Class[K], valueTypeClass: Class[V],
>>     keyDecoderClass: Class[U], valueDecoderClass: Class[T],
>>     kafkaParams: Map[String, String],
>>     topics: Map[String, Integer],
>>     storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]
>>
>> Create an input stream that pulls messages from a Kafka Broker.
>>
>>
>>
>>
>> The topics parameter is:
>> *Map of (topic_name -> numPartitions) to consume. Each partition is
>> consumed in its own thread*
>>
>> Does numPartitions mean the total number of partitions to consume from
>> topic_name or the index of the partition? How can we specify for each
>> createStream which partition of the Kafka topic to consume? I think if so,
>> I will get a lot of parallelism from the source of the data. Thanks!
>>
>> Bill
>>
>> On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can create multiple kafka stream to partition your topics across
>>> them, which will run multiple receivers or multiple executors. This is
>>> covered in the Spark streaming guide.
>>> 
>>>
>>> And for the purpose of this thread, to answer the original question, we now
>>> have the ability
>>> 
>>> to limit the receiving rate. Its in the master branch, and will be
>>> available in Spark 1.1. It basically sets the limits at the receiver level
>>> (so applies to all sources) on what is the max records per second that can
>>> will be received by the receiver.
>>>
>>> TD
>>>
>>>
>>> On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer 
>>> wrote:
>>>
 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay 
 wrote:

> I also have an issue consuming from Kafka. When I consume from Kafka,
> there are always a single executor working on this job. Even I use
> repartition, it seems that there is still a single executor. Does anyone
> has an idea how to add parallelism to this job?
>
>
>
> On Thu, Jul 17, 2014 at 2:06 PM, Chen Song 
> wrote:
>
>> Thanks Luis and Tobias.
>>
>>
>> On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer 
>> wrote:
>>
>>> Hi,
>>>
>>> On Wed, Jul 2, 2014 at 1:57 AM, Chen Song 
>>> wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.

>>>
>>> Please see the post at
>>>
>>> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
>>> Kafka's auto.offset.reset parameter may be what you are looking 

Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
I don't understand what you're trying to do.

The code will use log4j under the covers. The default configuration
means writing log messages to stderr. In yarn-client mode that is your
terminal screen, in yarn-cluster mode that is redirected to a file by
Yarn. For the executors, that will always be redirected to a file
(since they're launched by yarn).

I don't know what you mean by "port". But if neither of those options
is what you're looking for, you need to look at providing a custom
log4j configuration that does what you want.


On Sun, Jul 20, 2014 at 11:05 PM, abhiguruvayya
 wrote:
> Hello Marcelo Vanzin,
>
> Can you explain bit more on this? I tried using client mode but can you
> explain how can i use this port to write the log or output to this
> port?Thanks in advance!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10287.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


combineByKey at ShuffledDStream.scala

2014-07-22 Thread Bill Jay
Hi all,

I am currently running a Spark Streaming program which consumes data from
Kafka and does a group-by operation on the data. I am trying to optimize the
running time of the program because it looks slow to me. It seems the stage
named:

* combineByKey at ShuffledDStream.scala:42 *

always takes the longest running time. And if I open this stage, I only see
two executors on it. Does anyone have an idea what this stage does
and how to increase its speed? Thanks!

Bill
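Not specific to this job, but one generic thing to try: pass an explicit partition
count to the grouping or aggregation so the shuffle isn't limited to a couple of
tasks, and prefer reduceByKey/combineByKey over groupByKey when a running aggregate is
enough. A sketch with made-up names:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(new SparkConf().setAppName("groupby-parallelism"), Seconds(10))
// a made-up pair DStream standing in for the Kafka-derived one
val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

val numShufflePartitions = 32
// groupByKey with an explicit partition count so more than a couple of tasks run
val grouped = pairs.groupByKey(numShufflePartitions)
// often cheaper: aggregate during the shuffle instead of materializing full groups
val counts = pairs.reduceByKey(_ + _, numShufflePartitions)
counts.print()
ssc.start()
ssc.awaitTermination()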


Very wierd behavior

2014-07-22 Thread Nathan Kronenfeld
I was wondering if anyone could provide an explanation for the behavior I'm
seeing.

I have an RDD, call it foo, not too complex, with maybe an 8-level-deep DAG
with 2 shuffles; it is not empty, and not even terribly big - small enough that some
partitions could be empty.

When I run foo.first, I get workers disconnecting, and applications die.
When I run foo.mapPartitions.saveAsHadoopDataset, it works fine.

Anyone got an explanation for why that might be?

-Thanks, Nathan


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-22 Thread Sandy Ryza
I haven't had a chance to look at the details of this issue, but we have
seen Spark successfully read Parquet tables created by Impala.


On Tue, Jul 22, 2014 at 10:10 AM, Andre Schumacher 
wrote:

>
> Hi,
>
> I don't think anybody has been testing importing of Impala tables
> directly. Is there any chance to export these first, say as
> unpartitioned Hive tables and import these? Just an idea..
>
> Andre
>
> On 07/21/2014 11:46 PM, chutium wrote:
> > no, something like this
> >
> > 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost
> executor 2
> > on 02.xxx: remote Akka client disassociated
> >
> > ...
> > ...
> >
> > 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task
> 1.2:186)
> > 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to
> > java.io.IOException
> > java.io.IOException: Filesystem closed
> > at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
> > at
> >
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
> > at
> > org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
> > at java.io.DataInputStream.readFully(DataInputStream.java:195)
> > at java.io.DataInputStream.readFully(DataInputStream.java:169)
> > at
> >
> parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
> > at
> >
> parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
> > at
> >
> parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
> > at
> >
> parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
> > at
> >
> parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
> > at
> > org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
> > at
> >
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> > at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> > at
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> > at org.apache.spark.scheduler.Task.run(Task.scala:51)
> > at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > at java.lang.Thread.run(Thread.java:744)
> >
> >
> > ulimit is increased
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
>
>


Need info on log4j.properties for apache spark.

2014-07-22 Thread abhiguruvayya
Hello All,

Basically I need to edit log4j.properties to filter out some of the
unnecessary logs in Spark in yarn-client mode. I am not sure where I can
find the log4j.properties file (its location). Can anyone help me with this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-info-on-log4j-properties-for-apache-spark-tp10431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark job tracker.

2014-07-22 Thread abhiguruvayya
I fixed the error with the yarn-client mode issue which I mentioned in my
earlier post. Now I want to edit log4j.properties to filter out some of the
unnecessary logs. Can you let me know where I can find this properties file?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark Streaming: no job has started yet

2014-07-22 Thread Bill Jay
Hi all,

I am running a spark streaming job. The job hangs on one stage, which shows
as follows:

Details for Stage 4
Summary Metrics: No tasks have started yet. Tasks: No tasks have started yet.



Does anyone have an idea on this?

Thanks!

Bill


What if there are large, read-only variables shared by all map functions?

2014-07-22 Thread Parthus
Hi there,

I was wondering if anybody could help me find an efficient way to make a
MapReduce program like this:

1) Each map function needs access to some huge files, which total around
6 GB.

2) These files are READ-ONLY. They are essentially a huge look-up
table, which will not change for 2-3 years.

I tried two ways to make the program work, but neither of them is efficient:

1) The first approach I tried is to let each map function load those files
independently, like this:

map (...) { load(files); DoMapTask(...)}

2) The second approach I tried is to load the files before RDD.map(...) and
broadcast the files. However, because the files are too large, the
broadcasting overhead is 30min ~ 1 hour.

Could anybody help me find an efficient way to solve it?

Thanks very much.
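One pattern that is sometimes suggested for this situation is to load the table
lazily in a singleton object, so each executor JVM pays the load cost once instead of
broadcasting the data or reloading it per task. A sketch, assuming the files are
reachable from every worker (e.g. on a shared mount or copied locally), the table fits
in each executor's heap, and with all names and the file format purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object LookupTable {
  // loaded at most once per executor JVM, on first use
  lazy val table: Map[String, String] =
    scala.io.Source.fromFile("/shared/path/lookup.tsv").getLines().map { line =>
      val Array(k, v) = line.split("\t", 2)
      k -> v
    }.toMap
}

val sc = new SparkContext(new SparkConf().setAppName("lookup-join"))
val keys = sc.textFile("/shared/path/keys.txt")

val resolved = keys.mapPartitions { iter =>
  val t = LookupTable.table           // triggers the per-JVM load the first time
  iter.map(k => k -> t.getOrElse(k, "unknown"))
}
resolved.take(10).foreach(println)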










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Very wierd behavior

2014-07-22 Thread Matei Zaharia
Is the first() being computed locally on the driver program? Maybe it's too hard
to compute with the memory, etc. available there. Take a look at the driver's
log and see whether it has the message "Computing the requested partition
locally".

Matei

On Jul 22, 2014, at 12:04 PM, Nathan Kronenfeld  
wrote:

> I was wondering if anyone could provide an explanation for the behavior I'm 
> seeing.
> 
> I have an RDD, call it foo, not too complex, with a maybe 8 level deep DAG 
> with 2 shuffles, not empty, not even terribly big - small enough that some 
> partitions could be empty.
> 
> When I run foo.first, I get workers disconnecting, and applications die
> When I run foo.mapPartitions.saveAsHadoopDataset, it works fine.
> 
> Anyone got an explanation for why that might be?
> 
> -Thanks, Nathan
> 
> 
> -- 
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com



RE: Hive From Spark

2014-07-22 Thread Andrew Lee
Hi Sean,
Thanks for clarifying. I re-read SPARK-2420 and now have a better understanding.
From a user perspective, what would you recommend for building Spark with Hive 
0.12 / 0.13+ libraries moving forward, and deploying to a production cluster that 
runs an older version of Hadoop (e.g. 2.2 or 2.4)?
My concern is that there is going to be a lag in technology adoption, and since 
Spark is moving fast, its libraries may always be newer. Protobuf, and the 
shading it required, is one good example. From a business point of view, if 
there is no benefit to upgrading a library, the chance that this will happen 
with a higher priority is low, due to stability concerns and re-running the 
entire test suite. Just by observation, there are still a lot of people running 
Hadoop 2.2 instead of 2.4 or 2.5, and releases and upgrades depend on the other 
big players such as Cloudera, Hortonworks, etc. for their distros. Not to 
mention the process of upgrading.
Is there any benefit to using Guava 14 in Spark? I believe there is usually some 
competitive reason why Spark chose Guava 14; however, I'm not sure if anyone 
raised that in the conversation, so I don't know if that is necessary.
Looking forward to seeing Hive on Spark work soon. Please let me know if 
there's any help or feedback I can provide.
Thanks Sean.


> From: so...@cloudera.com
> Date: Mon, 21 Jul 2014 18:36:10 +0100
> Subject: Re: Hive From Spark
> To: user@spark.apache.org
> 
> I haven't seen anyone actively 'unwilling' -- I hope not. See
> discussion at https://issues.apache.org/jira/browse/SPARK-2420 where I
> sketch what a downgrade means. I think it just hasn't gotten a looking
> over.
> 
> Contrary to what I thought earlier, the conflict does in fact cause
> problems in theory, and you show it causes a problem in practice. Not
> to mention it causes issues for Hive-on-Spark now.
> 
> On Mon, Jul 21, 2014 at 6:27 PM, Andrew Lee  wrote:
> > Hive and Hadoop are using an older version of guava libraries (11.0.1) where
> > Spark Hive is using guava 14.0.1+.
> > The community isn't willing to downgrade to 11.0.1 which is the current
> > version for Hadoop 2.2 and Hive 0.12.
  

Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
You can upload your own log4j.properties using spark-submit's
"--files" argument.

On Tue, Jul 22, 2014 at 12:45 PM, abhiguruvayya
 wrote:
> I fixed the error with the yarn-client mode issue which i mentioned in my
> earlier post. Now i want to edit the log4j.properties to filter some of the
> unnecessary logs. Can you let me know where can i find this properties file.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10433.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


RE: Tranforming flume events using Spark transformation functions

2014-07-22 Thread Sundaram, Muthu X.
I tried to map SparkFlumeEvents to an RDD of Strings as below. But that map and 
call are not executed at all. I might be doing this the wrong way. Any help 
would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void> () {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    System.out.println("<>>>");

    JavaRDD<String> records = eventsData.map(
        new Function<SparkFlumeEvent, String>() {
          @Override
          public String call(SparkFlumeEvent flume) throws Exception {
            String logRecord = null;
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;

            System.out.println("<>>>");
            /* List<SparkFlumeEvent> events = flume.collect();
               Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

               SparkFlumeEvent flumeEvent = batchedEvents.next(); */
            avroEvent = flume.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println("<<<" + logRecord);
            return logRecord;
          }
        });
    return null;
  }

-Original Message-

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void> () {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws 
Exception {
 String logRecord = null;
 List<SparkFlumeEvent> events = eventsData.collect();
 Iterator<SparkFlumeEvent> batchedEvents = 
events.iterator();
 long t1 = System.currentTimeMillis();
 AvroFlumeEvent avroEvent = null;
 ByteBuffer bytePayload = null;
 // All the user level data is carried as payload in Flume 
Event
 while(batchedEvents.hasNext()) {
SparkFlumeEvent flumeEvent = batchedEvents.next();
avroEvent = flumeEvent.event();
bytePayload = avroEvent.getBody();
logRecord = new String(bytePayload.array());

System.out.println("LOG RECORD = " + 
logRecord); }

Where do I create the new JavaRDD? Do I do it before this loop? How do I 
create this JavaRDD?
In the loop I am able to get every record and I am able to print them.

I appreciate any help here.

Thanks,
Muthu




Re: Spark job tracker.

2014-07-22 Thread abhiguruvayya
Thanks, I am able to load the file now. Can I turn off specific logs using
log4j.properties? I don't want to see the logs below. How can I do this?

14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as
TID 129 on executor 3: ** (NODE_LOCAL)
14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as
14708 bytes in 0 ms

*current log4j.properties entry:*

# make a file appender and a console appender
# Print the date in ISO 8601 format
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n
log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.myFileAppender.File=spark.log
log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c -
%m%n



# By default, everything goes to console and file
log4j.rootLogger=INFO, myConsoleAppender, myFileAppender

# The noisier spark logs go to file only
log4j.logger.spark.storage=INFO, myFileAppender
log4j.additivity.spark.storage=false
log4j.logger.spark.scheduler=INFO, myFileAppender
log4j.additivity.spark.scheduler=false
log4j.logger.spark.CacheTracker=INFO, myFileAppender
log4j.additivity.spark.CacheTracker=false
log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender
log4j.additivity.spark.CacheTrackerActor=false
log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender
log4j.additivity.spark.MapOutputTrackerActor=false
log4j.logger.spark.MapOutputTracker=INFO, myFileAppender
log4j.additivity.spark.MapOutputTracker=false



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Apache kafka + spark + Parquet

2014-07-22 Thread buntu
> Now we are storing Data direct from Kafka to Parquet.

We are currently using Camus and wanted to know how you went about storing
to Parquet?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-kafka-spark-Parquet-tp10037p10441.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark job tracker.

2014-07-22 Thread Marcelo Vanzin
The spark log classes are based on the actual class names. So if you
want to filter out a package's logs you need to specify the full
package name (e.g. "org.apache.spark.storage" instead of just
"spark.storage").

On Tue, Jul 22, 2014 at 2:07 PM, abhiguruvayya
 wrote:
> Thanks i am able to load the file now. Can i turn off specific logs using
> log4j.properties. I don't want to see the below logs. How can i do this.
>
> 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Starting task 2.0:129 as
> TID 129 on executor 3: ** (NODE_LOCAL)
> 14/07/22 14:01:24 INFO scheduler.TaskSetManager: Serialized task 2.0:129 as
> 14708 bytes in 0 ms
>
> *current log4j.properties entry:*
>
> # make a file appender and a console appender
> # Print the date in ISO 8601 format
> log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
> log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c -
> %m%n
> log4j.appender.myFileAppender=org.apache.log4j.RollingFileAppender
> log4j.appender.myFileAppender.File=spark.log
> log4j.appender.myFileAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.myFileAppender.layout.ConversionPattern=%d [%t] %-5p %c -
> %m%n
>
>
>
> # By default, everything goes to console and file
> log4j.rootLogger=INFO, myConsoleAppender, myFileAppender
>
> # The noisier spark logs go to file only
> log4j.logger.spark.storage=INFO, myFileAppender
> log4j.additivity.spark.storage=false
> log4j.logger.spark.scheduler=INFO, myFileAppender
> log4j.additivity.spark.scheduler=false
> log4j.logger.spark.CacheTracker=INFO, myFileAppender
> log4j.additivity.spark.CacheTracker=false
> log4j.logger.spark.CacheTrackerActor=INFO, myFileAppender
> log4j.additivity.spark.CacheTrackerActor=false
> log4j.logger.spark.MapOutputTrackerActor=INFO, myFileAppender
> log4j.additivity.spark.MapOutputTrackerActor=false
> log4j.logger.spark.MapOutputTracker=INFO, myFileAppender
> log4j.additivty.spark.MapOutputTracker=false
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10440.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
Hi guys,

I'm able to run some Spark SQL example but the sql is static in the code. I
would like to know is there a way to read sql from somewhere else (shell
for example)

I could read sql statement from kafka/zookeeper, but I cannot share the sql
to all workers. broadcast seems not working for updating values.

Moreover if I use some non-serializable class(DataInputStream etc) to read
sql from other source, I always get "Task not serializable:
java.io.NotSerializableException"


Best,
Siyuan


Re: How to do an interactive Spark SQL

2014-07-22 Thread Zongheng Yang
Do you mean that the text of the SQL queries is hardcoded in the
code? What do you mean by "cannot share the sql to all workers"?

On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com  wrote:
> Hi guys,
>
> I'm able to run some Spark SQL example but the sql is static in the code. I
> would like to know is there a way to read sql from somewhere else (shell for
> example)
>
> I could read sql statement from kafka/zookeeper, but I cannot share the sql
> to all workers. broadcast seems not working for updating values.
>
> Moreover if I use some non-serializable class(DataInputStream etc) to read
> sql from other source, I always get "Task not serializable:
> java.io.NotSerializableException"
>
>
> Best,
> Siyuan


Spark clustered client

2014-07-22 Thread Asaf Lahav
Hi Folks,

I have been trying to dig up some information in regards to what are the
possibilities when wanting to deploy more than one client process that
consumes Spark.

Let's say I have a Spark Cluster of 10 servers, and would like to setup 2
additional servers which are sending requests to it through a Spark
context, referencing one specific file of 1TB of data.

Each client process, has its own SparkContext instance.
Currently, the result is that that same file is loaded into memory twice
because the Spark Context resources are not shared between processes/jvms.


I wouldn't like to have that same file loaded over and over again with
every new client being introduced.
What would be the best practice here? Am I missing something?

Thank you,
Asaf


Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how
do I broadcast the sql to all the workers that are doing sql analysis?

Best,
Siyuan


On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang  wrote:

> Do you mean that the texts of the SQL queries being hardcoded in the
> code? What do you mean by "cannot shar the sql to all workers"?
>
> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com 
> wrote:
> > Hi guys,
> >
> > I'm able to run some Spark SQL example but the sql is static in the
> code. I
> > would like to know is there a way to read sql from somewhere else (shell
> for
> > example)
> >
> > I could read sql statement from kafka/zookeeper, but I cannot share the
> sql
> > to all workers. broadcast seems not working for updating values.
> >
> > Moreover if I use some non-serializable class(DataInputStream etc) to
> read
> > sql from other source, I always get "Task not serializable:
> > java.io.NotSerializableException"
> >
> >
> > Best,
> > Siyuan
>


Re: How to do an interactive Spark SQL

2014-07-22 Thread Zongheng Yang
Can you paste a small code example to illustrate your questions?

On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com  wrote:
> Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how
> do I broadcast the sql to all workers that is doing sql analysis.
>
> Best,
> Siyuan
>
>
> On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang  wrote:
>>
>> Do you mean that the texts of the SQL queries being hardcoded in the
>> code? What do you mean by "cannot shar the sql to all workers"?
>>
>> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com 
>> wrote:
>> > Hi guys,
>> >
>> > I'm able to run some Spark SQL example but the sql is static in the
>> > code. I
>> > would like to know is there a way to read sql from somewhere else (shell
>> > for
>> > example)
>> >
>> > I could read sql statement from kafka/zookeeper, but I cannot share the
>> > sql
>> > to all workers. broadcast seems not working for updating values.
>> >
>> > Moreover if I use some non-serializable class(DataInputStream etc) to
>> > read
>> > sql from other source, I always get "Task not serializable:
>> > java.io.NotSerializableException"
>> >
>> >
>> > Best,
>> > Siyuan
>
>


Re: the implications of some items in webUI

2014-07-22 Thread Ankur Dave
On Tue, Jul 22, 2014 at 7:08 AM, Yifan LI  wrote:

> 1) what is the difference between "Duration"(Stages -> Completed Stages)
> and "Task Time"(Executors) ?


Stages are composed of tasks that run on executors. Tasks within a stage
may run concurrently, since there are multiple executors and each executor
may run more than one task at a time.

 An executor's task time is the sum of the durations of all of its tasks.
Because this is a simple sum, it does not take parallelism into account: if
an executor runs 8 tasks concurrently and each takes a minute, it has only
spent one minute of wallclock time, but the reported task time will be 8
minutes.

A stage's duration is how much wallclock time elapsed between when the
first task launched and when the last task finished. This does take
parallelism into account, so in the above example the stage duration would
be 1 minute.

> 2) what are the exact meanings of "Shuffle Read/Shuffle Write"?


Stages communicate using shuffles. Each task may start by reading shuffle
inputs across the network, and may end by writing shuffle outputs to disk
locally. See page 7 of the Spark NSDI paper for details.

Shuffle read and shuffle write refer to the total amount of data that a
stage read across the network and wrote to disk.

Ankur 


RE: Joining by timestamp.

2014-07-22 Thread durga
Thanks Chen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10449.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How could I start new spark cluster with hadoop2.0.2

2014-07-22 Thread durga
Hi,

I am trying to create spark cluster using spark-ec2 file under spark1.0.1
directory.

1) I noticed that it is always creating Hadoop version 1.0.4. Is there a way
I can override that? I would like to have Hadoop 2.0.2.

2) I also want to install Oozie along with it. Are there any scripts available
along with spark-ec2 which can create Oozie instances for me?

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
For example, this is what I tested, and it works in local mode. What it does
is get both the data and the sql query from kafka, do sql on each RDD, and
output the result back to kafka again.
I defined a var called *sqlS*. In the streaming part, as you can see, I
change the sql statement if it consumes a sql message from kafka; then the next
time you do *sql(sqlS)* it executes the updated sql query.

But this code doesn't work in cluster because sqlS is not updated on all
the workers from what I understand.

So my question is how do I change the sqlS value at runtime and make all
the workers pick the latest value.


*var sqlS = "select count(*) from records"*
val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) =
args
val sparkConf = new SparkConf().setAppName("KafkaSpark")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new SQLContext(sc)

// Importing the SQL context gives access to all the SQL functions and
implicit conversions.
import sqlContext._
import sqlContext.createSchemaRDD

//val tt = Time(5000)
val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt,
sqltopic -> 2)
val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
topicpMap).window(Seconds(4)).filter(t => { if (t._1 == "sql") { *sqlS =
t._2;* false } else true }).map(t => getRecord(t._2.split("#")))

val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)

val brokerString =
ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")

KafkaSpark.props.put("metadata.broker.list", brokerString)
val config = new ProducerConfig(KafkaSpark.props)
val producer = new Producer[String, String](config)

val result = recordsStream.foreachRDD((recRDD) => {
  val schemaRDD = sqlContext.createSchemaRDD(recRDD)
  schemaRDD.registerAsTable(tName)
  val result = *sql(sqlS)*.collect.foldLeft("Result:\n")((s, r) => { s
+ r.mkString(",") + "\n" })
  producer.send(new KeyedMessage[String, String](outputTopic, s"SQL:
$sqlS \n $result"))
})
ssc.start()
ssc.awaitTermination()


On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang  wrote:

> Can you paste a small code example to illustrate your questions?
>
> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com 
> wrote:
> > Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
> how
> > do I broadcast the sql to all workers that is doing sql analysis.
> >
> > Best,
> > Siyuan
> >
> >
> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang 
> wrote:
> >>
> >> Do you mean that the texts of the SQL queries being hardcoded in the
> >> code? What do you mean by "cannot shar the sql to all workers"?
> >>
> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com 
> >> wrote:
> >> > Hi guys,
> >> >
> >> > I'm able to run some Spark SQL example but the sql is static in the
> >> > code. I
> >> > would like to know is there a way to read sql from somewhere else
> (shell
> >> > for
> >> > example)
> >> >
> >> > I could read sql statement from kafka/zookeeper, but I cannot share
> the
> >> > sql
> >> > to all workers. broadcast seems not working for updating values.
> >> >
> >> > Moreover if I use some non-serializable class(DataInputStream etc) to
> >> > read
> >> > sql from other source, I always get "Task not serializable:
> >> > java.io.NotSerializableException"
> >> >
> >> >
> >> > Best,
> >> > Siyuan
> >
> >
>


Re: How to do an interactive Spark SQL

2014-07-22 Thread Tobias Pfeiffer
Hi,

as far as I know, after the Streaming Context has started, the processing
pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
statement is transformed into RDD operations when the Streaming Context
starts, I think there is no way to change the statement that is executed on
the current stream after the StreamingContext has started.

Tobias


On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com  wrote:

> For example, this is what I tested and work on local mode, what it does is
> it get data and sql query both from kafka and do sql on each RDD and output
> the result back to kafka again
> I defined a var called *sqlS. * In the streaming part as you can see I
> change the sql statement if it consumes a sql message from kafka then next
> time when you do *sql(sqlS) *it execute the updated sql query.
>
> But this code doesn't work in cluster because sqlS is not updated on all
> the workers from what I understand.
>
> So my question is how do I change the sqlS value at runtime and make all
> the workers pick the latest value.
>
>
> *var sqlS = "select count(*) from records"*
> val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) =
> args
> val sparkConf = new SparkConf().setAppName("KafkaSpark")
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(2))
> val sqlContext = new SQLContext(sc)
>
> // Importing the SQL context gives access to all the SQL functions and
> implicit conversions.
> import sqlContext._
> import sqlContext.createSchemaRDD
>
> //val tt = Time(5000)
> val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt,
> sqltopic -> 2)
> val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).window(Seconds(4)).filter(t => { if (t._1 == "sql") { *sqlS =
> t._2;* false } else true }).map(t => getRecord(t._2.split("#")))
>
> val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)
>
> val brokerString =
> ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
>
> KafkaSpark.props.put("metadata.broker.list", brokerString)
> val config = new ProducerConfig(KafkaSpark.props)
> val producer = new Producer[String, String](config)
>
> val result = recordsStream.foreachRDD((recRDD) => {
>   val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>   schemaRDD.registerAsTable(tName)
>   val result = *sql(sqlS)*.collect.foldLeft("Result:\n")((s, r) => {
> s + r.mkString(",") + "\n" })
>   producer.send(new KeyedMessage[String, String](outputTopic, s"SQL:
> $sqlS \n $result"))
> })
> ssc.start()
> ssc.awaitTermination()
>
>
> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang 
> wrote:
>
>> Can you paste a small code example to illustrate your questions?
>>
>> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com 
>> wrote:
>> > Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
>> how
>> > do I broadcast the sql to all workers that is doing sql analysis.
>> >
>> > Best,
>> > Siyuan
>> >
>> >
>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang 
>> wrote:
>> >>
>> >> Do you mean that the texts of the SQL queries being hardcoded in the
>> >> code? What do you mean by "cannot shar the sql to all workers"?
>> >>
>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com 
>> >> wrote:
>> >> > Hi guys,
>> >> >
>> >> > I'm able to run some Spark SQL example but the sql is static in the
>> >> > code. I
>> >> > would like to know is there a way to read sql from somewhere else
>> (shell
>> >> > for
>> >> > example)
>> >> >
>> >> > I could read sql statement from kafka/zookeeper, but I cannot share
>> the
>> >> > sql
>> >> > to all workers. broadcast seems not working for updating values.
>> >> >
>> >> > Moreover if I use some non-serializable class(DataInputStream etc) to
>> >> > read
>> >> > sql from other source, I always get "Task not serializable:
>> >> > java.io.NotSerializableException"
>> >> >
>> >> >
>> >> > Best,
>> >> > Siyuan
>> >
>> >
>>
>
>


streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Alan Ngai
I have a sample application pumping out records 1 per second.  The batch 
interval is set to 5 seconds.  Here’s a list of “observed window intervals” vs 
what was actually set

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

can someone explain this behavior to me?  I’m trying to aggregate metrics by 
time batches, but want to skip partial batches.  Therefore, I’m trying to find 
a combination which results in 1 overlapped batch, but no combination I tried 
gets me there.  

Alan



Re: How to do an interactive Spark SQL

2014-07-22 Thread hsy...@gmail.com
But how do they do the interactive sql in the demo?
https://www.youtube.com/watch?v=dJQ5lV5Tldw

And if it can work in local mode, I think it should be able to work in
cluster mode, correct?


On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer  wrote:

> Hi,
>
> as far as I know, after the Streaming Context has started, the processing
> pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
> statement is transformed into RDD operations when the Streaming Context
> starts, I think there is no way to change the statement that is executed on
> the current stream after the StreamingContext has started.
>
> Tobias
>
>
> On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com 
> wrote:
>
>> For example, this is what I tested and work on local mode, what it does
>> is it get data and sql query both from kafka and do sql on each RDD and
>> output the result back to kafka again
>> I defined a var called *sqlS. * In the streaming part as you can see I
>> change the sql statement if it consumes a sql message from kafka then next
>> time when you do *sql(sqlS) *it execute the updated sql query.
>>
>> But this code doesn't work in cluster because sqlS is not updated on all
>> the workers from what I understand.
>>
>> So my question is how do I change the sqlS value at runtime and make all
>> the workers pick the latest value.
>>
>>
>> *var sqlS = "select count(*) from records"*
>> val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) =
>> args
>> val sparkConf = new SparkConf().setAppName("KafkaSpark")
>> val sc = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(2))
>> val sqlContext = new SQLContext(sc)
>>
>> // Importing the SQL context gives access to all the SQL functions
>> and implicit conversions.
>> import sqlContext._
>> import sqlContext.createSchemaRDD
>>
>> //val tt = Time(5000)
>> val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt,
>> sqltopic -> 2)
>> val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>> topicpMap).window(Seconds(4)).filter(t => { if (t._1 == "sql") { *sqlS =
>> t._2;* false } else true }).map(t => getRecord(t._2.split("#")))
>>
>> val zkClient = new ZkClient(zkQuorum, 3, 3,
>> ZKStringSerializer)
>>
>> val brokerString =
>> ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
>>
>> KafkaSpark.props.put("metadata.broker.list", brokerString)
>> val config = new ProducerConfig(KafkaSpark.props)
>> val producer = new Producer[String, String](config)
>>
>> val result = recordsStream.foreachRDD((recRDD) => {
>>   val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>>   schemaRDD.registerAsTable(tName)
>>   val result = *sql(sqlS)*.collect.foldLeft("Result:\n")((s, r) => {
>> s + r.mkString(",") + "\n" })
>>   producer.send(new KeyedMessage[String, String](outputTopic, s"SQL:
>> $sqlS \n $result"))
>> })
>> ssc.start()
>> ssc.awaitTermination()
>>
>>
>> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang 
>> wrote:
>>
>>> Can you paste a small code example to illustrate your questions?
>>>
>>> On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com 
>>> wrote:
>>> > Sorry, typo. What I mean is sharing. If the sql is changing at
>>> runtime, how
>>> > do I broadcast the sql to all workers that is doing sql analysis.
>>> >
>>> > Best,
>>> > Siyuan
>>> >
>>> >
>>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang 
>>> wrote:
>>> >>
>>> >> Do you mean that the texts of the SQL queries being hardcoded in the
>>> >> code? What do you mean by "cannot shar the sql to all workers"?
>>> >>
>>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com 
>>> >> wrote:
>>> >> > Hi guys,
>>> >> >
>>> >> > I'm able to run some Spark SQL example but the sql is static in the
>>> >> > code. I
>>> >> > would like to know is there a way to read sql from somewhere else
>>> (shell
>>> >> > for
>>> >> > example)
>>> >> >
>>> >> > I could read sql statement from kafka/zookeeper, but I cannot share
>>> the
>>> >> > sql
>>> >> > to all workers. broadcast seems not working for updating values.
>>> >> >
>>> >> > Moreover if I use some non-serializable class(DataInputStream etc)
>>> to
>>> >> > read
>>> >> > sql from other source, I always get "Task not serializable:
>>> >> > java.io.NotSerializableException"
>>> >> >
>>> >> >
>>> >> > Best,
>>> >> > Siyuan
>>> >
>>> >
>>>
>>
>>
>


Where is the "PowerGraph abstraction"

2014-07-22 Thread shijiaxin
I downloaded Spark 1.0.1, but I cannot find the "PowerGraph abstraction"
mentioned in the GraphX paper.
What I can find is the Pregel abstraction.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-is-the-PowerGraph-abstraction-tp10457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming source from Amazon Kinesis

2014-07-22 Thread Tathagata Das
I will take a look at it tomorrow!

TD


On Tue, Jul 22, 2014 at 9:30 AM, Chris Fregly  wrote:

> i took this over from parviz.
>
> i recently submitted a new PR for Kinesis Spark Streaming support:
> https://github.com/apache/spark/pull/1434
>
> others have tested it with good success, so give it a whirl!
>
> waiting for it to be reviewed/merged.  please put any feedback into the PR
> directly.
>
> thanks!
>
> -chris
>
>
> On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia 
> wrote:
>
>> No worries, looking forward to it!
>>
>> Matei
>>
>> On Apr 21, 2014, at 1:59 PM, Parviz Deyhim  wrote:
>>
>> sorry Matei. Will definitely start working on making the changes soon :)
>>
>>
>> On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia 
>> wrote:
>>
>>> There was a patch posted a few weeks ago (
>>> https://github.com/apache/spark/pull/223), but it needs a few changes
>>> in packaging because it uses a license that isn’t fully compatible with
>>> Apache. I’d like to get this merged when the changes are made though — it
>>> would be a good input source to support.
>>>
>>> Matei
>>>
>>>
>>> On Apr 21, 2014, at 1:00 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
>>> I'm looking to start experimenting with Spark Streaming, and I'd like to
>>> use Amazon Kinesis  as my data source.
>>> Looking at the list of supported Spark Streaming sources, I don't see
>>> any mention of Kinesis.
>>>
>>> Is it possible to use Spark Streaming with Amazon Kinesis? If not, are
>>> there plans to add such support in the future?
>>>
>>> Nick
>>>
>>>
>>> --
>>> View this message in context: Spark Streaming source from Amazon Kinesis
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>>
>


Re: combineByKey at ShuffledDStream.scala

2014-07-22 Thread Tathagata Das
Can you give an idea of the streaming program? What are the rest of the
transformations you are doing on the input streams?


On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay 
wrote:

> Hi all,
>
> I am currently running a Spark Streaming program, which consumes data from
> Kakfa and does the group by operation on the data. I try to optimize the
> running time of the program because it looks slow to me. It seems the stage
> named:
>
> * combineByKey at ShuffledDStream.scala:42 *
>
> always takes the longest running time. And If I open this stage, I only
> see two executors on this stage. Does anyone has an idea what this stage
> does and how to increase the speed for this stage? Thanks!
>
> Bill
>


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-22 Thread Yin Huai
It is caused by a bug in the Spark REPL. I still do not know which part of the
REPL code causes it... I think the people working on the REPL may have a better
idea.

Regarding how I found it: based on the exception, it seems we pulled in some
irrelevant stuff, and that import was pretty suspicious.

Thanks,

Yin


On Tue, Jul 22, 2014 at 12:53 AM, Victor Sheng 
wrote:

> Hi, Yin Huai
> I test again with your snippet code.
> It works well in spark-1.0.1
>
> Here is my code:
>
>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>  case class Record(data_date: String, mobile: String, create_time: String)
>  val mobile = Record("2014-07-20","1234567","2014-07-19")
>  val lm = List(mobile)
>  val mobileRDD = sc.makeRDD(lm)
>  val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD)
>  mobileSchemaRDD.registerAsTable("mobile")
>  sqlContext.sql("select count(1) from mobile").collect()
>
> The Result is like below:
> 14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at
> SparkPlan.scala:52, took 0.296864832 s
> res9: Array[org.apache.spark.sql.Row] = Array([1])
>
>
>But what is the main cause of this exception? And how you find it out by
> looking some unknown characters like $line11.$read$
> $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ?
>
> Thanks,
> Victor
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Tathagata Das
It could be related to this bug that is currently open.
https://issues.apache.org/jira/browse/SPARK-1312

Here is a workaround. Can you put in an inputStream.foreachRDD(rdd => { }) and
try these combos again?

TD
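
For what it's worth, here is a minimal Scala sketch of where such a no-op foreachRDD might sit relative to the windowed computation; the source, the window/slide values, and the variable names are assumptions for illustration only, not taken from this thread.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch of the workaround: register a no-op output operation on the
// raw input stream in addition to the windowed one.
def wire(ssc: StreamingContext): Unit = {
  val inputStream = ssc.socketTextStream("localhost", 9999)  // hypothetical source
  inputStream.foreachRDD(rdd => { })                         // no-op output op
  inputStream
    .window(Seconds(25), Seconds(5))
    .foreachRDD(rdd => println(rdd.count()))
}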


On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai  wrote:

> I have a sample application pumping out records 1 per second.  The batch
> interval is set to 5 seconds.  Here’s a list of “observed window intervals”
> vs what was actually set
>
> window=25, slide=25 : observed-window=25, overlapped-batches=0
> window=25, slide=20 : observed-window=20, overlapped-batches=0
> window=25, slide=15 : observed-window=15, overlapped-batches=0
> window=25, slide=10 : observed-window=20, overlapped-batches=2
> window=25, slide=5 : observed-window=25, overlapped-batches=3
>
> can someone explain this behavior to me?  I’m trying to aggregate metrics
> by time batches, but want to skip partial batches.  Therefore, I’m trying
> to find a combination which results in 1 overlapped batch, but no
> combination I tried gets me there.
>
> Alan
>
>


Re: Tranforming flume events using Spark transformation functions

2014-07-22 Thread Tathagata Das
This is because of the RDD's lazy evaluation! Unless you force a
transformed (mapped/filtered/etc.) RDD to give you back some data (like
RDD.count) or output the data (like RDD.saveAsTextFile()), Spark will not
do anything.

So after the eventsData.map(...), if you do take(10) and then print the
result, you should see 10 items from each batch printed.

Also, FYI, you can do the same map operation on the DStream as well:

inputDStream.map(...).foreachRDD(...) is equivalent to
 inputDStream.foreachRDD( // call rdd.map(...) )

Either way you have to call some RDD "action" (count, collect, take,
saveAsHadoopFile, etc.) that asks the system to do something concrete with
the data.

TD
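
To make the laziness point concrete, here is a tiny Scala sketch with hypothetical data (not code from this thread): the map below does no work until an action such as take() asks for results.

import org.apache.spark.SparkContext

// Transformations are lazy; only the action triggers the computation.
def demo(sc: SparkContext): Unit = {
  val mapped = sc.parallelize(1 to 1000).map(_ * 2)  // nothing executes yet
  mapped.take(10).foreach(println)                   // take() runs the map and returns 10 items
}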




On Tue, Jul 22, 2014 at 1:55 PM, Sundaram, Muthu X. <
muthu.x.sundaram@sabre.com> wrote:

> I tried to map SparkFlumeEvents to String of RDDs like below. But that map
> and call are not at all executed. I might be doing this in a wrong way. Any
> help would be appreciated.
>
> flumeStream.foreach(new Function,Void> () {
>   @Override
>   public Void call(JavaRDD eventsData) throws
> Exception {
> System.out.println("< each...call");
>
> JavaRDD records = eventsData.map(
> new Function() {
> @Override
> public String call(SparkFlumeEvent flume) throws Exception
> {
> String logRecord = null;
> AvroFlumeEvent avroEvent = null;
>   ByteBuffer bytePayload = null;
>
>
>   System.out.println("<>>>");
> /* List events = flume.collect();
>  Iterator batchedEvents =
> events.iterator();
>
> SparkFlumeEvent flumeEvent =
> batchedEvents.next();*/
> avroEvent = flume.event();
> bytePayload = avroEvent.getBody();
> logRecord = new String(bytePayload.array());
>
>   System.out.println("<<< logRecord);
>
> return logRecord;
> }
> });
> return null;
> }
>
> -Original Message-
> From: Sundaram, Muthu X. [mailto:muthu.x.sundaram@sabre.com]
> Sent: Tuesday, July 22, 2014 10:24 AM
> To: user@spark.apache.org; d...@spark.incubator.apache.org
> Subject: Tranforming flume events using Spark transformation functions
>
> Hi All,
>   I am getting events from flume using following line.
>
>   JavaDStream flumeStream = FlumeUtils.createStream(ssc,
> host, port);
>
> Each event is a delimited record. I like to use some of the transformation
> functions like map and reduce on this. Do I need to convert the
> JavaDStream to JavaDStream or can I apply these
> function directly on this?
>
> I need to do following kind of operations
>
>  AA
> YDelta
> TAA
>  Southwest
>  AA
>
> Unique tickets are  , Y, , , .
> Count is  2,  1, T 1 and so on...
> AA - 2 tickets(Should not count the duplicate), Delta - 1 ticket,
> Southwest - 1 ticket.
>
> I have to do transformations like this. Right now I am able to receives
> records. But I am struggling to transform them using spark transformation
> functions since they are not of type JavaRDD.
>
> Can I create new JavaRDD? How do I create new JavaRDD?
>
> I loop through  the events like below
>
> flumeStream.foreach(new Function,Void> () {
>   @Override
>   public Void call(JavaRDD eventsData) throws
> Exception {
>  String logRecord = null;
>  List events = eventsData.collect();
>  Iterator batchedEvents =
> events.iterator();
>  long t1 = System.currentTimeMillis();
>  AvroFlumeEvent avroEvent = null;
>  ByteBuffer bytePayload = null;
>  // All the user level data is carried as payload in
> Flume Event
>  while(batchedEvents.hasNext()) {
> SparkFlumeEvent flumeEvent =
> batchedEvents.next();
> avroEvent = flumeEvent.event();
> bytePayload = avroEvent.getBody();
> logRecord = new String(bytePayload.array());
>
> System.out.println("LOG RECORD = " +
> logRecord); }
>
> Where do I create new JavaRDD? DO I do it before this loop? How do
> I create this JavaRDD?
> In the loop I am able to get every record and I am able to print them.
>
> I appreciate any help here.
>
> Thanks,
> Muthu
>
>
>


Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist

2014-07-22 Thread rindra
Hello Andrew,

Thank you very much for your great tips. Your solution worked perfectly.

In fact, I was not aware that the right option for local mode is
--driver-memory 1g

Cheers,

Rindra


On Mon, Jul 21, 2014 at 11:23 AM, Andrew Or-2 [via Apache Spark User List] <
ml-node+s1001560n10336...@n3.nabble.com> wrote:

> Hi Rindra,
>
> Depending on what you're doing with your groupBy, you may end up inflating
> your data quite a bit. Even if your machine has 16G, by default spark-shell
> only uses 512M, and the amount used for storing blocks is only 60% of that
> (spark.storage.memoryFraction), so this space becomes ~300M. This is still
> many multiples of the size of your dataset, but not by orders of magnitude.
> If you are running Spark 1.0+, you can increase the amount of memory used
> by spark-shell by adding "--driver-memory 1g" as a command line argument in
> local mode, or "--executor-memory 1g" in any other mode.
>
> (Also, it seems that you set your log level to WARN. The cause is most
> probably because the cache is not big enough, but setting the log level to
> INFO will provide you with more information on the exact sizes that are
> being used by the storage and the blocks).
>
> Andrew
>
>
> 2014-07-19 13:01 GMT-07:00 rindra <[hidden email]
> >:
>
>> Hi,
>>
>> I am working with a small dataset about 13Mbyte on the spark-shell. After
>> doing a
>> groupBy on the RDD, I wanted to cache RDD in memory but I keep getting
>> these warnings:
>>
>> scala> rdd.cache()
>> res28: rdd.type = MappedRDD[63] at repartition at :28
>>
>>
>> scala> rdd.count()
>> 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_82 could not be dropped
>> from memory as it does not exist
>> 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_82 failed
>> 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_40 could not be dropped
>> from memory as it does not exist
>> 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_40 failed
>> res29: Long = 5
>>
>> It seems that I could not cache the data in memory even though my local
>> machine has
>> 16Gb RAM and the data is only 13MB with 100 partitions size.
>>
>> How to prevent this caching issue from happening? Thanks.
>>
>> Rindra
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248p10336.html
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248p10463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Executor metrics in spark application

2014-07-22 Thread Denes
As far as I understand, even if I could register the custom source, there is
no way to have a cluster-wide variable to pass to it, i.e. the accumulator
can be modified by tasks but only the driver can read it, and the broadcast
value is constant.
So it seems this custom metrics/sinks functionality was not really thought out
by the developers. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: akka disassociated on GC

2014-07-22 Thread Makoto Yui

Hi Xiangrui,

By using your treeAggregate and broadcast patches, the evaluation has 
completed successfully.


I hope these patches are merged in the next major release 
(v1.1?). Without them, it would be hard to use MLlib for a large dataset.


Thanks,
Makoto

(2014/07/16 15:05), Xiangrui Meng wrote:

Hi Makoto,

I don't remember I wrote that but thanks for bringing this issue up!
There are two important settings to check: 1) driver memory (you can
see it from the executor tab), 2) number of partitions (try to use
small number of partitions). I put two PRs to fix the problem:

1) use broadcast in task closure: https://github.com/apache/spark/pull/1427
2) use treeAggregate to get the result:
https://github.com/apache/spark/pull/1110

They are still under review. Once merged, the problem should be fixed.
I will test the KDDB dataset and report back. Thanks!

Best,
Xiangrui

On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui  wrote:

Hello,

(2014/06/19 23:43), Xiangrui Meng wrote:


The execution was slow for more large KDD cup 2012, Track 2 dataset
(235M+ records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the
sequential aggregation of dense vectors on a single driver node.

It took about 7.6m for aggregation for an iteration.



When running the above test, I got another error at the beginning of the 2nd
iteration when enabling iterations.

It works fine for the first iteration but the 2nd iteration always fails.

It seems that akka connections are suddenly disassociated when GC happens on
the driver node. Two possible causes can be considered:
1) The driver is under a heavy load because of GC; so executors cannot
connect to the driver. Changing akka timeout setting did not resolve the
issue.
2) akka oddly released valid connections on GC.

I'm using spark 1.0.1 and timeout setting of akka as follows did not resolve
the problem.

[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout   120
spark.akka.askTimeout120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in
http://markmail.org/message/p2i34frtf4iusdfn

Are there any preferred configurations or workaround for this issue?

Thanks,
Makoto


[The error log of the driver]

14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as
25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)]
12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43,
real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated:
app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)]
13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72,
real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded on
https://dl.dropboxusercontent.com/u/13123103/driver.log


[The error log of a worker]
14/07/14 18:11:38 INFO worker.Worker: Executor app-20140714180032-0010/8
finished with state EXITED message Command exited with code 1 exitStatus 1
14/07/14 18:11:38 INFO actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akk

Re: Spark Streaming: no job has started yet

2014-07-22 Thread Akhil Das
Can you paste the piece of code?

Thanks
Best Regards


On Wed, Jul 23, 2014 at 1:22 AM, Bill Jay 
wrote:

> Hi all,
>
> I am running a spark streaming job. The job hangs on one stage, which
> shows as follows:
>
> Details for Stage 4
> Summary Metrics No tasks have started yetTasksNo tasks have started yet
>
>
>
> Does anyone have an idea on this?
>
> Thanks!
>
> Bill
> Bill
>


Re: How could I start new spark cluster with hadoop2.0.2

2014-07-22 Thread Akhil Das
AFAIK you can use the --hadoop-major-version parameter with the spark-ec2
script to switch the hadoop version.

Thanks
Best Regards


On Wed, Jul 23, 2014 at 6:07 AM, durga  wrote:

> Hi,
>
> I am trying to create spark cluster using spark-ec2 file under spark1.0.1
> directory.
>
> 1) I noticed that It is always creating hadoop version 1.0.4.Is there a
> way
> I can override that?I would like to have hadoop2.0.2
>
> 2) I also wants install Oozie along with. Is there any scrips available
> along with spark-ec2, which can create oozie instances for me.
>
> Thanks,
> D.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark clustered client

2014-07-22 Thread Nick Pentreath
At the moment your best bet for sharing SparkContexts across jobs will be 
Ooyala job server: https://github.com/ooyala/spark-jobserver


It doesn't yet support Spark 1.0, though I did manage to amend it to get it to 
build and run on 1.0.
—
Sent from Mailbox

On Wed, Jul 23, 2014 at 1:21 AM, Asaf Lahav  wrote:

> Hi Folks,
> I have been trying to dig up some information in regards to what are the
> possibilities when wanting to deploy more than one client process that
> consumes Spark.
> Let's say I have a Spark Cluster of 10 servers, and would like to setup 2
> additional servers which are sending requests to it through a Spark
> context, referencing one specific file of 1TB of data.
> Each client process, has its own SparkContext instance.
> Currently, the result is that that same file is loaded into memory twice
> because the Spark Context resources are not shared between processes/jvms.
> I wouldn't like to have that same file loaded over and over again with
> every new client being introduced.
> What would be the best practice here? Am I missing something?
> Thank you,
> Asaf

Re: streaming window not behaving as advertised (v1.0.1)

2014-07-22 Thread Alan Ngai
foreachRDD is how I extracted values in the first place, so that’s not going to 
make a difference.  I don’t think it’s related to SPARK-1312 because I’m 
generating data every second in the first place and I’m using foreachRDD right 
after the window operation.  The code looks something like

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15

val windowedStream = inputStream.window(Seconds(windowInterval), 
Seconds(slideInterval))

val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println(" %s".format(t.milliseconds / 1000))
  r.foreach { metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
windowedStream.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das  wrote:

> It could be related to this bug that is currently open. 
> https://issues.apache.org/jira/browse/SPARK-1312
> 
> Here is a workaround. Can you put a inputStream.foreachRDD(rdd => { }) and 
> try these combos again?
> 
> TD
> 
> 
> On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai  wrote:
> I have a sample application pumping out records 1 per second.  The batch 
> interval is set to 5 seconds.  Here’s a list of “observed window intervals” 
> vs what was actually set
> 
> window=25, slide=25 : observed-window=25, overlapped-batches=0
> window=25, slide=20 : observed-window=20, overlapped-batches=0
> window=25, slide=15 : observed-window=15, overlapped-batches=0
> window=25, slide=10 : observed-window=20, overlapped-batches=2
> window=25, slide=5 : observed-window=25, overlapped-batches=3
> 
> can someone explain this behavior to me?  I’m trying to aggregate metrics by 
> time batches, but want to skip partial batches.  Therefore, I’m trying to 
> find a combination which results in 1 overlapped batch, but no combination I 
> tried gets me there.
> 
> Alan
> 
> 



Re: Spark 0.9.1 core dumps on Mesos 0.18.0

2014-07-22 Thread Dale Johnson
I'm having the exact same problem.  I tried Mesos 0.19, 0.14.2, 0.14.1,
hadoop 2.3.0, spark 0.9.1.

#  SIGSEGV (0xb) at pc=0x7fab70c55c4d, pid=31012, tid=140366980314880
#
# JRE version: 6.0_31-b31
# Java VM: OpenJDK 64-Bit Server VM (23.25-b01 mixed mode linux-amd64
compressed oops)
# Problematic frame:
# V  [libjvm.so+0x529c4d]  jni_GetByteArrayElements+0x5d

It's on ubuntu.  No hadoop, yarn or zookeeper.  It seems to happen when
launching the connection to the mesos server.  It's a complete showstopper.

Dale.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-core-dumps-on-Mesos-0-18-0-tp4392p10470.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.