Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread lokesh.gidra
Thanks a lot for clarifying this. This explains why there is less
serialization happening with lower parallelism. There would be less network
communication, and hence less serialization, right?

But then if we compare 100 cores in local mode vs. 10 nodes of 10 cores each
in standalone mode, why am I seeing a huge improvement in standalone mode as
compared to local mode? Isn't the amount of network communication the same in
both cases?


Thanks,
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10739.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: SparkSQL extensions

2014-07-27 Thread Christos Kozanitis
Thanks Michael for the recommendations. The region-join (which I could also
call a range-join or interval-join) that I have in mind should join the
entries of two tables with inequality predicates. For example, if table
A(col1 int, col2 int) contains entries (1,4) and (10,12), and table B(c1
int, c2 int) contains entries (3,6) and (43,23), then the region-join of A
and B on (col1 < c1 and c2 < col2) should produce the tuple (1,4,3,6).

Does that make sense?

Actually there is a JIRA on a similar topic for Hive here:
https://issues.apache.org/jira/browse/HIVE-556

Also ADAM implements region-joins here:
https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala

I was thinking of providing an improved version of the "partitionAndJoin"
method from the ADAM implementation above.
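
For reference, here is a minimal (and deliberately naive) sketch of the same
idea at the plain RDD level, outside Spark SQL. The cartesian-product approach
and the helper below are illustrative assumptions, not the proposed
implementation; a real operator would partition both sides by range.

    import org.apache.spark.rdd.RDD

    // Naive region-join: emit every pair of rows whose interval columns
    // satisfy the given inequality predicate. The cartesian product makes
    // the semantics obvious but costs O(|a| * |b|), which is exactly what a
    // dedicated operator should avoid.
    def regionJoin(a: RDD[(Int, Int)], b: RDD[(Int, Int)])
                  (pred: (Int, Int, Int, Int) => Boolean): RDD[(Int, Int, Int, Int)] =
      a.cartesian(b)
        .filter { case ((col1, col2), (c1, c2)) => pred(col1, col2, c1, c2) }
        .map { case ((col1, col2), (c1, c2)) => (col1, col2, c1, c2) }

    // e.g. regionJoin(tableA, tableB) { (col1, col2, c1, c2) => col1 < c1 && c2 < col2 }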



On Sat, Jul 26, 2014 at 12:37 PM, Michael Armbrust 
wrote:

> A very simple example of adding a new operator to Spark SQL:
> https://github.com/apache/spark/pull/1366
> An example of adding a new type of join to Spark SQL:
> https://github.com/apache/spark/pull/837
>
> Basically, you will need to add a new physical operator that inherits from
> SparkPlan and a Strategy that causes the query planner to select it.  Maybe
> you can explain a little more what you mean by region-join?  If it's only a
> different algorithm, and not a logically different type of join, then you
> will not need to make some of the logical modifications that the second PR
> did.
>
> Often the hardest part here is going to be figuring out when to use one
> join over another.  Right now the rules are pretty straightforward: The
> joins that are picked first are the most efficient but only handle certain
> cases (inner joins with equality predicates).  When that is not the case it
> falls back on slower, but more general operators.  If there are more subtle
> trade offs involved then we may need to wait until we have more statistics
> to help us make the choice.
>
> I'd suggest opening a JIRA and proposing a design before going too far.
>
> Michael
>
>
> On Sat, Jul 26, 2014 at 3:32 AM, Christos Kozanitis <
> kozani...@berkeley.edu> wrote:
>
>> Hello
>>
>> I was wondering is it easy for you guys to point me to what modules I
>> need to update if I had to add extra functionality to sparkSQL?
>>
>> I was thinking to implement a region-join operator and I guess I should
>> add the implementation details under joins.scala but what else do I need to
>> modify?
>>
>> thanks
>> Christos
>>
>
>


Re: akka 2.3.x?

2014-07-27 Thread yardena
Thanks Matei! 

I looked at the pull request, but we are not yet ready to move to Scala 2.11
and at this point prefer to upgrade only Akka, so I filed
https://issues.apache.org/jira/browse/SPARK-2707 as a separate issue.

  Yardena



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-2-3-x-tp10513p10741.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
What are you comparing in your last experiment? Time spent in writeObject0
on a 100-core machine (!) vs a cluster?


On Sat, Jul 26, 2014 at 11:59 PM, lokesh.gidra 
wrote:

> Thanks a lot for clarifying this. This explains why there is less
> serialization happening with lower parallelism. There would be less network
> communication, and hence less serialization, right?
>
> But then if we compare 100 cores in local mode vs. 10 nodes of 10 cores each
> in standalone mode, why am I seeing a huge improvement in standalone mode as
> compared to local mode? Isn't the amount of network communication the same
> in both cases?
>
>
> Thanks,
> Lokesh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10739.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread lokesh.gidra
I am comparing the total time spent finishing the job. What I am comparing,
to be precise, is on a 48-core machine: the performance of local[48] vs.
standalone mode with 8 nodes of 6 cores each (totalling 48 cores) on
localhost. In this comparison, the standalone mode outperforms local[48]
substantially. When I did some troubleshooting using oprofile, I found that
local[48] was spending much more time in writeObject0 than standalone mode.

I am running the PageRank example provided in the package. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread Aaron Davidson
I see. There should not be a significant algorithmic difference between
those two cases, as far as I can think, but there is a good bit of
"local-mode-only" logic in Spark.

One typical problem we see on large-heap, many-core JVMs, though, is much
more time spent in garbage collection. I'm not sure how oprofile gathers
its statistics, but it's possible the stop-the-world pauses just appear as
pausing inside regular methods. You could see if this is happening by
adding "-XX:+PrintGCDetails" to spark.executor.extraJavaOptions (in
spark-defaults.conf) and --driver-java-options (as a command-line
argument), and then examining the stdout logs.
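
A minimal sketch of setting the executor side programmatically through
SparkConf instead of spark-defaults.conf (the extra timestamp flag is an
illustrative addition; the driver-side flags still go on the spark-submit
command line via --driver-java-options):

    import org.apache.spark.{SparkConf, SparkContext}

    // Make GC activity visible in the executor stdout logs, so stop-the-world
    // pauses show up explicitly instead of hiding inside profiled method time.
    val conf = new SparkConf()
      .setAppName("pagerank-gc-check")
      .set("spark.executor.extraJavaOptions",
        "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

    val sc = new SparkContext(conf)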


On Sun, Jul 27, 2014 at 10:29 AM, lokesh.gidra 
wrote:

> I am comparing the total time spent finishing the job. What I am
> comparing, to be precise, is on a 48-core machine: the performance of
> local[48] vs. standalone mode with 8 nodes of 6 cores each (totalling 48
> cores) on localhost. In this comparison, the standalone mode outperforms
> local[48] substantially. When I did some troubleshooting using oprofile,
> I found that local[48] was spending much more time in writeObject0 than
> standalone mode.
>
> I am running the PageRank example provided in the package.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10743.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


MLlib NNLS implementation is buggy, returning wrong solutions

2014-07-27 Thread Aureliano Buendia
Hi,

The recently added NNLS implementation in MLlib returns wrong solutions.
This is not data-specific: just try any data in R's nnls and then the same
data in MLlib's NNLS. The results are very different.

Also, the chosen algorithm, Polyak (1969), is not the best one available. The
most popular is Lawson-Hanson (1974):

http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms


Re: SparkSQL extensions

2014-07-27 Thread Michael Armbrust
Ah, I understand now.  That sounds pretty useful and is something we would
currently plan very inefficiently.


On Sun, Jul 27, 2014 at 1:07 AM, Christos Kozanitis 
wrote:

> Thanks Michael for the recommendations. Actually the region-join (or I
> could name it range-join or interval-join) that I was thinking should join
> the entries of two tables with inequality predicates. For example if table
> A(col1 int, col2 int) contains entries (1,4) and (10,12) and table b(c1
> int, c2 int) contains entries (3,6) and (43,23) then the region-join of A,
> B on (col1 < c1 and c2 < col2) should produce the tuple(1,4,3,6).
>
> Does it make sense?
>
> Actually there is a JIRA on a similar topic for Hive here:
> https://issues.apache.org/jira/browse/HIVE-556
>
> Also ADAM implements region-joins here:
> https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala
>
> I was thinking to provide an improved version of method "partitionAndJoin"
> from the ADAM implementation above
>
>
>
> On Sat, Jul 26, 2014 at 12:37 PM, Michael Armbrust  > wrote:
>
>> A very simple example of adding a new operator to Spark SQL:
>> https://github.com/apache/spark/pull/1366
>> An example of adding a new type of join to Spark SQL:
>> https://github.com/apache/spark/pull/837
>>
>> Basically, you will need to add a new physical operator that inherits
>> from SparkPlan and a Strategy that causes the query planner to select it.
>>  Maybe you can explain a little more what you mean by region-join?  If it's
>> only a different algorithm, and not a logically different type of join,
>> then you will not need to make some of the logical modifications that the
>> second PR did.
>>
>> Often the hardest part here is going to be figuring out when to use one
>> join over another.  Right now the rules are pretty straightforward: The
>> joins that are picked first are the most efficient but only handle certain
>> cases (inner joins with equality predicates).  When that is not the case it
>> falls back on slower, but more general operators.  If there are more subtle
>> trade offs involved then we may need to wait until we have more statistics
>> to help us make the choice.
>>
>> I'd suggest opening a JIRA and proposing a design before going too far.
>>
>> Michael
>>
>>
>> On Sat, Jul 26, 2014 at 3:32 AM, Christos Kozanitis <
>> kozani...@berkeley.edu> wrote:
>>
>>> Hello
>>>
>>> I was wondering is it easy for you guys to point me to what modules I
>>> need to update if I had to add extra functionality to sparkSQL?
>>>
>>> I was thinking to implement a region-join operator and I guess I should
>>> add the implementation details under joins.scala but what else do I need to
>>> modify?
>>>
>>> thanks
>>> Christos
>>>
>>
>>
>


Re: MLlib NNLS implementation is buggy, returning wrong solutions

2014-07-27 Thread DB Tsai
Could you help provide a test case to verify this issue and open a JIRA to
track it? Also, are you interested in submitting a PR to fix it? Thanks.
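
As a rough illustration of the kind of test case being asked for: a tiny
problem whose unconstrained least-squares solution is already nonnegative, so
NNLS must reproduce it. This assumes the private[spark]
mllib.optimization.NNLS object and its createWorkspace/solve entry points, so
it would have to live inside Spark's own test sources; treat it as a sketch,
not a ready-made patch.

    package org.apache.spark.mllib.optimization

    import org.scalatest.FunSuite

    class NNLSSanitySuite extends FunSuite {
      test("NNLS matches the unconstrained solution when it is nonnegative") {
        // Minimize ||Ax - b||^2 subject to x >= 0, with A^T A = I and
        // A^T b = (1, 2). The unconstrained optimum (1, 2) is feasible,
        // so NNLS should return it up to numerical tolerance.
        val n = 2
        val ata = Array(1.0, 0.0, 0.0, 1.0) // 2x2 identity, column-major
        val atb = Array(1.0, 2.0)
        val ws = NNLS.createWorkspace(n)
        val x = NNLS.solve(ata, atb, ws)
        assert(math.abs(x(0) - 1.0) < 1e-6)
        assert(math.abs(x(1) - 2.0) < 1e-6)
      }
    }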

Sent from my Google Nexus 5
On Jul 27, 2014 11:07 AM, "Aureliano Buendia"  wrote:

> Hi,
>
> The recently added NNLS implementation in MLlib returns wrong solutions.
> This is not data-specific: just try any data in R's nnls and then the same
> data in MLlib's NNLS. The results are very different.
>
> Also, the chosen algorithm, Polyak (1969), is not the best one available. The
> most popular is Lawson-Hanson (1974):
>
> http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms
>
>
>


Re: Spark MLlib vs BIDMach Benchmark

2014-07-27 Thread Ameet Talwalkar
To add to the last point, multimodel training is something we've explored
as part of the MLbase Optimizer, and we've seen some nice speedups.  This
feature will be added to MLlib soon (not sure if it'll make it into the 1.1
release though).


On Sat, Jul 26, 2014 at 11:27 PM, Matei Zaharia 
wrote:

> BTW I should add that one other thing that would help MLlib locally would
> be doing model updates in batches. That is, instead of operating on one
> point at a time, group together a bunch of them and apply a matrix
> operation, which will allow more efficient use of BLAS or other linear
> algebra primitives. We don't do a lot of this yet, but there was a project
> in the AMPLab to do more of it. Multiple models can also be trained
> simultaneously with this approach.
>
> On July 26, 2014 at 11:21:17 PM, Matei Zaharia (matei.zaha...@gmail.com)
> wrote:
>
>  These numbers are from GPUs and Intel MKL (a closed-source math library
> for Intel processors), where for CPU-bound algorithms you are going to get
> faster speeds than MLlib's JBLAS. However, there's in theory nothing
> preventing the use of these in MLlib (e.g. if you have a faster BLAS
> locally; adding a GPU-based one would probably require bigger code changes).
>
>  Some of the numbers there are also from more naive implementations of
> K-means and logistic regression in the Spark research paper, which include
> the fairly expensive cost of reading the data out of HDFS.
>
> On July 26, 2014 at 8:31:11 PM, DB Tsai (dbt...@dbtsai.com) wrote:
>
>  BIDMach is CPU and GPU-accelerated Machine Learning Library also from
> Berkeley.
>
> https://github.com/BIDData/BIDMach/wiki/Benchmarks
>
> They did benchmark against Spark 0.9, and they claimed that it's
> significantly faster than Spark MLlib. In Spark 1.0, a lot of
> performance optimization has been done, and sparse data is supported.
> It will be interesting to see a new benchmark result.
>
> Anyone familiar with BIDMach? Are they as fast as they claim?
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
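
As a rough sketch of the batched-update idea Matei describes above (grouping
points into a matrix so a couple of BLAS-backed operations replace a loop over
individual points). Breeze is used purely for illustration here; this is not
MLlib code.

    import breeze.linalg.{DenseMatrix, DenseVector}
    import breeze.numerics.sigmoid

    // One batched gradient computation for logistic regression.
    // X is (batchSize x numFeatures), y holds the 0/1 labels, w the current
    // weights. The whole batch is handled by two matrix operations instead of
    // a per-point loop, letting the underlying BLAS do the heavy lifting.
    def batchedGradient(X: DenseMatrix[Double],
                        y: DenseVector[Double],
                        w: DenseVector[Double]): DenseVector[Double] = {
      val preds = sigmoid(X * w) // batchSize predictions in one multiply
      X.t * (preds - y)          // gradient accumulated over the whole batch
    }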


Re: "Spilling in-memory..." messages in log even with MEMORY_ONLY

2014-07-27 Thread lokesh.gidra
I have confirmed it is not GC related. Oprofile shows stop-the-world pauses
separately from the regular Java methods.

However, I was wrong when I said that the amount of time spent in
writeObject0 is much higher in local[n] mode than in standalone mode.

It is instead the hashCode function. The time spent fetching the hash code of
java.lang.Object is almost 100 times higher in local[n] than in standalone
mode. To be precise, the exact function is JVM_IHashCode, which is called
when the hashCode method of java.lang.Object is invoked.

So now the question is: is there any possible reason why there would be a
large number of invocations of hashCode in local[n] mode as compared to
standalone? Is there something related to hash tables?
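
One hedged guess at where such calls can come from: the shuffle path hashes
every key, both to choose a reduce partition and to insert records into
in-memory hash maps during aggregation, so each shuffled record costs at least
one hashCode call. A tiny sketch of the partitioning side (standard Spark API,
not the local-mode internals):

    import org.apache.spark.HashPartitioner

    // HashPartitioner picks the target partition from the key's hashCode, so
    // keys that fall back to java.lang.Object#hashCode (JVM_IHashCode) pay
    // that cost once per shuffled record.
    val partitioner = new HashPartitioner(48)

    def targetPartition(key: AnyRef): Int = partitioner.getPartition(key)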

On 07/27/2014 07:56 PM, Aaron Davidson [via Apache Spark User List] wrote:
> I see. There should not be a significant algorithmic difference 
> between those two cases, as far as I can think, but there is a good 
> bit of "local-mode-only" logic in Spark.
>
> One typical problem we see on large-heap, many-core JVMs, though, is 
> much more time spent in garbage collection. I'm not sure how oprofile 
> gathers its statistics, but it's possible the stop-the-world pauses 
> just appear as pausing inside regular methods. You could see if this 
> is happening by adding "-XX:+PrintGCDetails" 
> to spark.executor.extraJavaOptions (in spark-defaults.conf) 
> and --driver-java-options (as a command-line argument), and then 
> examining the stdout logs.
>
>
> On Sun, Jul 27, 2014 at 10:29 AM, lokesh.gidra <[hidden email]> wrote:
>
> I am comparing the total time spent finishing the job. What I am
> comparing, to be precise, is on a 48-core machine: the performance of
> local[48] vs. standalone mode with 8 nodes of 6 cores each (totalling 48
> cores) on localhost. In this comparison, the standalone mode outperforms
> local[48] substantially. When I did some troubleshooting using oprofile,
> I found that local[48] was spending much more time in writeObject0 than
> standalone mode.
>
> I am running the PageRank example provided in the package.
>
>
>
> --
> View this message in context:
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10743.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
>
>
>
> 
>





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Maximum jobs finish very soon, some of them take longer time.

2014-07-27 Thread Sarthak Dash
Hello everyone,

I am trying out Spark for the first time, and after a month of work I am
stuck on an issue. I have a very simple program that, given a directed graph
with node/edge parameters and a particular node, tries to figure out all the
siblings (in the traditional sense) of the given node.

Right now I have 1200 partitions, and I see that while most of the tasks (on
average 1190-1195 of them) finish within 500 ms, a few tasks (5-10 of them)
take about 1-2 seconds to finish. I am aiming for a scenario in which all the
tasks finish under a second, so I am trying to figure out why a few tasks
(5-10 of them) take longer to complete than the remaining (1190-1195) tasks.

Also, please let me know whether it's possible to change some settings to
achieve my target scenario.
Any help would be much appreciated.

My configurations:
1. Tried with both FAIR/FIFO scheduler.
2. Tried playing around with spark.locality.wait settings. Currently I have
a max scheduler delay of 300 ms.
3. Version: Apache Spark 1.0.0 on a 50 node cluster, 14GB each RAM, 8
cores/node.

Thanks,
Sarthak
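
One knob worth trying for straggler tasks is speculative execution, which
re-launches suspiciously slow tasks on other executors. A minimal sketch; the
threshold values are illustrative, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("sibling-computation")
      .set("spark.speculation", "true")           // re-launch suspected stragglers
      .set("spark.speculation.quantile", "0.90")  // only after 90% of tasks have finished
      .set("spark.speculation.multiplier", "1.5") // 1.5x slower than the median counts as slow

    val sc = new SparkContext(conf)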



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Maximum-jobs-finish-very-soon-some-of-them-take-longer-time-tp10750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans: expensiveness of large vectors

2014-07-27 Thread Xiangrui Meng
If you have an m-by-n dataset and train a k-means model with k clusters, the
cost of each iteration is

O(m * n * k) (assuming dense data)

Since m * n * k == n * m * k, ideally you would expect the same run
time. However:

1. Communication. We need to broadcast the current centers (n * k values), do
the computation, and then collect the averaged centers from each partition
(n * k values). Having a large n (#cols) increases the communication cost.

2. Load balancing. How many partitions did you use? If there are 10k rows,
the rows may be distributed well. But with only 1000 rows you may end up
with, for example, 400 rows in a single partition and 200 rows in each of
the other three, and then the run time is determined by the largest
partition (see the sketch at the end of this message).

Hopefully this explains your observation.

Best,
Xiangru
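
A minimal sketch of evening out the load before training, as suggested in
point 2 above. The parsing pipeline, path, k and iteration count are
illustrative; repartition and KMeans.train are the standard RDD/MLlib API.

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Parse the comma-separated rows into dense feature vectors and spread
    // them evenly across partitions so no single partition dominates the
    // per-iteration time.
    val data = sc.textFile("hdfs:///path/to/features.csv")
      .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
      .repartition(4 * sc.defaultParallelism) // balance rows across the cluster
      .cache()

    val model = KMeans.train(data, 100, 20) // k = 100 clusters, 20 iterations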

On Thu, Jul 24, 2014 at 2:30 PM, durin  wrote:
> As a source, I have a textfile with n rows that each contain m
> comma-separated integers.
> Each row is then converted into a feature vector with m features each.
>
> I've noticed, that given the same total filesize and number of features, a
> larger number of columns is much more expensive for training a KMeans model
> than a large number of rows.
>
> To give an example:
> 10k rows X 1k columns took 21 seconds on my cluster, whereas 1k rows X 10k
> colums took 1min47s. Both files had a size of 238M.
>
> Can someone explain what in the implementation of KMeans causes large
> vectors to be so much more expensive than having many of these vectors?
> A pointer to the exact part of the source would be fantastic, but even a
> general explanation would help me.
>
>
> Best regards,
> Simon
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Kmeans: set initial centers explicitly

2014-07-27 Thread Xiangrui Meng
I think this is nice to have. Feel free to create a JIRA for it and it
would be great if you can send a PR. Thanks! -Xiangrui
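
As a rough illustration of the API shape such a JIRA/PR could propose
(setInitialCenters below is hypothetical and does not exist in MLlib; the
surrounding builder calls are the real API):

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    val initialCenters = Array(
      Vectors.dense(0.0, 0.0),
      Vectors.dense(5.0, 5.0))

    val model = new KMeans()
      .setK(2)
      .setMaxIterations(20)
      // .setInitialCenters(initialCenters) // proposed method, not in MLlib yet
      .run(data)                            // data: RDD[Vector], assumed defined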

On Thu, Jul 24, 2014 at 12:39 PM, SK  wrote:
>
> Hi,
>
> The mllib.clustering.kmeans implementation supports a random or parallel
> initialization mode to pick the initial centers. is there a way to specify
> the initial centers explicitly? It would be useful to have a setCenters()
> method where we can explicitly specify the initial centers. (For e.g. R
> allows us to specify the initial centers.)
>
> thanks
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-set-initial-centers-explicitly-tp10609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark as a application library vs infra

2014-07-27 Thread Mayur Rustagi
Based on some discussions with my application users, I have been trying to
come up with a standard way to deploy applications built on Spark:

1. Bundle the version of Spark with your application and ask users to store it
in HDFS before referring to it in YARN to boot your application.
2. Provide ways to manage dependencies in your app across the various versions
of Spark bundled with Hadoop distributions.

Option 1 provides greater control and reliability, as I am only working
against YARN versions and dependencies; I assume option 2 gives me some of the
benefits of the distribution versions of Spark (easier management, common
sysops tools?).
I was wondering if anyone has thoughts on both, and any reasons to prefer one
over the other.

Sent from my iPad

RE: Strange exception on coalesce()

2014-07-27 Thread innowireless TaeYun Kim
Thank you. It works.
(I've applied the changed source code to my local 1.0.0 source)

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Friday, July 25, 2014 11:47 PM
To: user@spark.apache.org
Subject: Re: Strange exception on coalesce()

I'm pretty sure this was already fixed last week in SPARK-2414:
https://github.com/apache/spark/commit/7c23c0dc3ed721c95690fc49f435d9de6952523c

On Fri, Jul 25, 2014 at 1:34 PM, innowireless TaeYun Kim 
 wrote:
> Hi,
> I'm using Spark 1.0.0.
>
> On a filter() - map() - coalesce() - saveAsTextFile() sequence, the
> following exception is thrown.
>
> Exception in thread "main" java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at
> org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:270)
> at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:337)
> at
> org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:83)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1086)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:788)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:674)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:593)
> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
> at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:436)
> at org.apache.spark.api.java.JavaRDD.saveAsTextFile(JavaRDD.scala:29)
>
> The partition count of the original rdd is 306.
>
> When the argument of coalesce() is one of 59, 60, 61, 62, 63, the 
> exception above is thrown.
>
> But when the argument is one of 50, 55, 58, 64, 65, 80, or 100, the
> exception is not thrown. (I've not tried other values; I think they will be
> ok.)
>
> Is there any magic number for the argument of coalesce() ?
>
> Thanks.
>
>
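
For context, a minimal sketch of the shape of pipeline that hit this bug
before the SPARK-2414 fix. The data, transformations and target partition
count are illustrative, not the original job:

    // filter -> map -> coalesce -> saveAsTextFile, shrinking a ~306-partition
    // RDD to one of the partition counts that used to trigger the None.get
    // inside PartitionCoalescer.
    sc.textFile("hdfs:///path/to/input")
      .filter(_.nonEmpty)
      .map(_.toUpperCase)
      .coalesce(60)
      .saveAsTextFile("hdfs:///path/to/output")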



Re: Spark as a application library vs infra

2014-07-27 Thread Krishna Sankar
   - IMHO, #2 is preferred as it could work in any environment (Mesos,
   Standalone et al). While Spark needs HDFS (for any decent distributed
   system), YARN is not required at all - Mesos is a lot better.
   - Also, managing the app with an appropriate bootstrap/deployment framework
   is more flexible across multiple scenarios, topologies et al.
   - What kind of capabilities are you thinking of? Automatic discovery?
   Dynamic deployment based on available resources, versions of Hadoop et al?

Cheers




On Sun, Jul 27, 2014 at 6:32 PM, Mayur Rustagi 
wrote:

> Based on some discussions with my application users, I have been trying to
> come up with a standard way to deploy applications built on Spark
>
> 1. Bundle the version of spark with your application and ask users store
> it in hdfs before referring it in yarn to boot your application
> 2. Provide ways to manage dependency in your app across various versions
> of spark bundled in with Hadoop distributions
>
> 1 provides greater control and reliability as I am only working against
> yarn versions and dependencies, I assume 2 gives me some benefits of
> distribution versions of spark (easier management, common sysops tools ?? )
> .
> I was wondering if anyone has thoughts around both and any reasons to
> prefer one over the other.
>
> Sent from my iPad


Re: Spark as a application library vs infra

2014-07-27 Thread Koert Kuipers
I used to do 1) but couldn't get it to work on YARN, and the trend seemed to
be towards 2) using spark-submit, so I gave in.

The main promise of 2) is that you can provide an application that can run on
multiple Hadoop and Spark versions. However, for that to become true Spark
needs to address the issue of user-classpath-first being broken. For example,
if I want to use a recent version of Avro I am out of luck, even if I bundle
it with my app, because an old version could be on the classpath in Spark (it
is, for example, on CDH) and I cannot override the classpath currently.
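
For reference, a minimal sketch of the knob in question
(spark.files.userClassPathFirst was experimental at the time and, as noted
above, often not honored):

    import org.apache.spark.{SparkConf, SparkContext}

    // Ask executors to prefer classes from the user's jars over Spark's own
    // classpath, e.g. to pick up a newer Avro bundled with the application.
    val conf = new SparkConf()
      .setAppName("user-classpath-first-example")
      .set("spark.files.userClassPathFirst", "true")

    val sc = new SparkContext(conf)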


On Sun, Jul 27, 2014 at 9:32 PM, Mayur Rustagi 
wrote:

> Based on some discussions with my application users, I have been trying to
> come up with a standard way to deploy applications built on Spark
>
> 1. Bundle the version of spark with your application and ask users store
> it in hdfs before referring it in yarn to boot your application
> 2. Provide ways to manage dependency in your app across various versions
> of spark bundled in with Hadoop distributions
>
> 1 provides greater control and reliability as I am only working against
> yarn versions and dependencies, I assume 2 gives me some benefits of
> distribution versions of spark (easier management, common sysops tools ?? )
> .
> I was wondering if anyone has thoughts around both and any reasons to
> prefer one over the other.
>
> Sent from my iPad


Re: Spark as a application library vs infra

2014-07-27 Thread Tobias Pfeiffer
Mayur,

I don't know if I exactly understand the context of what you are asking,
but let me just mention issues I had with deploying.

* As my application is a streaming application, it doesn't read any files
from disk, so I have no Hadoop/HDFS in place and there is no need for it,
either. There should be no dependency on Hadoop or HDFS, since you can
perfectly well run Spark applications without it.
* I use Mesos, and so far I have always made the downloaded Spark distribution
accessible to all machines (e.g., via HTTP) and then added my application
code by uploading a jar built with `sbt assembly`. As the Spark code itself
must not be contained in that jar file, I had to add '% "provided"' in the
sbt file, which in turn prevented me from running the application locally
from IntelliJ IDEA (it would not find the libraries marked as "provided"),
so I always had to use `sbt run`.
* When using Mesos, the Spark jar is loaded on the Spark slaves before the
application jar, so the log4j file from the Spark jar is used instead of my
custom one (which is not the case when running locally), and I had to edit
that file in the Spark distribution jar to customize the logging of my Spark
nodes.

I wonder if the two latter problems would vanish if the Spark libraries
were bundled together with the application. (That would be your approach
#1, I guess.)

Tobias
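
A minimal sketch of the build.sbt arrangement described above (version numbers
are illustrative):

    // Mark Spark as "provided" so `sbt assembly` leaves it out of the fat jar
    // (the cluster supplies it), at the cost that a plain IDE run no longer
    // sees the provided dependencies.
    name := "my-streaming-app"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided"
    )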


spark checkpoint details

2014-07-27 Thread Madabhattula Rajesh Kumar
Hi Team,

Could you please help me with the query below?

I'm using JavaStreamingContext to read streaming files from an HDFS shared
directory. When I start the Spark Streaming job it reads files from the HDFS
shared directory and does some processing. When I stop and restart the job,
it reads the old files again. Is there any way to maintain checkpoint
information about processed files in Spark?
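
A minimal sketch of the standard checkpoint/recovery pattern in the Scala API
(the paths, batch interval and processing are illustrative, and whether the
checkpoint fully covers the already-seen-files case here is worth verifying;
a similar getOrCreate pattern is available on JavaStreamingContext):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///user/spark/checkpoints/myjob"

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("hdfs-file-stream")
      val ssc = new StreamingContext(conf, Seconds(30))
      ssc.checkpoint(checkpointDir)                              // enable metadata checkpointing
      val lines = ssc.textFileStream("hdfs:///shared/incoming")  // monitored directory
      lines.count().print()                                      // stand-in for the real processing
      ssc
    }

    // On restart, recover from the checkpoint instead of building a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()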


Re: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode

2014-07-27 Thread Jianshi Huang
Hi Andrew,

Thanks for the reply. I figured out the cause of the issue: some resource
files were missing from the JARs. A class initialization depends on those
resource files, so it got that exception.

I appended the resource files explicitly to the --jars option and it worked
fine.

The "Caused by..." messages were actually found in the YARN logs; I think it
would be useful if I could see them from the console that runs spark-submit.
Would that be possible?

Jianshi



On Sat, Jul 26, 2014 at 7:08 AM, Andrew Lee  wrote:

> Hi Jianshi,
>
> Could you provide which HBase version you're using?
>
> By the way, a quick sanity check on whether the Workers can access HBase?
>
> Were you able to manually write one record to HBase with the serialize
> function? Hardcode and test it ?
>
> --
> From: jianshi.hu...@gmail.com
> Date: Fri, 25 Jul 2014 15:12:18 +0800
> Subject: Re: Need help, got java.lang.ExceptionInInitializerError in
> Yarn-Client/Cluster mode
> To: user@spark.apache.org
>
>
> I nailed it down to a union operation, here's my code snippet:
>
> val properties: RDD[((String, String, String),
> Externalizer[KeyValue])] = vertices.map { ve =>
>   val (vertices, dsName) = ve
>   val rval = GraphConfig.getRval(datasetConf, Constants.VERTICES,
> dsName)
>   val (_, rvalAsc, rvalType) = rval
>
>   println(s"Table name: $dsName, Rval: $rval")
>   println(vertices.toDebugString)
>
>   vertices.map { v =>
> val rk = appendHash(boxId(v.id)).getBytes
> val cf = PROP_BYTES
> val cq = boxRval(v.rval, rvalAsc, rvalType).getBytes
> val value = Serializer.serialize(v.properties)
>
> ((new String(rk), new String(cf), new String(cq)),
>   Externalizer(put(rk, cf, cq, value)))
>   }
> }.reduce(_.union(_)).sortByKey(numPartitions = 32)
>
> Basically I read data from multiple tables (Seq[RDD[(key, value)]]) and
> they're transformed to the a KeyValue to be insert in HBase, so I need to
> do a .reduce(_.union(_)) to combine them into one RDD[(key, value)].
>
> I cannot see what's wrong in my code.
>
> Jianshi
>
>
>
> On Fri, Jul 25, 2014 at 12:24 PM, Jianshi Huang 
> wrote:
>
> I can successfully run my code in local mode using spark-submit (--master
> local[4]), but I got ExceptionInInitializerError errors in Yarn-client mode.
>
> Any hints what is the problem? Is it a closure serialization problem? How
> can I debug it? Your answers would be very helpful.
>
> 14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.ExceptionInInitializerError
> java.lang.ExceptionInInitializerError
> at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
> at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


subscribe

2014-07-27 Thread James Todd



Re: Spark as a application library vs infra

2014-07-27 Thread Sandy Ryza
At Cloudera we recommend bundling your application separately from the
Spark libraries.  The two biggest reasons are:
* No need to modify your application jar when upgrading or applying a patch.
* When running on YARN, the Spark jar can be cached as a YARN local
resource, meaning it doesn't need to be transferred every time.



On Sun, Jul 27, 2014 at 8:52 PM, Tobias Pfeiffer  wrote:

> Mayur,
>
> I don't know if I exactly understand the context of what you are asking,
> but let me just mention issues I had with deploying.
>
> * As my application is a streaming application, it doesn't read any files
> from disk, so therefore I have no Hadoop/HDFS in place and I there is no
> need for it, either. There should be no dependency on Hadoop or HDFS, since
> you can perfectly run Spark applications without it.
> * I use Mesos and so far I always had the downloaded Spark distribution
> accessible for all machines (e.g., via HTTP) and then added my application
> code by uploading a jar built with `sbt assembly`. As the Spark code itself
> must not be contained in that jar file, I had to add '% "provided"' in the
> sbt file, which in turn prevented me from running the application locally
> from IntelliJ IDEA (it would not find the libraries marked with
> "provided"), I always had to use `sbt run`.
> * When using Mesos, on the Spark slaves the Spark jar is loaded before the
> application jar, and so the log4j file from the Spark jar is used instead
> of my custom one (that is different when running locally), so I had to edit
> that file in the Spark distribution jar to customize logging of my Spark
> nodes.
>
> I wonder if the two latter problems would vanish if the Spark libraries
> were bundled together with the application. (That would be your approach
> #1, I guess.)
>
> Tobias
>