spark table to hive table

2014-05-27 Thread 정재부


Hi all,
 
I'm trying to compare the functions available in Spark 1.0's hql to the original HiveQL.
But when I tested functions such as 'rank', Spark didn't support some HiveQL functions.
Shark supports the same functions as Hive, so I want to convert the Parquet file / Spark SQL table into a Hive table and analyze it with Shark.
Is there any way to do this?
 
Thanks,
Kevin
_
Kevin Jung
Assistant Engineer / BDA Lab
T +82-2-6155-8349  M +82-10-9288-1984
F +82-2-6155-0251  E itsjb.j...@samsung.com
www.sds.samsung.com
 






Re: maprfs and spark libraries

2014-05-27 Thread nelson
As simple as that. Indeed, the Spark jar I was linking to wasn't the MapR
version. I just added spark-assembly-0.9.1-hadoop1.0.3-mapr-3.0.3.jar to the
lib directory of my project as an unmanaged dependency for sbt.
Thank you, Cafe au Lait, and all of you guys.

Regards,
Nelson.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6414.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Map failed [dupliacte 1] error

2014-05-27 Thread Joe L
Hi, I am getting the following error but I don't understand what the problem
is. 


14/05/27 17:44:29 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 15]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:43 as TID 60281 on
executor 0: cm07 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:43 as 2132 bytes
in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60235 (task 47.0:3)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 16]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:3 as TID 60282 on
executor 3: cm04 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:3 as 2132 bytes
in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60273 (task 47.0:29)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 17]
14/05/27 17:44:30 ERROR TaskSetManager: Task 47.0:29 failed 4 times;
aborting job
14/05/27 17:44:30 INFO DAGScheduler: Failed to run count at
reasoner1.scala:144
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
47.0:29 failed 4 times (most recent failure: Exception failure:
java.io.IOException: Map failed)
org.apache.spark.SparkException: Job aborted: Task 47.0:29 failed 4 times
(most recent failure: Exception failure: java.io.IOException: Map failed)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/05/27 17:44:30 INFO ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 172 s, completed 2014. 5. 27 5:44:30 PM
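
(Not from this thread, but a generic thing to check when java.nio memory mapping fails
with "Map failed" is the kernel's per-process map limit, e.g.:

sysctl vm.max_map_count
)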




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Map-failed-dupliacte-1-error-tp6415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: how to set task number?

2014-05-27 Thread qingyang li
when i using "create table bigtable002 tblproperties('shark.cache'='
tachyon') as select * from bigtable001 limit 40;" ,  there will be 4
files created on tachyon.
but when i using "create table bigtable002 tblproperties('shark.cache'='
tachyon') as select * from bigtable001 ;" ,  there will be 35 files created
on tachyon.
so, I think spark/shark know how to split files when creating table,  could
i control it's spliting by setting some configuration ,such as setting
"map.split.size=64M" ?


2014-05-26 12:14 GMT+08:00 qingyang li :

> I using " create table bigtable002 tblproperties('shark.cache'='tachyon')
> as select * from bigtable001"  to create table bigtable002; while
> bigtable001 is load from hdfs, it's format is text file ,  so i think
> bigtable002's is text.
>
>
> 2014-05-26 11:14 GMT+08:00 Aaron Davidson :
>
> What is the format of your input data, prior to insertion into Tachyon?
>>
>>
>> On Sun, May 25, 2014 at 7:52 PM, qingyang li wrote:
>>
>>> i tried "set mapred.map.tasks=30" , it does not work, it seems shark
>>> does not support this setting.
>>> i also tried "SET mapred.max.split.size=6400", it does not work,too.
>>> is there other way to control task number in shark CLI ?
>>>
>>>
>>>
>>> 2014-05-26 10:38 GMT+08:00 Aaron Davidson :
>>>
>>> You can try setting "mapred.map.tasks" to get Hive to do the right
 thing.


 On Sun, May 25, 2014 at 7:27 PM, qingyang li 
 wrote:

> Hi, Aaron, thanks for sharing.
>
> I am using shark to execute query , and table is created on tachyon. I
> think i can not using RDD#repartition() in shark CLI;
> if shark support "SET mapred.max.split.size" to control file size ?
> if yes,  after i create table, i can control file num,  then   I can
> control task number.
> if not , do anyone know other way to control task number in shark CLI?
>
>
> 2014-05-26 9:36 GMT+08:00 Aaron Davidson :
>
> How many partitions are in your input data set? A possibility is that
>> your input data has 10 unsplittable files, so you end up with 10
>> partitions. You could improve this by using RDD#repartition().
>>
>> Note that mapPartitionsWithIndex is sort of the "main processing
>> loop" for many Spark functions. It is iterating through all the elements 
>> of
>> the partition and doing some computation (probably running your user 
>> code)
>> on it.
>>
>> You can see the number of partitions in your RDD by visiting the
>> Spark driver web interface. To access this, visit port 8080 on host 
>> running
>> your Standalone Master (assuming you're running standalone mode), which
>> will have a link to the application web interface. The Tachyon master 
>> also
>> has a useful web interface, available at port 1.
>>
>>
>> On Sun, May 25, 2014 at 5:43 PM, qingyang li <
>> liqingyang1...@gmail.com> wrote:
>>
>>> hi, Mayur, thanks for replying.
>>> I know spark application should take all cores by default. My
>>> question is  how to set task number on each core ?
>>> If one silce, one task,  how can i set silce file size ?
>>>
>>>
>>> 2014-05-23 16:37 GMT+08:00 Mayur Rustagi :
>>>
>>> How many cores do you see on your spark master (8080 port).
 By default spark application should take all cores when you launch
 it. Unless you have set max core configuration.


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
  @mayur_rustagi 



 On Thu, May 22, 2014 at 4:07 PM, qingyang li <
 liqingyang1...@gmail.com> wrote:

> my aim of setting task number is to increase the query speed,
> and I have also found " mapPartitionsWithIndex at
> Operator.scala:333 "
> is costing much time.  so, my another question is :
> how to tunning 
> mapPartitionsWithIndex
> to make the costing time down?
>
>
>
>
> 2014-05-22 18:09 GMT+08:00 qingyang li :
>
> i have added  SPARK_JAVA_OPTS+="-Dspark.
>> default.parallelism=40 "  in shark-env.sh,
>> but i find there are only10 tasks on the cluster and 2 tasks each
>> machine.
>>
>>
>> 2014-05-22 18:07 GMT+08:00 qingyang li 
>> :
>>
>> i have added  SPARK_JAVA_OPTS+="-Dspark.default.parallelism=40 "
>>> in shark-env.sh
>>>
>>>
>>> 2014-05-22 17:50 GMT+08:00 qingyang li >> >:
>>>
>>> i am using tachyon as storage system and using to shark to query
 a table which is a bigtable, i have 5 machines as a spark cluster, 
 there
 are 4 cores on eac

Re: how to control task number?

2014-05-27 Thread qingyang li
when i using "create table bigtable002 tblproperties('shark.cache'='
tachyon') as select * from bigtable001 limit 40;" ,  there will be 4
files created on tachyon.
but when i using "create table bigtable002 tblproperties('shark.cache'='
tachyon') as select * from bigtable001 ;" ,  there will be 35 files created
on tachyon.
so, I think spark/shark  know how to split files when creating table,
spark/shark will partition table into many parts on tatchyon?  how
spark/shark split table into many parts?  could i control it's spliting by
setting some configuration ,such as setting "map.split.size=64M" ?


2014-05-27 16:59 GMT+08:00 qingyang li :

> when i using "create table bigtable002 tblproperties('shark.cache'='
> tachyon') as select * from bigtable001 limit 40;" ,  there will be 4
> files created on tachyon.
> but when i using "create table bigtable002 tblproperties('shark.cache'='
> tachyon') as select * from bigtable001 ;" ,  there will be 35 files
> created on tachyon.
> so, I think spark/shark know how to split files when creating table,
> could i control it's spliting by setting some configuration ,such as
> setting "map.split.size=64M" ?
>
>
> 2014-05-26 12:14 GMT+08:00 qingyang li :
>
> I using " create table bigtable002 tblproperties('shark.cache'='tachyon')
>> as select * from bigtable001"  to create table bigtable002; while
>> bigtable001 is load from hdfs, it's format is text file ,  so i think
>> bigtable002's is text.
>>
>>
>> 2014-05-26 11:14 GMT+08:00 Aaron Davidson :
>>
>> What is the format of your input data, prior to insertion into Tachyon?
>>>
>>>
>>> On Sun, May 25, 2014 at 7:52 PM, qingyang li 
>>> wrote:
>>>
 i tried "set mapred.map.tasks=30" , it does not work, it seems shark
 does not support this setting.
 i also tried "SET mapred.max.split.size=6400", it does not
 work,too.
 is there other way to control task number in shark CLI ?



 2014-05-26 10:38 GMT+08:00 Aaron Davidson :

 You can try setting "mapred.map.tasks" to get Hive to do the right
> thing.
>
>
> On Sun, May 25, 2014 at 7:27 PM, qingyang li  > wrote:
>
>> Hi, Aaron, thanks for sharing.
>>
>> I am using shark to execute query , and table is created on tachyon.
>> I think i can not using RDD#repartition() in shark CLI;
>> if shark support "SET mapred.max.split.size" to control file size ?
>> if yes,  after i create table, i can control file num,  then   I can
>> control task number.
>> if not , do anyone know other way to control task number in shark CLI?
>>
>>
>> 2014-05-26 9:36 GMT+08:00 Aaron Davidson :
>>
>> How many partitions are in your input data set? A possibility is that
>>> your input data has 10 unsplittable files, so you end up with 10
>>> partitions. You could improve this by using RDD#repartition().
>>>
>>> Note that mapPartitionsWithIndex is sort of the "main processing
>>> loop" for many Spark functions. It is iterating through all the 
>>> elements of
>>> the partition and doing some computation (probably running your user 
>>> code)
>>> on it.
>>>
>>> You can see the number of partitions in your RDD by visiting the
>>> Spark driver web interface. To access this, visit port 8080 on host 
>>> running
>>> your Standalone Master (assuming you're running standalone mode), which
>>> will have a link to the application web interface. The Tachyon master 
>>> also
>>> has a useful web interface, available at port 1.
>>>
>>>
>>> On Sun, May 25, 2014 at 5:43 PM, qingyang li <
>>> liqingyang1...@gmail.com> wrote:
>>>
 hi, Mayur, thanks for replying.
 I know spark application should take all cores by default. My
 question is  how to set task number on each core ?
 If one silce, one task,  how can i set silce file size ?


 2014-05-23 16:37 GMT+08:00 Mayur Rustagi :

 How many cores do you see on your spark master (8080 port).
> By default spark application should take all cores when you launch
> it. Unless you have set max core configuration.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
>  @mayur_rustagi 
>
>
>
> On Thu, May 22, 2014 at 4:07 PM, qingyang li <
> liqingyang1...@gmail.com> wrote:
>
>> my aim of setting task number is to increase the query speed,
>> and I have also found " mapPartitionsWithIndex at
>> Operator.scala:333 "
>> is costing much time.  so, my another question is :
>> how to tunning 
>> mapPartitionsWithIndex
>> to make the costing time down?
>>
>>
>>
>>>

too many temporary app files left after app finished

2014-05-27 Thread Cheney Sun
Hi,

We use spark 0.9.1 in standalone mode.
We found that lots of temporary app files don't get removed from each worker's
local file system even after the job has finished. These folders have names
such as "app-20140516120842-0203".

These files take up so much disk space that we have to run a daemon
script to remove them periodically, which is ugly.
Did anybody run into this issue as well? Is there any configuration to delete the
app temporary files automatically once a job finishes?

Thanks,
Cheney
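
(A possible direction, not discussed in this thread: around Spark 1.0 the standalone
worker gained cleanup settings for old application directories. If your version has
them, a sketch of the relevant lines in conf/spark-env.sh would be:

SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"
)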


Re: KryoDeserialization getting java.io.EOFException

2014-05-27 Thread jaranda
I am experiencing the same issue. Did you manage to get rid of this?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KryoDeserialization-getting-java-io-EOFException-tp1559p6419.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark table to hive table

2014-05-27 Thread John Omernik
Did you try the Hive Context? Look under Hive Support here:

http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html
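
Roughly, that route would look something like the following (an untested sketch
against the Spark 1.0 HiveContext API; the path and table names are made up):

val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
// load the Parquet data and expose it to SQL
val parquetData = hiveCtx.parquetFile("/path/to/kevin_data.parquet")
parquetData.registerAsTable("parquet_temp")
// materialize it as a real Hive table so Shark/Hive can query it with full HiveQL
hiveCtx.hql("CREATE TABLE kevin_hive_copy AS SELECT * FROM parquet_temp")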




On Tue, May 27, 2014 at 2:09 AM, 정재부  wrote:

>  Hi all,
>
>
>
> I'm trying to compare functions available in Spark1.0 hql to original
> HiveQL.
>
> But, when I tested functions such as 'rank', Spark didn't support some
> HiveQL functions.
>
> In case of Shark, it supports functions as well as Hive so I want to
> convert parquet file, Spark SQL table to Hive Table and analyze it with
> Shark.
>
> Is there any way to do this?
>
>
>
> Thanks,
>
> Kevin
>
> _
>
> *Kevin Jung*
> Assistant Engineer / BDA Lab
> *T* +82-2-6155-8349 *M* +82-10-9288-1984
> *F* +82-2-6155-0251 *E* itsjb.j...@samsung.com
>
>
> www.sds.samsung.com
>
>
>
>


Re: K-nearest neighbors search in Spark

2014-05-27 Thread Carter
Any suggestion is very much appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark streaming issue

2014-05-27 Thread Sourav Chandra
HI,

I am facing a weird issue. I am using spark 0.9 and running a streaming
application.

In the UI, the stage duration is on the order of seconds, but if I dig into that
particular stage's details, the total time taken across all tasks for
the stage is much, much less (milliseconds).

I am using the fair scheduling policy and the pool name is counter-metric-persistor.

What could be the reason for this?

*Stage screenshot: Stage 97*

Stage 97 | Pool: counter-metric-persistor | foreach at RealTimeAnalyticsApplication.scala:33
Submitted: 2014/05/27 07:22:23 | Duration: 14.5 s | Tasks: 6/6

*Stage details screenshot: Stage 97*

Details for Stage 97

   - Total task time across all tasks: 154 ms

Summary Metrics for 6 Completed Tasks

   Metric                            Min     25th pct  Median  75th pct  Max
   Result serialization time         0 ms    0 ms      0 ms    0 ms      0 ms
   Duration                          12 ms   13 ms     23 ms   30 ms     54 ms
   Time spent fetching task results  0 ms    0 ms      0 ms    0 ms      0 ms
   Scheduler delay                   7 ms    7 ms      8 ms    8 ms      8 ms

Aggregated Metrics by Executor

   Executor ID: 0 | Address: ls230-127-p.nyc0.ls.local:53463 | Task Time: 199 ms
   Total Tasks: 6 | Failed: 0 | Succeeded: 6
   Shuffle Read: 0.0 B | Shuffle Write: 0.0 B | Shuffle Spill (Memory): 0.0 B | Shuffle Spill (Disk): 0.0 B

Tasks

   Index  Task ID  Status   Locality       Executor                   Launch Time          Duration
   0      408      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  30 ms
   1      411      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  22 ms
   2      412      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  23 ms
   3      414      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  13 ms
   4      415      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  12 ms
   5      416      SUCCESS  PROCESS_LOCAL  ls230-127-p.nyc0.ls.local  2014/05/27 07:22:37  54 ms


Thanks,
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Computing cosine similiarity using pyspark

2014-05-27 Thread Jeremy Freeman
Hi Jamal,

One nice feature of PySpark is that you can easily use existing functions
from NumPy and SciPy inside your Spark code. For a simple example, the
following uses Spark's cartesian operation (which combines pairs of vectors
into tuples), followed by NumPy's corrcoef to compute the pearson
correlation coefficient between every pair of a set of vectors. The vectors
are an RDD of numpy arrays.

>> from numpy import array, corrcoef

>> data = sc.parallelize([array([1,2,3]),array([2,4,6.1]),array([3,2,1.1])])
>> corrs = data.cartesian(data).map(lambda (x,y):
corrcoef(x,y)[0,1]).collect()
>> corrs
[1.0, 0.0086740991746, -0.99953863896044948 ...

This just returns a list of the correlation coefficients, you could also
add a key to each array, to keep track of which pair is which

>> data_with_keys =
sc.parallelize([(0,array([1,2,3])),(1,array([2,4,6.1])),(2,array([3,2,1.1]))])
>> corrs_with_keys = data_with_keys.cartesian(data_with_keys).map(lambda
((k1,v1),(k2,v2)): ((k1,k2),corrcoef(v1,v2)[0,1])).collect()
>> corrs_with_keys
[((0, 0), 1.0), ((0, 1), 0.0086740991746), ((0, 2),
-0.99953863896044948) ...

Finally, you could just replace corrcoef in either of the above
with scipy.spatial.distance.cosine to get your cosine similarity.

Hope that's useful, as Andrei said, the answer partly depends on exactly
what you're trying to do.

-- Jeremy


On Fri, May 23, 2014 at 2:41 PM, Andrei  wrote:

> Do you need cosine distance and correlation between vectors or between
> variables (elements of vector)? It would be helpful if you could tell us
> details of your task.
>
>
> On Thu, May 22, 2014 at 5:49 PM, jamal sasha wrote:
>
>> Hi,
>>   I have bunch of vectors like
>> [0.1234,-0.231,0.23131]
>>  and so on.
>>
>> and  I want to compute cosine similarity and pearson correlation using
>> pyspark..
>> How do I do this?
>> Any ideas?
>> Thanks
>>
>
>


Re: Spark Summit 2014 (Hotel suggestions)

2014-05-27 Thread Pierre B
Hi everyone!

Any recommendation anyone?


Pierre



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark On Mesos

2014-05-27 Thread Gileny
Hello,

I've installed a Spark cluster (spark-0.9.0-incubating-bin-hadoop1), which works fine.
Also, on the same cluster I've installed a Mesos cluster, using mesos_0.18.2_x86_64.rpm,
which works fine as well.

Now, I was trying to follow the instructions from
https://spark.apache.org/docs/0.9.0/running-on-mesos.html
and while trying to get Jetty from the web, it can't find it. I have checked the URL,
and obviously I got "file not found". How can I overcome/bypass this issue?

Here is what the errors look like:

[info] Loading project definition from /usr/spark-0.9.0-incubating-bin-hadoop1/project/project
[info] Loading project definition from /usr/spark-0.9.0-incubating-bin-hadoop1/project
[info] Set current project to root (in build file:/usr/spark-0.9.0-incubating-bin-hadoop1/)
[info] Updating {file:/usr/spark-0.9.0-incubating-bin-hadoop1/}core...
[info] Resolving org.eclipse.jetty#jetty-http;7.6.8.v20121106 ...
[error] Server access Error: Connection timed out
url=https://oss.sonatype.org/content/repositories/snapshots/org/eclipse/jetty/jetty-http/7.6.8.v20121106/jetty-http-7.6.8.v20121106.pom...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-On-Mesos-tp6425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark Summit 2014 (Hotel suggestions)

2014-05-27 Thread Gerard Maas
+1


On Tue, May 27, 2014 at 3:22 PM, Pierre B <
pierre.borckm...@realimpactanalytics.com> wrote:

> Hi everyone!
>
> Any recommendation anyone?
>
>
> Pierre
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Working with Avro Generic Records in the interactive scala shell

2014-05-27 Thread Jeremy Lewi
I was able to work around this by switching to the SpecificDatum interface
and following this example:
https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java

As in the example, I defined a subclass of my Avro type which implemented
the Serializable interface using Avro serialization methods.

I also defined a copy constructor which converted from the actual avro type
to my subclass.

In Spark, after reading the Avro file, I ran a map operation to convert
from the Avro type to my serializable subclass.
This worked, although I'm not sure it's the most efficient solution.

Here's a gist of what I run in the console:
https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
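
(In outline, that conversion step looks roughly like this; the record and wrapper
class names below are placeholders rather than the ones in the gist:

// avroRdd: RDD[(AvroKey[MyRecord], NullWritable)] as returned by newAPIHadoopFile
val serializableRdd = avroRdd.map { case (key, _) => new SerializableMyRecord(key.datum()) }
serializableRdd.cache()
)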

I haven't gotten Kryo registration to work yet but it seems like setting
the registrator before launching the console using the environment variable
SPARK_JAVA_OPTS might be better than shutting down and restarting the spark
context in the console.

J


On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi  wrote:

> Hi Josh,
>
> Thanks for the help.
>
> The class should be on the path on all nodes. Here's what I did:
> 1) I built a jar from my scala code.
> 2) I copied that jar to a location on all nodes in my cluster
> (/usr/local/spark)
> 3) I edited bin/compute-classpath.sh  to add my jar to the class path.
> 4) I repeated the process with the avro mapreduce jar to provide AvroKey.
>
> I doubt this is the best way to set the classpath but it seems to work.
>
> J
>
>
> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus  wrote:
>
>> Jeremy,
>>
>> Just to be clear, are you assembling a jar with that class compiled
>> (with its dependencies) and including the path to that jar on the command
>> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>>
>> --j
>>
>>
>> On Saturday, May 24, 2014, Jeremy Lewi  wrote:
>>
>>> Hi Spark Users,
>>>
>>> I'm trying to read and process an Avro dataset using the interactive
>>> spark scala shell. When my pipeline executes I get the
>>> ClassNotFoundException pasted at the end of this email.
>>> I'm trying to use the Generic Avro API (not the Specific API).
>>>
>>> Here's a gist of the commands I'm running in the spark console:
>>> https://gist.github.com/jlewi/2c853e0ceee5f00c
>>>
>>> Here's my registrator for kryo.
>>>
>>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>>>
>>> Any help or suggestions would be greatly appreciated.
>>>
>>> Thanks
>>> Jeremy
>>>
>>> Here's the log message that is spewed out.
>>>
>>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to
>>> java.lang.ClassNotFoundException
>>> java.lang.ClassNotFoundException:
>>> $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>  at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>  at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:270)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>  at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>>  at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>  at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>  at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>  at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>  at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>  at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>  at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>  at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>  at java.io.ObjectInputStream.readObject0(ObjectI

Re: KryoSerializer Exception

2014-05-27 Thread jaranda
I am experiencing the same issue (I tried both using Kryo as serializer and
increasing the buffer size up to 256M, my objects are much smaller though).
I share my registrator class just in case:

https://gist.github.com/JordiAranda/5cc16cf102290c413c82

Any hints would be highly appreciated.

Thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KryoSerializer-Exception-tp5435p6428.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Summit 2014 (Hotel suggestions)

2014-05-27 Thread Gary Malouf
Go to expedia/orbitz and look for hotels in the union square neighborhood.
 In my humble opinion having visited San Francisco, it is worth any extra
cost to be as close as possible to the conference vs having to travel from
other parts of the city.


On Tue, May 27, 2014 at 9:36 AM, Gerard Maas  wrote:

> +1
>
>
> On Tue, May 27, 2014 at 3:22 PM, Pierre B <
> pierre.borckm...@realimpactanalytics.com> wrote:
>
>> Hi everyone!
>>
>> Any recommendation anyone?
>>
>>
>> Pierre
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Spark Summit 2014 (Hotel suggestions)

2014-05-27 Thread Jerry Lam
Hi guys,

I ended up reserving a room at the Phoenix (Hotel:
http://www.jdvhotels.com/hotels/california/san-francisco-hotels/phoenix-hotel)
recommended by my friend who has been in SF.

According to Google, it takes 11min to walk to the conference which is not
too bad.

Hope this helps!

Jerry


On Tue, May 27, 2014 at 10:35 AM, Gary Malouf  wrote:

> Go to expedia/orbitz and look for hotels in the union square neighborhood.
>  In my humble opinion having visited San Francisco, it is worth any extra
> cost to be as close as possible to the conference vs having to travel from
> other parts of the city.
>
>
> On Tue, May 27, 2014 at 9:36 AM, Gerard Maas wrote:
>
>> +1
>>
>>
>> On Tue, May 27, 2014 at 3:22 PM, Pierre B <
>> pierre.borckm...@realimpactanalytics.com> wrote:
>>
>>> Hi everyone!
>>>
>>> Any recommendation anyone?
>>>
>>>
>>> Pierre
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Summit-2014-Hotel-suggestions-tp5457p6424.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Working with Avro Generic Records in the interactive scala shell

2014-05-27 Thread Matt Massie
I really should update that blog post. I created a gist (see
https://gist.github.com/massie/7224868) which explains a cleaner, more
efficient approach.

--
Matt Massie
UC Berkeley AMPLab


On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi  wrote:

> I was able to work around this by switching to the SpecificDatum interface
> and following this example:
>
> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>
> As in the example, I defined a subclass of my Avro type which implemented
> the Serializable interface using Avro serialization methods.
>
> I also defined a copy constructor which converted from the actual avro
> type to my subclass.
>
> In spark, after reading the Avro file, I ran a map operation to convert
> from the avro type to my serializable subclass.
> This worked although I'm not sure its the most efficient solution.
>
> Here's a gist of what I run in the console:
> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>
> I haven't gotten Kryo registration to work yet but it seems like setting
> the registrator before launching the console using the environment variable
> SPARK_JAVA_OPTS might be better than shutting down and restarting the spark
> context in the console.
>
> J
>
>
> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi  wrote:
>
>> Hi Josh,
>>
>> Thanks for the help.
>>
>> The class should be on the path on all nodes. Here's what I did:
>> 1) I built a jar from my scala code.
>> 2) I copied that jar to a location on all nodes in my cluster
>> (/usr/local/spark)
>> 3) I edited bin/compute-classpath.sh  to add my jar to the class path.
>> 4) I repeated the process with the avro mapreduce jar to provide AvroKey.
>>
>> I doubt this is the best way to set the classpath but it seems to work.
>>
>> J
>>
>>
>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus  wrote:
>>
>>> Jeremy,
>>>
>>> Just to be clear, are you assembling a jar with that class compiled
>>> (with its dependencies) and including the path to that jar on the command
>>> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>>>
>>> --j
>>>
>>>
>>> On Saturday, May 24, 2014, Jeremy Lewi  wrote:
>>>
 Hi Spark Users,

 I'm trying to read and process an Avro dataset using the interactive
 spark scala shell. When my pipeline executes I get the
 ClassNotFoundException pasted at the end of this email.
 I'm trying to use the Generic Avro API (not the Specific API).

 Here's a gist of the commands I'm running in the spark console:
 https://gist.github.com/jlewi/2c853e0ceee5f00c

 Here's my registrator for kryo.

 https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala

 Any help or suggestions would be greatly appreciated.

 Thanks
 Jeremy

 Here's the log message that is spewed out.

 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException
 java.lang.ClassNotFoundException:
 $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
  at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
  at sun.reflect.GeneratedMethodAccessor

Re: Running a spark-submit compatible app in spark-shell

2014-05-27 Thread Roger Hoover
Thanks, Andrew.  I'll give it a try.
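
(For anyone finding this thread later: with that fix in place, the invocation should
reduce to something like the following, using the jar path from earlier in the thread:

./bin/spark-shell --jars /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar
scala> import etl.IP2IncomeJob
)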


On Mon, May 26, 2014 at 2:22 PM, Andrew Or  wrote:

> Hi Roger,
>
> This was due to a bug in the Spark shell code, and is fixed in the latest
> master (and RC11). Here is the commit that fixed it:
> https://github.com/apache/spark/commit/8edbee7d1b4afc192d97ba192a5526affc464205.
> Try it now and it should work. :)
>
> Andrew
>
>
> 2014-05-26 10:35 GMT+02:00 Perttu Ranta-aho :
>
> Hi Roger,
>>
>> Were you able to solve this?
>>
>> -Perttu
>>
>>
>> On Tue, Apr 29, 2014 at 8:11 AM, Roger Hoover wrote:
>>
>>> Patrick,
>>>
>>> Thank you for replying.  That didn't seem to work either.  I see the
>>> option parsed using verbose mode.
>>>
>>> Parsed arguments:
>>>  ...
>>>   driverExtraClassPath
>>>  /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar
>>>
>>> But the jar still doesn't show up if I run ":cp" in the repl and the
>>> import still fails.
>>>
>>> scala> import etl._
>>> :7: error: not found: value etl
>>>import etl._
>>>
>>> Not sure if this helps, but I noticed with Spark 0.9.1 that the import
>>> only seems to work went I add the -usejavacp option to the spark-shell
>>> command.  I don't really understand why.
>>>
>>> With the latest code, I tried adding these options to the spark-shell
>>> command without success: -usejavacp -Dscala.usejavacp=true
>>>
>>>
>>> On Mon, Apr 28, 2014 at 6:30 PM, Patrick Wendell wrote:
>>>
 What about if you run ./bin/spark-shell
 --driver-class-path=/path/to/your/jar.jar

 I think either this or the --jars flag should work, but it's possible
 there is a bug with the --jars flag when calling the Repl.


 On Mon, Apr 28, 2014 at 4:30 PM, Roger Hoover 
 wrote:

> A couple of issues:
> 1) the jar doesn't show up on the classpath even though SparkSubmit
> had it in the --jars options.  I tested this by running > :cp in 
> spark-shell
> 2) After adding it the classpath using (:cp
> /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar), 
> it
> still fails.  When I do that in the scala repl, it works.
>
> BTW, I'm using the latest code from the master branch
> (8421034e793c0960373a0a1d694ce334ad36e747)
>
>
> On Mon, Apr 28, 2014 at 3:40 PM, Roger Hoover 
> wrote:
>
>> Matei,  thank you.  That seemed to work but I'm not able to import a
>> class from my jar.
>>
>> Using the verbose options, I can see that my jar should be included
>>
>> Parsed arguments:
>> ...
>>   jars
>>  /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar
>>
>> And I see the class I want to load in the jar:
>>
>> jar -tf
>> /Users/rhoover/Work/spark-etl/target/scala-2.10/spark-etl_2.10-1.0.jar |
>> grep IP2IncomeJob
>> etl/IP2IncomeJob$$anonfun$1.class
>> etl/IP2IncomeJob$$anonfun$4.class
>> etl/IP2IncomeJob$.class
>> etl/IP2IncomeJob$$anonfun$splitOverlappingRange$1.class
>> etl/IP2IncomeJob.class
>> etl/IP2IncomeJob$$anonfun$3.class
>> etl/IP2IncomeJob$$anonfun$2.class
>>
>> But the import fails
>>
>> scala> import etl.IP2IncomeJob
>> :10: error: not found: value etl
>>import etl.IP2IncomeJob
>>
>> Any ideas?
>>
>>
>>
>> On Sun, Apr 27, 2014 at 3:46 PM, Matei Zaharia <
>> matei.zaha...@gmail.com> wrote:
>>
>>> Hi Roger,
>>>
>>> You should be able to use the --jars argument of spark-shell to add
>>> JARs onto the classpath and then work with those classes in the shell. 
>>> (A
>>> recent patch, https://github.com/apache/spark/pull/542, made
>>> spark-shell use the same command-line arguments as spark-submit). But 
>>> this
>>> is a great question, we should test it out and see whether anything else
>>> would make development easier.
>>>
>>> SBT also has an interactive shell where you can run classes in your
>>> project, but unfortunately Spark can’t deal with closures typed 
>>> directly in
>>> that the right way. However you write your Spark logic in a method and 
>>> just
>>> call that method from the SBT shell, that should work.
>>>
>>> Matei
>>>
>>> On Apr 27, 2014, at 3:14 PM, Roger Hoover 
>>> wrote:
>>>
>>> > Hi,
>>> >
>>> > From the meetup talk about the 1.0 release, I saw that
>>> spark-submit will be the preferred way to launch apps going forward.
>>> >
>>> > How do you recommend launching such jobs in a development cycle?
>>>  For example, how can I load an app that's expecting to a given to
>>> spark-submit into spark-shell?
>>> >
>>> > Also, can anyone recommend other tricks for rapid development?
>>>  I'm new to Scala, sbt, etc.  I think sbt can watch for changes in 
>>> source
>>> files and compile them automatically.
>>> >
>>> > I want to be able to make code changes and quickly get into a

Re: Working with Avro Generic Records in the interactive scala shell

2014-05-27 Thread Jeremy Lewi
Thanks that's super helpful.

J


On Tue, May 27, 2014 at 8:01 AM, Matt Massie  wrote:

> I really should update that blog post. I created a gist (see
> https://gist.github.com/massie/7224868) which explains a cleaner, more
> efficient approach.
>
> --
> Matt  
> Massie
> UC, Berkeley AMPLab 
>
>
> On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi  wrote:
>
>> I was able to work around this by switching to the SpecificDatum
>> interface and following this example:
>>
>> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>>
>> As in the example, I defined a subclass of my Avro type which implemented
>> the Serializable interface using Avro serialization methods.
>>
>> I also defined a copy constructor which converted from the actual avro
>> type to my subclass.
>>
>> In spark, after reading the Avro file, I ran a map operation to convert
>> from the avro type to my serializable subclass.
>> This worked although I'm not sure its the most efficient solution.
>>
>> Here's a gist of what I run in the console:
>> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>>
>> I haven't gotten Kryo registration to work yet but it seems like setting
>> the registrator before launching the console using the environment variable
>> SPARK_JAVA_OPTS might be better than shutting down and restarting the spark
>> context in the console.
>>
>> J
>>
>>
>> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi  wrote:
>>
>>> Hi Josh,
>>>
>>> Thanks for the help.
>>>
>>> The class should be on the path on all nodes. Here's what I did:
>>> 1) I built a jar from my scala code.
>>> 2) I copied that jar to a location on all nodes in my cluster
>>> (/usr/local/spark)
>>> 3) I edited bin/compute-classpath.sh  to add my jar to the class path.
>>> 4) I repeated the process with the avro mapreduce jar to provide AvroKey.
>>>
>>> I doubt this is the best way to set the classpath but it seems to work.
>>>
>>> J
>>>
>>>
>>> On Sat, May 24, 2014 at 9:26 AM, Josh Marcus  wrote:
>>>
 Jeremy,

 Just to be clear, are you assembling a jar with that class compiled
 (with its dependencies) and including the path to that jar on the command
 line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?

 --j


 On Saturday, May 24, 2014, Jeremy Lewi  wrote:

> Hi Spark Users,
>
> I'm trying to read and process an Avro dataset using the interactive
> spark scala shell. When my pipeline executes I get the
> ClassNotFoundException pasted at the end of this email.
> I'm trying to use the Generic Avro API (not the Specific API).
>
> Here's a gist of the commands I'm running in the spark console:
> https://gist.github.com/jlewi/2c853e0ceee5f00c
>
> Here's my registrator for kryo.
>
> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>
> Any help or suggestions would be greatly appreciated.
>
> Thanks
> Jeremy
>
> Here's the log message that is spewed out.
>
> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to
> java.lang.ClassNotFoundException
> java.lang.ClassNotFoundException:
> $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>  at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>  at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:270)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>  at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>  at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>  at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.rea

Re: Working with Avro Generic Records in the interactive scala shell

2014-05-27 Thread Andrew Ash
Also see this context from February.  We started working with Chill to get
Avro records automatically registered with Kryo.  I'm not sure the final
status, but from the Chill PR #172 it looks like this might be much less
friction than before.

Issue we filed: https://github.com/twitter/chill/issues/171
Pull request that adds an AvroSerializer to Chill:
https://github.com/twitter/chill/pull/172
Issue on the old Spark tracker:
https://spark-project.atlassian.net/browse/SPARK-746

Matt can you comment if this change helps you streamline that gist even
further?

Andrew




On Tue, May 27, 2014 at 8:49 AM, Jeremy Lewi  wrote:

> Thanks that's super helpful.
>
> J
>
>
> On Tue, May 27, 2014 at 8:01 AM, Matt Massie  wrote:
>
>> I really should update that blog post. I created a gist (see
>> https://gist.github.com/massie/7224868) which explains a cleaner, more
>> efficient approach.
>>
>> --
>> Matt  
>> Massie
>> UC, Berkeley AMPLab 
>>
>>
>> On Tue, May 27, 2014 at 7:18 AM, Jeremy Lewi  wrote:
>>
>>> I was able to work around this by switching to the SpecificDatum
>>> interface and following this example:
>>>
>>> https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java
>>>
>>> As in the example, I defined a subclass of my Avro type which
>>> implemented the Serializable interface using Avro serialization methods.
>>>
>>> I also defined a copy constructor which converted from the actual avro
>>> type to my subclass.
>>>
>>> In spark, after reading the Avro file, I ran a map operation to convert
>>> from the avro type to my serializable subclass.
>>> This worked although I'm not sure its the most efficient solution.
>>>
>>> Here's a gist of what I run in the console:
>>> https://gist.github.com/jlewi/59b0dec90b639d5d568e#file-avrospecific
>>>
>>> I haven't gotten Kryo registration to work yet but it seems like setting
>>> the registrator before launching the console using the environment variable
>>> SPARK_JAVA_OPTS might be better than shutting down and restarting the spark
>>> context in the console.
>>>
>>> J
>>>
>>>
>>> On Sat, May 24, 2014 at 11:16 AM, Jeremy Lewi  wrote:
>>>
 Hi Josh,

 Thanks for the help.

 The class should be on the path on all nodes. Here's what I did:
 1) I built a jar from my scala code.
 2) I copied that jar to a location on all nodes in my cluster
 (/usr/local/spark)
 3) I edited bin/compute-classpath.sh  to add my jar to the class path.
 4) I repeated the process with the avro mapreduce jar to provide
 AvroKey.

 I doubt this is the best way to set the classpath but it seems to work.

 J


 On Sat, May 24, 2014 at 9:26 AM, Josh Marcus wrote:

> Jeremy,
>
> Just to be clear, are you assembling a jar with that class compiled
> (with its dependencies) and including the path to that jar on the command
> line in an environment variable (e.g. SPARK_CLASSPATH=path ./spark-shell)?
>
> --j
>
>
> On Saturday, May 24, 2014, Jeremy Lewi  wrote:
>
>> Hi Spark Users,
>>
>> I'm trying to read and process an Avro dataset using the interactive
>> spark scala shell. When my pipeline executes I get the
>> ClassNotFoundException pasted at the end of this email.
>> I'm trying to use the Generic Avro API (not the Specific API).
>>
>> Here's a gist of the commands I'm running in the spark console:
>> https://gist.github.com/jlewi/2c853e0ceee5f00c
>>
>> Here's my registrator for kryo.
>>
>> https://github.com/jlewi/cloud/blob/master/spark/src/main/scala/contrail/AvroGenericRegistrator.scala
>>
>> Any help or suggestions would be greatly appreciated.
>>
>> Thanks
>> Jeremy
>>
>> Here's the log message that is spewed out.
>>
>> 14/05/24 02:00:48 WARN TaskSetManager: Loss was due to
>> java.lang.ClassNotFoundException
>> java.lang.ClassNotFoundException:
>> $line16.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>  at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>  at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:270)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>  at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>  at
>> java.io.ObjectInputStream.readOr

Re: K-nearest neighbors search in Spark

2014-05-27 Thread Andrew Ash
Hi Carter,

In Spark 1.0 there will be an implementation of k-means available as part
of MLLib.  You can see the documentation for that below (until 1.0 is fully
released).

https://people.apache.org/~pwendell/spark-1.0.0-rc9-docs/mllib-clustering.html

Maybe diving into the source here will help get you started?
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
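
A minimal sketch of that API (the input path, k, and iteration count below are
placeholders):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// each input line is a space-separated numeric vector
val vectors = sc.textFile("hdfs:///path/to/vectors.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()
val model = KMeans.train(vectors, 2, 20)   // k = 2 clusters, 20 iterations
model.clusterCenters.foreach(println)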

Cheers,
Andrew


On Tue, May 27, 2014 at 4:10 AM, Carter  wrote:

> Any suggestion is very much appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Persist and unpersist

2014-05-27 Thread Daniel Darabos
I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
return input.filter(_ % 2 == 1)
  } else {
return input.filter(_ % 2 == 0)
  }
}


The situation is that we want to do two things with an RDD (a "count" and a
"filter" in the example). The "input" RDD may represent a very expensive
calculation. So it would make sense to add an "input.cache()" line at the
beginning. But where do we put "input.unpersist()"?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result


"input.filter()" is lazy, so this does not work as expected. We only want
to release "input" from the cache once nothing depends on it anymore. Maybe
"result" was garbage collected. Maybe "result" itself has been cached. But
there is no way to detect such conditions.

Our current approach is to just leave the RDD cached, and it will get
dumped at some point anyway. Is there a better solution? Thanks for any
tips.
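
(One partial workaround, offered only as a sketch rather than a real fix: force the
derived RDD to materialize, caching it if needed, before unpersisting the input, so
that nothing lazy still points back at it:

input.cache()
val count = input.count
val result = if (count % 2 == 0) input.filter(_ % 2 == 1) else input.filter(_ % 2 == 0)
result.cache()
result.foreach(_ => ())   // force evaluation while `input` is still cached
input.unpersist()
)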


Re: file not found

2014-05-27 Thread jaranda
Thanks for the heads up, I also experienced this issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/file-not-found-tp1854p6438.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Persist and unpersist

2014-05-27 Thread Nicholas Chammas
Daniel,

Is SPARK-1103  related to
your example? Automatic unpersist()-ing of unreferenced RDDs would be nice.

Nick
​


On Tue, May 27, 2014 at 12:28 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> I keep bumping into a problem with persisting RDDs. Consider this (silly)
> example:
>
> def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
>   val count = input.count
>   if (count % 2 == 0) {
> return input.filter(_ % 2 == 1)
>   } else {
> return input.filter(_ % 2 == 0)
>   }
> }
>
>
> The situation is that we want to do two things with an RDD (a "count" and
> a "filter" in the example). The "input" RDD may represent a very expensive
> calculation. So it would make sense to add an "input.cache()" line at the
> beginning. But where do we put "input.unpersist()"?
>
> input.cache()
> val count = input.count
> val result = input.filter(...)
> input.unpersist()
> return result
>
>
> "input.filter()" is lazy, so this does not work as expected. We only want
> to release "input" from the cache once nothing depends on it anymore. Maybe
> "result" was garbage collected. Maybe "result" itself has been cached. But
> there is no way to detect such conditions.
>
> Our current approach is to just leave the RDD cached, and it will get
> dumped at some point anyway. Is there a better solution? Thanks for any
> tips.
>


Re: Akka disassociation on Java SE Embedded

2014-05-27 Thread Aaron Davidson
Sorry, to clarify: Spark *does* effectively turn Akka's failure detector
off.


On Tue, May 27, 2014 at 10:47 AM, Aaron Davidson  wrote:

> Spark should effectively turn Akka's failure detector off, because we
> historically had problems with GCs and other issues causing
> disassociations. The only thing that should cause these messages nowadays
> is if the TCP connection (which Akka sustains between Actor Systems on
> different machines) actually drops. TCP connections are pretty resilient,
> so one common cause of this is actual Executor failure -- recently, I have
> experienced a similar-sounding problem due to my machine's OOM killer
> terminating my Executors, such that they didn't produce any error output.
>
>
> On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi wrote:
>
>> Hi all,
>>
>> On an ARM cluster, I have been testing a wordcount program with JRE 7
>> and everything is OK. But when changing to the embedded version of
>> Java SE (Oracle's eJRE), the same program cannot complete all
>> computing stages.
>>
>> It is failed by many Akka's disassociation.
>>
>> - I've been trying to increase Akka's timeout but still stuck. I am
>> not sure what is the right way to do so? (I suspected that GC pausing
>> the world is causing this).
>>
>> - Another question is that how could I properly turn on Akka's logging
>> to see what's the root cause of this disassociation problem? (If my
>> guess about GC is wrong).
>>
>> Best regards,
>>
>> -chanwit
>>
>> --
>> Chanwit Kaewkasi
>> linkedin.com/in/chanwit
>>
>
>


Re: Akka disassociation on Java SE Embedded

2014-05-27 Thread Aaron Davidson
Spark should effectively turn Akka's failure detector off, because we
historically had problems with GCs and other issues causing
disassociations. The only thing that should cause these messages nowadays
is if the TCP connection (which Akka sustains between Actor Systems on
different machines) actually drops. TCP connections are pretty resilient,
so one common cause of this is actual Executor failure -- recently, I have
experienced a similar-sounding problem due to my machine's OOM killer
terminating my Executors, such that they didn't produce any error output.
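
One quick, non-Spark-specific way to check for that on a worker box is to see whether
the kernel logged an OOM kill, e.g.:

dmesg | grep -i "killed process"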


On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi  wrote:

> Hi all,
>
> On an ARM cluster, I have been testing a wordcount program with JRE 7
> and everything is OK. But when changing to the embedded version of
> Java SE (Oracle's eJRE), the same program cannot complete all
> computing stages.
>
> It is failed by many Akka's disassociation.
>
> - I've been trying to increase Akka's timeout but still stuck. I am
> not sure what is the right way to do so? (I suspected that GC pausing
> the world is causing this).
>
> - Another question is that how could I properly turn on Akka's logging
> to see what's the root cause of this disassociation problem? (If my
> guess about GC is wrong).
>
> Best regards,
>
> -chanwit
>
> --
> Chanwit Kaewkasi
> linkedin.com/in/chanwit
>


proximity of events within the next group of events instead of time

2014-05-27 Thread Navarro, John
Hi,
Spark newbie here with a general question: in a stream consisting of
several types of events, how can I detect whether event X happened within Z
transactions of event Y? Is it just a matter of iterating through all the RDDs,
and when event type Y is found, taking the next Z transactions and checking whether
there is an event of type X? What if the next Z transactions cross into the next RDD?

Thanks.
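
(Not an authoritative answer, just a sketch of the per-partition version of that idea,
assuming an events RDD with a `kind` field and a window size z. It deliberately ignores
matches that cross a partition or batch boundary; handling those generally needs Spark
Streaming's window() operations, or carrying a small tail of each partition over to the
next:

case class Event(kind: String, payload: String)

def xWithinZOfY(events: org.apache.spark.rdd.RDD[Event], z: Int): org.apache.spark.rdd.RDD[Event] =
  events.mapPartitions { iter =>
    val buf = iter.toVector
    buf.zipWithIndex.collect {
      // keep each Y that is followed by an X within the next z events of this partition
      case (e, i) if e.kind == "Y" && buf.slice(i + 1, i + 1 + z).exists(_.kind == "X") => e
    }.iterator
  }
)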




This email is intended solely for the recipient. It may contain privileged, 
proprietary or confidential information or material. If you are not the 
intended recipient, please delete this email and any attachments and notify the 
sender of the error.


Re: Akka disassociation on Java SE Embedded

2014-05-27 Thread Chanwit Kaewkasi
May be that's explaining mine too.
Thank you very much, Aaron !!

Best regards,

-chanwit

--
Chanwit Kaewkasi
linkedin.com/in/chanwit


On Wed, May 28, 2014 at 12:47 AM, Aaron Davidson  wrote:
> Spark should effectively turn Akka's failure detector off, because we
> historically had problems with GCs and other issues causing disassociations.
> The only thing that should cause these messages nowadays is if the TCP
> connection (which Akka sustains between Actor Systems on different machines)
> actually drops. TCP connections are pretty resilient, so one common cause of
> this is actual Executor failure -- recently, I have experienced a
> similar-sounding problem due to my machine's OOM killer terminating my
> Executors, such that they didn't produce any error output.
>
>
> On Thu, May 22, 2014 at 9:19 AM, Chanwit Kaewkasi  wrote:
>>
>> Hi all,
>>
>> On an ARM cluster, I have been testing a wordcount program with JRE 7
>> and everything is OK. But when changing to the embedded version of
>> Java SE (Oracle's eJRE), the same program cannot complete all
>> computing stages.
>>
>> It is failed by many Akka's disassociation.
>>
>> - I've been trying to increase Akka's timeout but still stuck. I am
>> not sure what is the right way to do so? (I suspected that GC pausing
>> the world is causing this).
>>
>> - Another question is that how could I properly turn on Akka's logging
>> to see what's the root cause of this disassociation problem? (If my
>> guess about GC is wrong).
>>
>> Best regards,
>>
>> -chanwit
>>
>> --
>> Chanwit Kaewkasi
>> linkedin.com/in/chanwit
>
>


Running Jars on Spark, program just hanging there

2014-05-27 Thread Min Li

Hi all,

I have a single machine with 8 cores and 8 GB of memory. I've deployed 
standalone Spark on the machine and successfully run the examples.


Now I'm trying to write some simple Java code. I read a local file (23 MB) 
into a list of strings and call JavaRDD<String> rdds = 
sparkContext.parallelize(...) to get the corresponding RDD. I then call 
rdds.count(), but the program just stops on the count(). 
The last log info is:


   14/05/27 14:13:16 INFO SparkContext: Starting job: count at
   RDDTest.java:40
   14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at
   RDDTest.java:40) with 2 output partitions (allowLocal=false)
   14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at
   RDDTest.java:40)
   14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List()
   14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List()
   14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0
   (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which
   has no missing parents
   14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to
   Spark cluster with app ID app-20140527141316-0003
   14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added:
   app-20140527141316-0003/0 on worker-20140526221107-spark-35303
   (spark:35303) with 8 cores
   14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor
   ID app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores,
   1024.0 MB RAM
   14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated:
   app-20140527141316-0003/0 is now RUNNING
   14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from
   Stage 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37)
   14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2
   tasks
   14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered
   executor:
   Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168]
   with ID 0
   14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0
   on executor 0: spark (PROCESS_LOCAL)
   14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:0 as
   12993529 bytes in 127 ms
   14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1
   on executor 0: spark (PROCESS_LOCAL)
   14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as
   13006417 bytes in 74 ms
   14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo:
   Registering block manager spark:37617 with 589.2 MB RAM

I tried to figure out what's going on, but just can't. Could anyone please 
give me some suggestions and point out some possible issues?


Best Regards,
Min


Re: Broadcast Variables

2014-05-27 Thread Puneet Lakhina
To answer my own question, that does seem to be the right way. I was
concerned about whether the data behind a broadcast variable would end up
getting serialized if I used the variable as an instance variable of the
function. I realized that doesn't happen because the broadcast variable's
value is marked as transient.

1. Http -
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
2. Torrent -
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
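
A minimal Scala sketch of the same pattern (illustrative only; the lookup map and input path are made up): the function object keeps just the Broadcast handle as a field, the handle's value is transient, and each executor fetches the data through .value when the task runs.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

class LookupFn(bv: Broadcast[Map[String, Int]]) extends Serializable {
  def apply(line: String): Int = bv.value.getOrElse(line, 0)  // only the small handle travels with this object
}

def run(sc: SparkContext): Unit = {
  val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))          // hypothetical lookup table
  val fn = new LookupFn(lookup)
  sc.textFile("hdfs:///tmp/input").map(fn.apply).count()
}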


On Thu, May 22, 2014 at 6:58 PM, Puneet Lakhina wrote:

> Hi,
>
> Im confused on what is the right way to use broadcast variables from java.
>
> My code looks something like this:
>
> Map<...> val = // build Map to be broadcast
> Broadcast<Map<...>> broadcastVar = sc.broadcast(val);
>
>
> sc.textFile(...).map(new SomeFunction()) {
> //Do something here using broadcastVar
> }
>
> My question is, should I pass the broadcastVar to the SomeFunction as a
> constructor parameter that it can keep around as an instance variable i.e.
>
> sc.textFile(...).map(new SomeFunction(broadcastVar)) {
> //Do something here using broadcastVar
> }
>
> class SomeFunction extends Function {
>  public SomeFunction(Broadcast<Map<...>> var) {
>this.var = var
>  }
>
>  public T call() {
>   //Do something
>  }
> }
>
> Is above the right way to utilize broadcast Variables when not using
> anonymous inner classes as functions?
> --
> Regards,
> Puneet
>
>


-- 
Regards,
Puneet


Re: Invalid Class Exception

2014-05-27 Thread Suman Somasundar


I am running this on a Solaris machine with logical partitions. All the 
partitions (workers) access the same Spark folder.


Thanks,
Suman.

On 5/23/2014 9:44 PM, Andrew Or wrote:
That means not all of your driver and executors have the same version 
of Spark. Are you on a standalone EC2 cluster? If so, one way to fix 
this is to run the following on the master node:


/root/spark-ec2/copy-dir --delete /root/spark

This syncs all of Spark across your cluster, configs, jars and everything.


2014-05-23 15:20 GMT-07:00 Suman Somasundar <suman.somasun...@oracle.com>:


Hi,

I get the following exception when using Spark to run various
programs.

java.io.InvalidClassException:
org.apache.spark.SerializableWritable; local class incompatible:
stream classdesc serialVersionUID = 6301214776158303468, local
class serialVersionUID = -7785455416944904980
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

s

Re: Invalid Class Exception

2014-05-27 Thread Marcelo Vanzin
On Tue, May 27, 2014 at 1:05 PM, Suman Somasundar
 wrote:
> I am running this on a Solaris machine with logical partitions. All the
> partitions (workers) access the same Spark folder.

Can you check whether you have multiple versions of the offending
class (org.apache.spark.SerializableWritable) in the classpath of your
apps? Maybe you do and different nodes are loading jars in different
order.

> On 5/23/2014 9:44 PM, Andrew Or wrote:
>
> That means not all of your driver and executors have the same version of
> Spark. Are you on a standalone EC2 cluster? If so, one way to fix this is to
> run the following on the master node:
>
> /root/spark-ec2/copy-dir --delete /root/spark
>
> This syncs all of Spark across your cluster, configs, jars and everything.
>
>
> 2014-05-23 15:20 GMT-07:00 Suman Somasundar :
>>
>> Hi,
>>
>> I get the following exception when using Spark to run various programs.
>>
>> java.io.InvalidClassException: org.apache.spark.SerializableWritable;
>> local class incompatible: stream classdesc serialVersionUID =
>> 6301214776158303468, local class serialVersionUID = -7785455416944904980
>> at
>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604)
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>> at
>> org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
>> at
>> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:601)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>> at
>> scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:601)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>> at
>> scala.collection.immutable

Re: Running Jars on Spark, program just hanging there

2014-05-27 Thread Yana Kadiyska
Does the spark UI show your program running? (http://spark-masterIP:8118).
If the program is listed as running you should be able to see details via
the UI. In my experience there are 3 sets of logs -- the log where you're
running your program (the driver), the log on the master node, and the log
on each executor. The master log often has very useful details when one of
your slave executors has an issue. Then you can go and read the logs on
that machine. Of course, if you have a small number of workers in your
cluster you can just read all the logs. That's just general debugging
advice... (I also find it useful to do rdd.partitions.size before anything
else to check how many partitions the RDD is actually partitioned to...)


On Tue, May 27, 2014 at 2:48 PM, Min Li  wrote:

>  Hi all,
>
> I've a single machine with 8 cores and 8g mem. I've deployed the
> standalone spark on the machine and successfully run the examples.
>
> Now I'm trying to write some simple java codes. I just read a local file
> (23M) into string list and use JavaRDD rdds =
> sparkContext.paralellize() method to get the corresponding rdd. And I asked
> to run rdds.count(). But the program just stopped on the count(). The last
> log info is:
>
> 14/05/27 14:13:16 INFO SparkContext: Starting job: count at RDDTest.java:40
> 14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at RDDTest.java:40)
> with 2 output partitions (allowLocal=false)
> 14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at
> RDDTest.java:40)
> 14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List()
> 14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List()
> 14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0
> (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which has no
> missing parents
> 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to Spark
> cluster with app ID app-20140527141316-0003
> 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added:
> app-20140527141316-0003/0 on worker-20140526221107-spark-35303
> (spark:35303) with 8 cores
> 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor ID
> app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores, 1024.0 MB
> RAM
> 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated:
> app-20140527141316-0003/0 is now RUNNING
> 14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37)
> 14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
> 14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered executor:
> Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168] with
> ID 0
> 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor 0: spark (PROCESS_LOCAL)
> 14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:0 as 12993529
> bytes in 127 ms
> 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on
> executor 0: spark (PROCESS_LOCAL)
> 14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as 13006417
> bytes in 74 ms
> 14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo:
> Registering block manager spark:37617 with 589.2 MB RAM
>
> I tried to figure out what's going on, but just can't. Could any please
> give me some suggestions and point out some possible issues?
>
> Best Regards,
> Min
>


Profiling tasks

2014-05-27 Thread Puneet Lakhina
Hi,

Is it possible to increase the logging to get more detail on what exactly
the tasks are doing? I have a slow operation for which I'm trying to find out
where the time is being spent. The operation is a cogroup() followed by a
count(). In the logs on each worker node, all I see is the fetch of map
outputs which are not local.
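
A hedged aside (not from this thread): one way to get more per-task detail is to raise the log level for Spark's own packages, either in conf/log4j.properties or programmatically through log4j. The package names below are examples, not a complete list.

import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark").setLevel(Level.DEBUG)          // general scheduler/task detail
Logger.getLogger("org.apache.spark.storage").setLevel(Level.TRACE)  // block fetches, spills, cache decisions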

Thanks,
Puneet

-- 
Regards,
Puneet


Spark 1.0: slf4j version conflicts with pig

2014-05-27 Thread Ryan Compton
I use both Pig and Spark. All my code is built with Maven into a giant
*-jar-with-dependencies.jar. I recently upgraded to Spark 1.0 and now
all my pig scripts fail with:

Caused by: java.lang.RuntimeException: Could not resolve error that
occured when launching map reduce job: java.lang.NoSuchMethodError:
org.slf4j.spi.LocationAwareLogger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher$JobControlThreadExceptionHandler.uncaughtException(MapReduceLauncher.java:598)
at java.lang.Thread.dispatchUncaughtException(Thread.java:1874)


Did Spark 1.0 change the version of slf4j? I can't seem to find it via
mvn dependency:tree


Re: Spark 1.0: slf4j version conflicts with pig

2014-05-27 Thread Sean Owen
Spark uses 1.7.5, and you should probably see 1.7.{4,5} in use through
Hadoop. But those are compatible.

That method appears to have been around since 1.3. What version does Pig want?

I usually do "mvn -Dverbose dependency:tree" to see both what the
final dependencies are, and what got overwritten, to diagnose things
like this.

My hunch is that something is depending on an old slf4j in your build
and it's overwriting Spark et al.

On Tue, May 27, 2014 at 10:45 PM, Ryan Compton  wrote:
> I use both Pig and Spark. All my code is built with Maven into a giant
> *-jar-with-dependencies.jar. I recently upgraded to Spark 1.0 and now
> all my pig scripts fail with:
>
> Caused by: java.lang.RuntimeException: Could not resolve error that
> occured when launching map reduce job: java.lang.NoSuchMethodError:
> org.slf4j.spi.LocationAwareLogger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
> at 
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher$JobControlThreadExceptionHandler.uncaughtException(MapReduceLauncher.java:598)
> at java.lang.Thread.dispatchUncaughtException(Thread.java:1874)
>
>
> Did Spark 1.0 change the version of slf4j? I can't seem to find it via
> mvn dependency:tree


Re: Persist and unpersist

2014-05-27 Thread Ankur Dave
I think what's desired here is for input to be unpersisted automatically as
soon as result is materialized. I don't think there's currently a way to do
this, but the usual workaround is to force result to be materialized
immediately and then unpersist input:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize result
input.unpersist()               // safe because `result` is materialized
                                // and is the only RDD that depends on `input`
return result


Ankur 


Java RDD structure for Matrix predict?

2014-05-27 Thread Sandeep Parikh
I've got a trained MatrixFactorizationModel via ALS.train(...) and now I'm
trying to use it to predict some ratings like so:

JavaRDD<Rating> predictions = model.predict(usersProducts.rdd())

Where usersProducts is built from an existing Ratings dataset like so:

JavaPairRDD<Integer, Integer> usersProducts = testRatings.map(
  new PairFunction<Rating, Integer, Integer>() {
    public Tuple2<Integer, Integer> call(Rating r) throws Exception {
      return new Tuple2<Integer, Integer>(r.user(), r.product());
    }
  }
);

The problem is that model.predict(...) doesn't like usersProducts, claiming
that the method doesn't accept an RDD of type Tuple2; however, the docs show
the method signature as follows:

def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]

Am I missing something? The JavaRDD is just a list of Tuple2 elements,
which would match the method signature but the compile is complaining.

Thanks!


Re: Java RDD structure for Matrix predict?

2014-05-27 Thread giive chen
Hi Sandeep

I think you should use  testRatings.mapToPair instead of  testRatings.map.

So the code should be


JavaPairRDD<Integer, Integer> usersProducts = training.mapToPair(
    new PairFunction<Rating, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Rating r) throws Exception {
            return new Tuple2<Integer, Integer>(r.user(), r.product());
        }
    }
);

It works on my side.


Wisely Chen


On Wed, May 28, 2014 at 6:27 AM, Sandeep Parikh wrote:

> I've got a trained MatrixFactorizationModel via ALS.train(...) and now I'm
> trying to use it to predict some ratings like so:
>
> JavaRDD predictions = model.predict(usersProducts.rdd())
>
> Where usersProducts is built from an existing Ratings dataset like so:
>
> JavaPairRDD usersProducts = testRatings.map(
>   new PairFunction() {
> public Tuple2 call(Rating r) throws Exception {
>   return new Tuple2(r.user(), r.product());
> }
>   }
> );
>
> The problem is that model.predict(...) doesn't like usersProducts,
> claiming that the method doesn't accept an RDD of type Tuple2 however the
> docs show the method signature as follows:
>
> def predict(usersProducts: RDD[(Int, Int)]): RDD[Rating]
>
> Am I missing something? The JavaRDD is just a list of Tuple2 elements,
> which would match the method signature but the compile is complaining.
>
> Thanks!
>
>


Re: K-nearest neighbors search in Spark

2014-05-27 Thread Krishna Sankar
Carter,
   Just as a quick & simple starting point for Spark (caveats: lots of
improvements required for scaling and for graceful, efficient handling of the
RDDs, et al.):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.collection.immutable.ListMap
import scala.collection.immutable.SortedMap

object TopK {
  //
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath
  //
  def distance(x1: List[Int], x2: List[Int]): Double = {
    val dist: Double = math.sqrt(math.pow(x1(1) - x2(1), 2) + math.pow(x1(2) - x2(2), 2))
    dist
  }
  //
  def main(args: Array[String]): Unit = {
    //
    println(getCurrentDirectory)
    val sc = new SparkContext("local", "TopK", "spark://USS-Defiant.local:7077")
    println(s"Running Spark Version ${sc.version}")
    val file = sc.textFile("data01.csv")
    //
    val data = file
      .map(line => line.split(","))
      .map(x1 => List(x1(0).toInt, x1(1).toInt, x1(2).toInt))
    //val data1 = data.collect
    println("data")
    for (d <- data) {
      println(d)
      println(d(0))
    }
    //
    val distList = for (d <- data) yield { d(0) }
    //for (d <- distList) (println(d))
    val zipList = for (a <- distList.collect; b <- distList.collect) yield { List(a, b) }
    zipList.foreach(println(_))
    //
    val dist = for (l <- zipList) yield {
      println(s"${l(0)} = ${l(1)}")
      val x1a: Array[List[Int]] = data.filter(d => d(0) == l(0)).collect
      val x2a: Array[List[Int]] = data.filter(d => d(0) == l(1)).collect
      val x1: List[Int] = x1a(0)
      val x2: List[Int] = x2a(0)
      val dist = distance(x1, x2)
      Map(dist -> l)
    }
    dist.foreach(println(_)) // sort this for topK
    //
  }
}

data01.csv

1,68,93
2,12,90
3,45,76
4,86,54

HTH.

Cheers



On Tue, May 27, 2014 at 4:10 AM, Carter  wrote:

> Any suggestion is very much appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/K-nearest-neighbors-search-in-Spark-tp6393p6421.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Spark Memory Bounds

2014-05-27 Thread Keith Simmons
I'm trying to determine how to bound my memory use in a job working with
more data than can simultaneously fit in RAM.  From reading the tuning
guide, my impression is that Spark's memory usage is roughly the following:

(A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
used by all currently running tasks

I can bound A with spark.storage.memoryFraction and I can bound B with
spark.shuffle.memoryFraction.
 I'm wondering how to bound C.
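
(As a concrete illustration of bounding A and B, a sketch with arbitrary values, not recommendations:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("memory-bounded-job")
  .set("spark.storage.memoryFraction", "0.4")  // cap on (A): cached/persisted RDD blocks
  .set("spark.shuffle.memoryFraction", "0.2")  // cap on (B): in-memory shuffle buffers before spilling
val sc = new SparkContext(conf)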

It's been hinted at a few times on this mailing list that you can reduce
memory use by increasing the number of partitions.  That leads me to
believe that the amount of transient memory is roughly the following:

total_data_set_size/number_of_partitions *
number_of_tasks_simultaneously_running_per_machine
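
(A hedged back-of-the-envelope example with made-up numbers, just to show how the estimate is used: a 100 GB dataset in 1,000 partitions is ~100 MB per partition; with 8 concurrent tasks per machine that is roughly 800 MB of transient memory per machine, before any expansion from deserialized Java object overhead.)

// Illustrative arithmetic only; none of these numbers come from this thread.
val totalDataSetBytes  = 100L * 1024 * 1024 * 1024   // 100 GB
val numPartitions      = 1000
val concurrentTasks    = 8                           // e.g. one per core
val perMachineEstimate = totalDataSetBytes / numPartitions * concurrentTasks
println(perMachineEstimate / (1024 * 1024) + " MB")  // ~800 MB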

Does this sound right?  In other words, as I increase the number of
partitions, the size of each partition will decrease, and since each task
is processing a single partition and there are a bounded number of tasks in
flight, my memory use has a rough upper limit.

Keith


Re: Re: spark table to hive table

2014-05-27 Thread JaeBoo Jung


I already tried HiveContext as well as SQLContext.
But it seems that Spark's HiveContext is not completely the same as Apache Hive.
For example, SQL like 'SELECT RANK() OVER(ORDER BY VAL1 ASC) FROM TEST LIMIT 10' works fine in Apache Hive,
but Spark's HiveContext raises an error.
That's why I want to use Shark or Apache Hive in special cases.
The following is the error message from Spark.

java.lang.RuntimeException: Unsupported language features in query: SELECT RANK() OVER(order by val1) FROM TEST LIMIT 10
TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        TEST
  TOK_INSERT
    TOK_DESTINATION
      TOK_DIR
        TOK_TMP_FILE
    TOK_SELECT
      TOK_SELEXPR
        TOK_FUNCTION
          RANK
          TOK_WINDOWSPEC
            TOK_PARTITIONINGSPEC
              TOK_ORDERBY
                TOK_TABSORTCOLNAMEASC
                  TOK_TABLE_OR_COL
                    v1
    TOK_LIMIT
      10
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:236)
    at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:81)
    at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:18)
    at $iwC$$iwC$$iwC$$iwC$$iwC.(:23)
    at $iwC$$iwC$$iwC$$iwC.(:25)
    at $iwC$$iwC$$iwC.(:27)
    at $iwC$$iwC.(:29)
    at $iwC.(:31)
    at (:33)
    at .(:37)
    at .()
    at .(:7)
    at .()
    at $print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:983)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 

 
--- Original Message ---
Sender : John Omernik
Date : 2014-05-27 19:28 (GMT+09:00)
Title : Re: spark table to hive table
 
Did you try the Hive Context? Look under Hive Support here: 

http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html 



On Tue, May 27, 2014 at 2:09 AM, 정재부  wrote: 


Hi all,
 
I'm trying to compare functions available in Spark1.0 hql to original HiveQL.
But, when I tested functions such as 'rank', Spark didn't support some HiveQL functions.
In case of Shark, it supports functions as well as Hive so I want to convert parquet file, Spark SQL table to Hive Table and analyze it with Shark.
Is there any way to do this?
 
Thanks,
Kevin
_ 
Kevin Jung Assistant Engineer / BDA Lab  T +82-2-6155-8349 M +82-10-9288-1984 F +82-2-6155-0251 E itsjb.j...@samsung.com
 


 
 


 

AMPCamp Training materials are broken due to overwritten AMIs?

2014-05-27 Thread Toshinari Kureha
Hi,

Has anyone had luck going through previous archives of the AMPCamp
exercises?  Many of the archived bootcamps seem to be broken due to the
fact that it references the same AMIs that is constantly being updated,
which means that it is no longer compatible with the old bootcamp
instructions or the surrounding scripts.

I'm not sure why they don't create separate AMIs so that it doesn't get
overwritten.  Their naming convention seem to indicate that was their
intention, but all of them refer to "
http://s3.amazonaws.com/ampcamp-amis/latest-ampcamp3";

Why do I want to use previous bootcamp?  Beyond the fact that they cover
slightly different materials, it looks like the latest one is yet again
broken due to changes introduced in the AMIs (specifically the MLlib
exercise).

Has anyone else had similar issues?

-Toshi


Re: Spark Memory Bounds

2014-05-27 Thread Christopher Nguyen
Keith, do you mean "bound" as in (a) strictly control to some quantifiable
limit, or (b) try to minimize the amount used by each task?

If "a", then that is outside the scope of Spark's memory management, which
you should think of as an application-level (that is, above JVM) mechanism.
In this scope, Spark "voluntarily" tracks and limits the amount of memory
it uses for explicitly known data structures, such as RDDs. What Spark
cannot do is, e.g., control or manage the amount of JVM memory that a given
piece of user code might take up. For example, I might write some closure
code that allocates a large array of doubles unbeknownst to Spark.
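
(A hypothetical example of such a closure, purely to make the point concrete:)

def heavyClosure(rdd: org.apache.spark.rdd.RDD[Double]): org.apache.spark.rdd.RDD[Double] =
  rdd.mapPartitions { iter =>
    val scratch = new Array[Double](50 * 1024 * 1024)  // ~400 MB of heap per task, invisible to Spark's accounting
    iter.map(x => x + scratch.length)                  // stand-in for real work that uses the scratch buffer
  }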

If "b", then your thinking is in the right direction, although quite
imperfect, because of things like the example above. We often experience
OOME if we're not careful with job partitioning. What I think Spark needs
to evolve to is at least to include a mechanism for application-level hints
about task memory requirements. We might work on this and submit a PR for
it.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Tue, May 27, 2014 at 5:33 PM, Keith Simmons  wrote:

> I'm trying to determine how to bound my memory use in a job working with
> more data than can simultaneously fit in RAM.  From reading the tuning
> guide, my impression is that Spark's memory usage is roughly the following:
>
> (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
> used by all currently running tasks
>
> I can bound A with spark.storage.memoryFraction and I can bound B with 
> spark.shuffle.memoryFraction.
>  I'm wondering how to bound C.
>
> It's been hinted at a few times on this mailing list that you can reduce
> memory use by increasing the number of partitions.  That leads me to
> believe that the amount of transient memory is roughly follows:
>
> total_data_set_size/number_of_partitions *
> number_of_tasks_simultaneously_running_per_machine
>
> Does this sound right?  In other words, as I increase the number of
> partitions, the size of each partition will decrease, and since each task
> is processing a single partition and there are a bounded number of tasks in
> flight, my memory use has a rough upper limit.
>
> Keith
>


Re: Spark Memory Bounds

2014-05-27 Thread Keith Simmons
A dash of both.  I want to know enough that I can "reason about", rather
than "strictly control", the amount of memory Spark will use.  If I have a
big data set, I want to understand how I can design it so that Spark's
memory consumption falls below my available resources, or, alternatively,
whether it's even possible for Spark to process a data set over a certain size.
 And if I run into memory problems, I want to know which knobs to turn, and
how turning those knobs will affect memory consumption.

It's my understanding that between certain key stages in a Spark DAG (i.e.
group by stages), Spark will serialize all data structures necessary to
continue the computation at the next stage, including closures.  So in
theory, per machine, Spark only needs to hold the transient memory required
to process the partitions assigned to the currently active tasks.  Is my
understanding correct?  Specifically, once a key/value pair is serialized
in the shuffle stage of a task, are the references to the raw java objects
released before the next task is started.



On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen  wrote:

> Keith, do you mean "bound" as in (a) strictly control to some quantifiable
> limit, or (b) try to minimize the amount used by each task?
>
> If "a", then that is outside the scope of Spark's memory management, which
> you should think of as an application-level (that is, above JVM) mechanism.
> In this scope, Spark "voluntarily" tracks and limits the amount of memory
> it uses for explicitly known data structures, such as RDDs. What Spark
> cannot do is, e.g., control or manage the amount of JVM memory that a given
> piece of user code might take up. For example, I might write some closure
> code that allocates a large array of doubles unbeknownst to Spark.
>
> If "b", then your thinking is in the right direction, although quite
> imperfect, because of things like the example above. We often experience
> OOME if we're not careful with job partitioning. What I think Spark needs
> to evolve to is at least to include a mechanism for application-level hints
> about task memory requirements. We might work on this and submit a PR for
> it.
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao 
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, May 27, 2014 at 5:33 PM, Keith Simmons  wrote:
>
>> I'm trying to determine how to bound my memory use in a job working with
>> more data than can simultaneously fit in RAM.  From reading the tuning
>> guide, my impression is that Spark's memory usage is roughly the following:
>>
>> (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
>> used by all currently running tasks
>>
>> I can bound A with spark.storage.memoryFraction and I can bound B with 
>> spark.shuffle.memoryFraction.
>>  I'm wondering how to bound C.
>>
>> It's been hinted at a few times on this mailing list that you can reduce
>> memory use by increasing the number of partitions.  That leads me to
>> believe that the amount of transient memory is roughly follows:
>>
>> total_data_set_size/number_of_partitions *
>> number_of_tasks_simultaneously_running_per_machine
>>
>> Does this sound right?  In other words, as I increase the number of
>> partitions, the size of each partition will decrease, and since each task
>> is processing a single partition and there are a bounded number of tasks in
>> flight, my memory use has a rough upper limit.
>>
>> Keith
>>
>
>