Is it necessary to call setID in SparkHadoopWriter.scala

2014-02-24 Thread haosdent
Hi, folks. I was reading the Spark sources and couldn't understand why we should
call setID(), setConfParams(), and commit() in SparkHadoopWriter.scala. I think
we just need to create a RecordWriter and call write(k, v). Is there anything I'm missing?

-- 
Best Regards,
Haosdent Huang


Filter on Date by comparing

2014-02-24 Thread Soumya Simanta
I want to filter an RDD by comparing dates.

myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count


I'm using the Joda-Time library, but I get an exception about a Joda-Time
class not being serializable.

Is there a way to configure this, or is there an easier alternative for this problem?


org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)

at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

at akka.actor.ActorCell.invoke(ActorCell.scala:456)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

at akka.dispatch.Mailbox.run(Mailbox.scala:219)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Running GraphX example from Scala REPL

2014-02-24 Thread Soumya Simanta
I'm trying to run the GraphX examples from the Scala REPL. However, it
complains that it cannot find RDD.

:23: error: not found: type RDD

   val users: RDD[(VertexId, (String, String))] =


 I'm using a Feb 3 commit of incubator Spark. Should I do anything
differently to build GraphX, or is there a configuration that I'm missing?


commit 23af00f9e0e5108f62cdb9629e3eb4e54bbaa321

Author: Xiangrui Meng 

Date:   Mon Feb 3 13:02:09 2014 -0800


thanks.

-Soumya


Re: java.io.NotSerializableException Of dependent Java lib.

2014-02-24 Thread yaoxin
Does this mean that every class I use in Spark must be serializable? Even
the classes that I depend on?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-NotSerializableException-Of-dependent-Java-lib-tp1973p2006.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Filter on Date by comparing

2014-02-24 Thread Andrew Ash
This is because Joda's DateTimeFormatter is not serializable (doesn't
implement the empty Serializable interface)
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html

One ugly thing I've done before is to instantiate a new DateTimeFormatter
for every record, like this:

myRDD.filter(x =>
DateTimeFormat.forPattern("-mm-dd").parseString(x.getCreatedAt).isAfter(start)
).count

It's very inefficient but it gets things closer to working.

Another thing to try is to switch to using Kryo serialization instead of
the default Java serialization, which I think did handle DTF formatting
correctly.  Back in 0.7.x days though, there was an issue where some of the
Joda libraries wouldn't correctly serialize with Kryo, but I think that's
since been fixed:
https://groups.google.com/forum/#!topic/cascalog-user/35cdnNIamKU

HTH,
Andrew


On Mon, Feb 24, 2014 at 6:57 PM, Soumya Simanta wrote:

> I want to filter a RDD by comparing dates.
>
> myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count
>
>
> I'm using the JodaTime library but I get an exception about a Jodatime
> class not serializable.
>
> Is there a way to configure this or an easier alternative for this
> problem.
>
>
> org.apache.spark.SparkException: Job aborted: Task not serializable:
> java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>


Re: Filter on Date by comparing

2014-02-24 Thread Ewen Cheslack-Postava

Or use RDD.filterWith to construct whatever you need from serializable
parts, so the construction only runs once per partition.
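
A rough sketch of the same idea using mapPartitions in place of filterWith,
assuming getCreatedAt returns a date string in the pattern shown and start is
a Joda DateTime (names are illustrative):

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Build the non-serializable formatter once per partition instead of
// capturing it in the task closure.
val matching = myRDD.mapPartitions { iter =>
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")  // illustrative pattern
  iter.filter(x => fmt.parseDateTime(x.getCreatedAt).isAfter(start))
}
matching.count()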

   	   
Re: Filter on Date by comparing

2014-02-24 Thread Soumya Simanta
Thanks Andrew. I was expecting this to be the issue.
Are there any pointers on how to switch the serialization to Kryo?




On Mon, Feb 24, 2014 at 10:17 PM, Andrew Ash  wrote:

> This is because Joda's DateTimeFormatter is not serializable (doesn't
> implement the empty Serializable interface)
> http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
>
> One ugly thing I've done before is to instantiate a new DateTimeFormatter
> in every line, so like this:
>
> myRDD.filter(x =>
> DateTimeFormat.forPattern("-mm-dd").parseString(x.getCreatedAt).isAfter(start)
> ).count
>
> It's very inefficient but it gets things closer to working.
>
> Another thing to try is to switch to using Kryo serialization instead of
> the default Java serialization, which I think did handle DTF formatting
> correctly.  Back in 0.7.x days though, there was an issue where some of the
> Joda libraries wouldn't correctly serialize with Kryo, but I think that's
> since been fixed:
> https://groups.google.com/forum/#!topic/cascalog-user/35cdnNIamKU
>
> HTH,
> Andrew
>
>
> On Mon, Feb 24, 2014 at 6:57 PM, Soumya Simanta 
> wrote:
>
>> I want to filter a RDD by comparing dates.
>>
>> myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count
>>
>>
>> I'm using the JodaTime library but I get an exception about a Jodatime
>> class not serializable.
>>
>> Is there a way to configure this or an easier alternative for this
>> problem.
>>
>>
>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>> java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>
>


Re: Filter on Date by comparing

2014-02-24 Thread Andrew Ash
It's in the data serialization section of the tuning guide, here:

http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization
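
With the 0.9-era API that boils down to something like the sketch below; the
registrator class name and the classes registered are illustrative, so register
whatever Joda types your closures actually capture:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Tells Kryo about the Joda classes used in the job (hypothetical example).
class JodaRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.joda.time.DateTime])
  }
}

val conf = new SparkConf()
  .setAppName("date-filter")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "mypackage.JodaRegistrator")  // use your real package
val sc = new SparkContext(conf)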


On Mon, Feb 24, 2014 at 7:44 PM, Soumya Simanta wrote:

> Thanks Andrew. I was expecting this to be the issue.
> Are there any pointers about how to change the serialization to Kryo ?
>
>
>
>
> On Mon, Feb 24, 2014 at 10:17 PM, Andrew Ash  wrote:
>
>> This is because Joda's DateTimeFormatter is not serializable (doesn't
>> implement the empty Serializable interface)
>> http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
>>
>> One ugly thing I've done before is to instantiate a new DateTimeFormatter
>> in every line, so like this:
>>
>> myRDD.filter(x =>
>> DateTimeFormat.forPattern("-mm-dd").parseString(x.getCreatedAt).isAfter(start)
>> ).count
>>
>> It's very inefficient but it gets things closer to working.
>>
>> Another thing to try is to switch to using Kryo serialization instead of
>> the default Java serialization, which I think did handle DTF formatting
>> correctly.  Back in 0.7.x days though, there was an issue where some of the
>> Joda libraries wouldn't correctly serialize with Kryo, but I think that's
>> since been fixed:
>> https://groups.google.com/forum/#!topic/cascalog-user/35cdnNIamKU
>>
>> HTH,
>> Andrew
>>
>>
>> On Mon, Feb 24, 2014 at 6:57 PM, Soumya Simanta wrote:
>>
>>> I want to filter a RDD by comparing dates.
>>>
>>> myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count
>>>
>>>
>>> I'm using the JodaTime library but I get an exception about a Jodatime
>>> class not serializable.
>>>
>>> Is there a way to configure this or an easier alternative for this
>>> problem.
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>>> java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>>
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>>
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>>
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>>>
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>>
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>
>>
>


Re: How to get well-distribute partition

2014-02-24 Thread Mayur Rustagi
The easiest option is to plug in your own partitioner if you know the nature of
the data. If you don't, you can sample the data to work out the partition
boundaries; RangePartitioner does that for you out of the box.
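
In the Scala API that looks roughly like the sketch below (RangePartitioner
samples the keys to pick balanced range boundaries; it is not exposed in the
Python API, so from PySpark you would have to pass your own partition function
to partitionBy instead):

import org.apache.spark.RangePartitioner
import org.apache.spark.SparkContext._

// Pair RDD with integer keys, repartitioned by sampled key ranges.
val pairs = sc.parallelize(0 until 16).map(i => (i, i * i))
val ranged = pairs.partitionBy(new RangePartitioner(16, pairs))
ranged.glom().collect().foreach(part => println(part.mkString(", ")))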

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Mon, Feb 24, 2014 at 6:16 PM, zhaoxw12 wrote:

> I use spark-0.8.0. This is my code in python.
>
>
> list = [(i, i*i) for i in xrange(0, 16)]*10
> rdd = sc.parallelize(list, 80)
> temp = rdd.collect()
> temp2 = rdd.partitionBy(16, lambda x: x )
> count = 0
> for i in temp2.glom().collect():
>   print count, "**", i
>   count += 1
>
> This will get result below:
>
> 0 ** [(10, 100), (1, 1), (10, 100)... (10, 100), (1, 1), (10, 100)]
> 1 ** [(2, 4), (11, 121), (11, 121)... (2, 4), (11, 121), (2, 4), (11, 121)]
> 2 ** [(12, 144), (12, 144), (3, 9)... (12, 144), (12, 144), (3, 9), (12,
> 144)]
> 3 ** [(13, 169), (13, 169), (4, 16)... (4, 16), (13, 169), (13, 169), (13,
> 169)]
> 4 ** [(14, 196), (5, 25), (5, 25)... (14, 196), (5, 25), (14, 196), (5,
> 25),
> (14, 196)]
> 5 ** [(6, 36), (6, 36), (15, 225)...(6, 36), (6, 36), (15, 225), (15, 225),
> (6, 36)]
> 6 ** [(7, 49), (7, 49), (7, 49)... (7, 49), (7, 49), (7, 49), (7, 49), (7,
> 49), (7, 49)]
> 7 ** [(8, 64), (8, 64), (8, 64)... (8, 64), (8, 64), (8, 64), (8, 64), (8,
> 64), (8, 64)]
> 8 ** [(9, 81), (9, 81), (9, 81)... (9, 81), (9, 81), (9, 81), (9, 81), (9,
> 81), (9, 81)]
> 9 ** []
> 10 ** []
> 11 ** []
> 12 ** []
> 13 ** []
> 14 ** []
> 15 ** [(0, 0), (0, 0), (0, 0), (0, 0)... (0, 0), (0, 0), (0, 0), (0, 0),
> (0,
> 0)]
>
> I want each partition to contain only one key. As you can see, there are 6
> partitions that are empty and 6 partitions that have 2 keys. This will
> cause me a lot of trouble when I am handling big data.
> I face the same problem as
> "
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html
> ",
> but I have not found a solution in Python.
> Thanks a lot for your help. :)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-well-distribute-partition-tp2002.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


spark failure

2014-02-24 Thread Nathan Kronenfeld
I'm using Spark 0.8.1 and trying to run a job from a new remote client (it
works fine when run directly from the master).

When I try and run it, the job just fails without doing anything.

Unfortunately, I also can't find anywhere where it tells me why it fails.
I'll add the relevant bits of the logs below, but there really isn't much.

Does anyone know how to tell why it's failing? I assume it must be getting
an exception somewhere, but it isn't telling me about it.

On the client, I see:
14/02/24 23:44:43 INFO Client$ClientActor: Executor added:
app-20140224234441-0003/4 on
worker-20140224140443-hadoop-s2.oculus.local-40819
(hadoop-s2.oculus.local:7077) with 32 cores
14/02/24 23:44:43 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140224234441-0003/4 on hostPort hadoop-s2.oculus.local:7077 with 32
cores, 200.0 GB RAM
14/02/24 23:44:43 INFO Client$ClientActor: Executor updated:
app-20140224234441-0003/4 is now RUNNING
14/02/24 23:44:43 INFO FileInputFormat: Total input paths to process : 200
14/02/24 23:44:43 INFO Client$ClientActor: Executor updated:
app-20140224234441-0003/1 is now FAILED (Command exited with code 1)
14/02/24 23:44:43 INFO SparkDeploySchedulerBackend: Executor
app-20140224234441-0003/1 removed: Command exited with code 1

The master log just has:
14/02/24 23:44:43 INFO master.Master: Launching executor
app-20140224234441-0003/4 on worker
worker-20140224140443-hadoop-s2.oculus.local-40819
14/02/24 23:44:45 INFO master.Master: Removing executor
app-20140224234441-0003/4 because it is FAILED

(no other mention of 0003/4)

The client log has:
14/02/24 23:44:43 INFO worker.Worker: Asked to launch executor
app-20140224234441-0003/4 for Pyramid Binning(ndk)
14/02/24 23:44:43 INFO worker.ExecutorRunner: Launch command:
"/usr/java/jdk1.7.0_25-cloudera/bin/java" "-cp"
"math-utilities-0.2.jar:binning-utilities-0.2.jar:tile-generation-0.2.jar:hbase-client-0.95.2-cdh5.0.0-beta-1.jar:hbase-protocol-0.95.2-cdh5.0.0-beta-1.jar:hbase-common-0.95.2-cdh5.0.0-beta-1.jar:htrace-core-2.01.jar:avro-1.7.4.jar:commons-compress-1.4.1.jar:scala-library-2.9.3.jar:scala-compiler-2.9.3.jar:/opt/spark/conf:spark-assembly-0.8.1-incubating-hadoop2.2.0-mr1-cdh5.0.0-beta-1.jar"
"-Dspark.executor.memory=200G" "-Xms204800M" "-Xmx204800M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka://spark@hadoop-client.oculus.local:41101/user/CoarseGrainedScheduler"
"4" "hadoop-s2.oculus.local" "32" "app-20140224234441-0003"
14/02/24 23:44:45 INFO worker.Worker: Executor app-20140224234441-0003/4
finished with state FAILED message Command exited with code 1 exitStatus 1


Again, nothing else

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: How to get well-distribute partition

2014-02-24 Thread zhaoxw12
Thanks for your reply.
For various reasons, I have to use Python in my program, and I can't find a
Python API for RangePartitioner. Could you give me more details?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-well-distribute-partition-tp2002p2013.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: apparently non-critical errors running spark-ec2 launch

2014-02-24 Thread Nicholas Chammas
Alright, that's good to know. And I guess the first of these errors can be
prevented by increasing the wait time via --wait.

Thank you.

Nick


On Mon, Feb 24, 2014 at 9:04 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Replies inline
>
> On Mon, Feb 24, 2014 at 5:26 PM, nicholas.chammas
>  wrote:
> > I'm seeing a bunch of (apparently) non-critical errors when launching new
> > clusters with spark-ec2 0.9.0.
> >
> > Here are some of them (emphasis added; names redacted):
> >
> > Generating cluster's SSH key on master...
> >
> > ssh: connect to host ec2-.compute-1.amazonaws.com port 22:
> > Connection refused
> >
> > Error executing remote command, retrying after 30 seconds: Command
> '['ssh',
> > '-o', 'StrictHostKeyChecking=no', '-i',
> > '/Users//.pem.txt', '-t', '-t',
> > u'root@ec2-.compute-1.amazonaws.com', "\n  [ -f
> ~/.ssh/id_rsa
> > ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&\n
> cat
> > ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)\n"]' returned non-zero
> exit
> > status 255
> >
> This is harmless -- EC2 instances sometimes take longer than we expect
> to start up. The retry mechanism handles things like that.
>
> > ...
> >
> > Unpacking Spark
> > ~/spark-ec2
> > Initializing shark
> > ~ ~/spark-ec2
> > ERROR: Unknown Shark version
> > Initializing ephemeral-hdfs
> > ~ ~/spark-ec2 ~/spark-ec2
> >
> > ...
> >
> I think this error happens when there isn't a Shark version
> corresponding to a Spark release. This is the case right now for 0.9 I
> think. The downside of this is that Shark will not be available on
> your cluster.
>
> > RSYNC'ing /root/hive* to slaves...
> > ec2-.compute-1.amazonaws.com
> > rsync: link_stat "/root/hive*" failed: No such file or directory (2)
> > rsync error: some files/attrs were not transferred (see previous errors)
> > (code 23) at main.c(1039) [sender=3.0.6]
> >
> I think this is also due to the Shark error. I am not very sure.
>
> > Setting up ephemeral-hdfs
> >
> > ...
> >
> > RSYNC'ing /etc/ganglia to slaves...
> > ec2-.compute-1.amazonaws.com
> > Shutting down GANGLIA gmond:   [FAILED]
> > Starting GANGLIA gmond:[  OK  ]
> > Shutting down GANGLIA gmond:   [FAILED]
> > Starting GANGLIA gmond:[  OK  ]
> > Connection to ec2-.compute-1.amazonaws.com closed.
> > Shutting down GANGLIA gmetad:  [FAILED]
> > Starting GANGLIA gmetad:   [  OK  ]
> > Stopping httpd:[FAILED]
> > Starting httpd:[  OK  ]
> >
>
> This is expected and just comes from using service restarts on Linux.
> It just says that ganglia isn't running when we try to stop it.
>
> > Are these errors known to be harmless, or somehow expected/normal?
> >
> > When I log in to the cluster the shell starts up fine.
> >
> > Nick
> >
> >
> > 
> > View this message in context: apparently non-critical errors running
> > spark-ec2 launch
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Can spark-streaming work with spark-on-yarn mode?

2014-02-24 Thread 林武康
Thank you Tathagata, I will try it out later.

-----Original Message-----
From: "Tathagata Das" 
Sent: 2014/2/22 11:12
To: "u...@spark.incubator.apache.org" 
Subject: Re: Can spark-streaming work with spark-on-yarn mode?

Yes, Spark and Spark Streaming programs can be deployed on YARN. Here is the 
documentation.


TD



On Thu, Feb 20, 2014 at 11:16 PM, 林武康  wrote:

Hi all,
I am very new to Apache Spark. Recently I tried Spark on YARN and it works
for batch processing. Now we want to try stream processing using
spark-streaming, still using YARN as the resource scheduler, since we want to
manage all of the cluster's resources used for computing tasks in a unified
way. Can this work?
Any suggestions are welcome!




Best Regards!

Job initialization performance of Spark standalone mode vs YARN

2014-02-24 Thread polkosity
Is there any difference in the performance of Spark standalone mode and YARN
when it comes to initializing a new Spark job?  

In my application, response time is absolutely critical, and I'm hoping to
have the executors working within a few seconds of submitting the job.

Both options ran quickly for me (running the SparkPi example) in a single
node cluster, only a couple of seconds until executors began work.  On my 10
node cluster it takes YARN over 10 seconds before the executors actually
begin work.  Could I expect Spark standalone to get going any quicker?  If
so I will take the time to configure it on 10 node cluster.

Why does the example run so much quicker on my local single node cluster
than on my 10 EC2 m1.larges?

Aside from YARN being able to schedule Spark, MRv2 and other job types, are
there any major differences between Spark standalone and YARN?

Thanks.
- Dan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-tp2016.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark performance optimization

2014-02-24 Thread polkosity
As mentioned in a previous post, I have an application which relies on a
quick response.  The application matches a client's image against a set of
stored images.  Image features are stored in a SequenceFile and passed over
JNI to match in OpenCV, along with the features for the client's image.  An
id for the matched image is returned.

I was using Hadoop 1.2.1 and achieved some pretty good results, but the job
initialization was taking about 15 seconds, and we'd hoped to have a
response in ~5 seconds.  So we moved to Hadoop 2.2, YARN & Spark.  Sadly,
job initialization is still taking over 10 seconds (on a cluster of 10 EC2
m1.large).

Any suggestions on what I can do to bring this initialization time down?

Once the executors begin work, the performance is quite good, but any
general performance optimization tips also welcome!

Thanks. 
- Dan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-performance-optimization-tp2017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


WARNING: Spark lists moving to spark.apache.org domain name

2014-02-24 Thread Matei Zaharia
Hi everyone,

As you may have noticed, our lists are currently in the process of being 
migrated from @spark.incubator.apache.org domain names to @spark.apache.org, as 
part of the project becoming a top-level project. Please be aware that messages
will now come to the new lists and you'll have to adjust your email filters accordingly.
This is hopefully our last mailing-list move; thanks for bearing with us
through the moves over the past year.

Matei

Re: Spark performance optimization

2014-02-24 Thread Andrew Ash
Have you tried using a standalone spark cluster vs a YARN one?  I get the
impression that standalone responses are faster (the JVMs are already all
running) but haven't done any rigorous testing (and have only used
standalone so far).


On Mon, Feb 24, 2014 at 10:43 PM, polkosity  wrote:

> As mentioned in a previous post, I have an application which relies on a
> quick response.  The application matches a client's image against a set of
> stored images.  Image features are stored in a SequenceFile and passed over
> JNI to match in OpenCV, along with the features for the client's image.  An
> id for the matched image is returned.
>
> I was using Hadoop 1.2.1 and achieved some pretty good results, but the job
> initialization was taking about 15 seconds, and we'd hoped to have a
> response in ~5 seconds.  So we moved to Hadoop 2.2, YARN & Spark.  Sadly,
> job initialization is still taking over 10 seconds (on a cluster of 10 EC2
> m1.large).
>
> Any suggestions on what I can do to bring this initialization time down?
>
> Once the executors begin work, the performance is quite good, but any
> general performance optimization tips also welcome!
>
> Thanks.
> - Dan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-performance-optimization-tp2017.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Job initialization performance of Spark standalone mode vs YARN

2014-02-24 Thread Mayur Rustagi
Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi



On Mon, Feb 24, 2014 at 10:22 PM, polkosity  wrote:

> Is there any difference in the performance of Spark standalone mode and
> YARN
> when it comes to initializing a new Spark job?
>
Yes, YARN is a much more complex cluster manager than the one provided by
Spark standalone.

>
> In my application, response time is absolutely critical, and I'm hoping to
> have the executors working within a few seconds of submitting the job.
>
> Both options ran quickly for me (running the SparkPi example) in a single
> node cluster, only a couple of seconds until executors began work.  On my
> 10
> node cluster it takes YARN over 10 seconds before the executors actually
> begin work.  Could I expect Spark standalone to get going any quicker?  If
> so I will take the time to configure it on 10 node cluster.
>
Yes, Spark standalone is much faster and is preferable if you are not
running any other applications (like Hive, HBase, etc.) on the cluster. I
get very responsive 2-3 second response times in standalone mode with 10
machines.

>
> Why does the example run so much quicker on my local single node cluster
> than on my 10 EC2 m1.larges?


> Aside from YARN being able to schedule Spark, MRv2 and other job types, are
> there any major differences between Spark standalone and YARN?
>
YARN has much more granular control over the cluster resources. You can
also look into Mesos for cluster management, which will be much faster than
YARN for now.

>
> Thanks.
> - Dan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-tp2016.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark performance optimization

2014-02-24 Thread Roshan Nair
Hi,

We use sequence files as input as well. Spark creates a task for each part*
file by default. We use RDD.coalesce (set to number of cores or 2*number of
cores). This helps when there are many more part* files than the number of
cores and each part* file is relatively small. Coalesce doesn't actually
move files around or force a repartition.

This shouldn't affect your overall job initialization times, but might
improve your general job throughput.
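
Roughly, that looks like the following (the core count, types, and path are
placeholders):

import org.apache.spark.SparkContext._

// Read many small part* files and merge down to ~2x the number of cores.
// coalesce(n) with the default shuffle = false only merges partitions locally.
val numCores = 8
val features = sc.sequenceFile[String, String]("hdfs:///path/to/features")
  .coalesce(2 * numCores)
println(features.partitions.size)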

Roshan


On Tue, Feb 25, 2014 at 12:13 PM, polkosity  wrote:

> As mentioned in a previous post, I have an application which relies on a
> quick response.  The application matches a client's image against a set of
> stored images.  Image features are stored in a SequenceFile and passed over
> JNI to match in OpenCV, along with the features for the client's image.  An
> id for the matched image is returned.
>
> I was using Hadoop 1.2.1 and achieved some pretty good results, but the job
> initialization was taking about 15 seconds, and we'd hoped to have a
> response in ~5 seconds.  So we moved to Hadoop 2.2, YARN & Spark.  Sadly,
> job initialization is still taking over 10 seconds (on a cluster of 10 EC2
> m1.large).
>
> Any suggestions on what I can do to bring this initialization time down?
>
> Once the executors begin work, the performance is quite good, but any
> general performance optimization tips also welcome!
>
> Thanks.
> - Dan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-performance-optimization-tp2017.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


HBase row count

2014-02-24 Thread Soumitra Kumar
I have some code that reads an HBase table and counts the number of rows
containing a particular field.

def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
    RDD[List[Array[Byte]]] = {
  return rdd.flatMap(kv => {
    // Set of interesting keys for this use case
    val keys = List ("src")
    var data = List[Array[Byte]]()
    var usefulRow = false

    val cf = Bytes.toBytes ("cf")
    keys.foreach {key =>
      val col = kv._2.getValue(cf, Bytes.toBytes(key))
      if (col != null)
        usefulRow = true
      data = data :+ col
    }

    if (usefulRow)
      Some(data)
    else
      None
  })
}

def main(args: Array[String]) {
  val hBaseRDD = init(args)
  // hBaseRDD.cache()

  println(" Initial row count " + hBaseRDD.count())
  println(" Rows with interesting fields " + readFields(hBaseRDD).count())
}


I am running on a single-node CDH installation.

As it is, it takes around 2.5 minutes, but if I comment out 'println("
Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.

Is it doing the HBase scan twice, once for each 'count' call? How do I improve it?

Thanks,
-Soumitra.


Re: HBase row count

2014-02-24 Thread Nick Pentreath
Yes, you're initiating a scan for each count call. The normal way to
improve this would be to use cache(), which is what you have in your
commented-out line:
// hBaseRDD.cache()

If you uncomment that line, you should see an improvement overall.

If caching is not an option for some reason (maybe data is too large), then
you can implement an overall count in your readFields method using
accumulators:

val count = sc.accumulator(0L)
...
In your flatMap function do count += 1 for each row (regardless of whether
"interesting" or not).

In your main method, after performing an action (count in your case), read the
total with val totalCount = count.value.
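
To make that concrete, here is a condensed sketch of the earlier code with the
accumulator added (it keeps only the single "src" key and assumes the same
HBase imports as the original job):

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val totalRows = sc.accumulator(0L)

def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] =
  rdd.flatMap { kv =>
    totalRows += 1L                             // counted for every row scanned
    val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
    if (col != null) Some(List(col)) else None
  }

val interesting = readFields(hBaseRDD).count()  // single scan over the table
println(" Initial row count " + totalRows.value)
println(" Rows with interesting fields " + interesting)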




On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar wrote:

> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
> RDD[List[Array[Byte]]] = {
> return rdd.flatMap(kv => {
> // Set of interesting keys for this use case
> val keys = List ("src")
> var data = List[Array[Byte]]()
> var usefulRow = false
>
> val cf = Bytes.toBytes ("cf")
> keys.foreach {key =>
> val col = kv._2.getValue(cf, Bytes.toBytes(key))
> if (col != null)
> usefulRow = true
> data = data :+ col
> }
>
> if (usefulRow)
> Some(data)
> else
> None
> })
> }
>
> def main(args: Array[String]) {
> val hBaseRDD = init(args)
> // hBaseRDD.cache()
>
> println(" Initial row count " + hBaseRDD.count())
> println(" Rows with interesting fields " +
> readFields(hBaseRDD).count())
>   }
>
>
> I am running on a one mode CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out 'println("
> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>
> Thanks,
> -Soumitra.
>


Re: HBase row count

2014-02-24 Thread Soumitra Kumar
I did try 'hBaseRDD.cache()', but I don't see any improvement.

My expectation is that with caching enabled, there should not be any penalty
for the extra 'hBaseRDD.count()' call.



On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath
wrote:

> Yes, you''re initiating a scan for each count call. The normal way to
> improve this would be to use cache(), which is what you have in your
> commented out line:
> // hBaseRDD.cache()
>
> If you uncomment that line, you should see an improvement overall.
>
> If caching is not an option for some reason (maybe data is too large),
> then you can implement an overall count in your readFields method using
> accumulators:
>
> val count = sc.accumulator(0L)
> ...
> In your flatMap function do count += 1 for each row (regardless of
> whether "interesting" or not).
>
> In your main method after doing an action (e.g. count in your case), call val
> totalCount = count.value.
>
>
>
>
> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar 
> wrote:
>
>> I have a code which reads an HBase table, and counts number of rows
>> containing a field.
>>
>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>> RDD[List[Array[Byte]]] = {
>> return rdd.flatMap(kv => {
>> // Set of interesting keys for this use case
>> val keys = List ("src")
>> var data = List[Array[Byte]]()
>> var usefulRow = false
>>
>> val cf = Bytes.toBytes ("cf")
>> keys.foreach {key =>
>> val col = kv._2.getValue(cf, Bytes.toBytes(key))
>> if (col != null)
>> usefulRow = true
>> data = data :+ col
>> }
>>
>> if (usefulRow)
>> Some(data)
>> else
>> None
>> })
>> }
>>
>> def main(args: Array[String]) {
>> val hBaseRDD = init(args)
>> // hBaseRDD.cache()
>>
>> println(" Initial row count " + hBaseRDD.count())
>> println(" Rows with interesting fields " +
>> readFields(hBaseRDD).count())
>>   }
>>
>>
>> I am running on a one mode CDH installation.
>>
>> As it is it takes around 2.5 minutes. But if I comment out 'println("
>> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>>
>> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>>
>> Thanks,
>> -Soumitra.
>>
>
>