Multiple Filter Efficiency

2014-12-16 Thread zkidkid
Hi,
Currently I am trying to count records in a document against multiple filters.
Let's say here is my document:

//user field1 field2 field3
user1 0 0 1
user2 0 1 0
user3 0 0 0

I want to count lines in user.log against filters like these:

Filter1: field1 == 0 && field2 == 0
Filter2: field1 == 0 && field3 == 1
Filter3: field1 == 0 && field3 == 0
...
and the total line count.

I have tried, and I found that I couldn't use "group by" or "map then reduce"
because a line could match two or more filters.

My idea now is to "foreach" over each line and maintain an outside counter service.

For example:

JavaRDD<String> textFile = sc.textFile(hdfs, 10);
long start = System.currentTimeMillis();

textFile.foreach(new VoidFunction<String>() {
    public void call(String s) {
        for (MyFilter filter : MyFilters) {
            if (filter.match(s)) filter.increaseOwnCounter();
        }
    }
});


I would be happy if there is another way to do it; any help is appreciated.
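One direction that might work (a rough sketch in Scala; the HDFS path and the inline split-based predicates are just placeholders for the real MyFilter classes) is to keep one Spark accumulator per filter and update them all in a single pass:

// rough sketch: one accumulator per filter, all updated in one pass over the data
import org.apache.spark.SparkContext._   // implicit AccumulatorParam[Long] (Spark 1.x)

val lines = sc.textFile("hdfs:///path/to/user.log")   // placeholder path

// placeholder predicates standing in for the real MyFilter.match logic
val filters: Seq[(String, Array[String] => Boolean)] = Seq(
  ("filter1", f => f(1) == "0" && f(2) == "0"),
  ("filter2", f => f(1) == "0" && f(3) == "1"),
  ("filter3", f => f(1) == "0" && f(3) == "0")
)

val counters = filters.map { case (name, _) => name -> sc.accumulator(0L) }.toMap
val total = sc.accumulator(0L)

lines.foreach { line =>
  val fields = line.split(" ")
  total += 1L
  filters.foreach { case (name, pred) => if (pred(fields)) counters(name) += 1L }
}

counters.foreach { case (name, acc) => println(name + ": " + acc.value) }
println("total: " + total.value)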
Thanks in advance.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Filter-Effiency-tp20701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




GC problem while filtering large data

2014-12-16 Thread Joe L
Hi, I am trying to filter a large table with 3 columns. Spark SQL might be a
good choice, but I want to do it without SQL. The goal is to filter the big table
with multiple clauses. I filtered the big table 3 times, but the first filtering takes
about 50 seconds while the second and third filter transformations took about 5
seconds. I wonder if this is because of lazy evaluation. However, I already
evaluated my RDD by parsing it when I first read it using sc.textFile and then
counting it.

Running times:
t1 => 50 seconds
t2 => 5 seconds
t3 => 4 seconds

val clause = List(
  ("", ""),
  ("", "?Z"),
  ("", "?Y")
)

val bcastedSubj: Broadcast[String] = sc.broadcast("?X")
val bcastedCls: Broadcast[List[(String, String)]] = sc.broadcast(clause)
var n = clause.length

val t0 = System.currentTimeMillis()

val subgraph1 = bigtable.mapPartitions(
  iterator => {
    val bcls = bcastedCls.value
    val bsubj = bcastedSubj.value
    n = bcls.length
    for ((s, grp) <- iterator;
         if {
           val flag =
             if (!bsubj.startsWith("?") && !bsubj.equals(s)) false
             else {
               var k = 0
               val m = grp.length
               var flag1 = true

               while (k < n) {
                 var flag2 = false
                 var l = 0
                 while (l < m) {
                   if (grp(l)._1.equals(bcls(k)._1) && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._1.startsWith("?") && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._2.startsWith("?") && grp(l)._1.equals(bcls(k)._1)) flag2 = true
                   l += 1
                 }
                 if (!flag2) flag1 = false
                 k += 1
               }

               flag1
             }

           flag
         }
    ) yield (s, grp)
  }, preservesPartitioning = true).cache()
val num1 = subgraph1.count()
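To rule out the cost of the initial read: counting the RDD once does not keep it in memory, so unless the parsed RDD is also cached, the first timed filter still re-reads and re-parses the input from HDFS. A minimal sketch (the path and the tab-split parse are placeholders) that caches and materializes the input before timing:

// placeholder path/parse; the point is to cache and force one evaluation of the
// input so the timed filter below no longer pays for the initial HDFS read
val table = sc.textFile("hdfs:///path/to/bigtable").map(_.split("\t")).cache()
table.count()   // forces the read + parse and fills the cache

val start = System.currentTimeMillis()
val matched = table.filter(cols => cols.nonEmpty && cols(0) == "?X").count()
println("first filter took " + (System.currentTimeMillis() - start) + " ms, matched " + matched)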



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GC-problem-while-filtering-large-data-tp20702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Custom UDTF with Lateral View throws ClassNotFound exception in Spark SQL CLI

2014-12-16 Thread shenghua
A workaround trick has been found and put in the ticket
https://issues.apache.org/jira/browse/SPARK-4854. Hope this is useful.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-UDTF-with-Lateral-View-throws-ClassNotFound-exception-in-Spark-SQL-CLI-tp20689p20704.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Accessing Apache Spark from Java

2014-12-16 Thread Akhil Das
Hi Jai,

Refer to this doc and make sure your network is not blocking the connection:
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-job-on-Unix-cluster-from-dev-environment-Windows-td16989.html

Also make sure you are using the same version of Spark in both places (the
one on the cluster, and the one that you used inside your application).

Thanks
Best Regards

On Tue, Dec 16, 2014 at 1:25 PM, Jai  wrote:
>
> Hi
>
> I have installed a standalone Spark set up in standalone mode in a Linux
> server and I am trying to access that spark setup from Java in windows.
> When
> I try connecting to Spark I see the following exception
>
> 14/12/16 12:52:52 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/12/16 12:52:56 INFO AppClient$ClientActor: Connecting to master
> spark://01hw294954.INDIA:7077...
> 14/12/16 12:53:07 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/12/16 12:53:16 INFO AppClient$ClientActor: Connecting to master
> spark://01hw294954.INDIA:7077...
> 14/12/16 12:53:22 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient memory
> 14/12/16 12:53:36 ERROR SparkDeploySchedulerBackend: Application has been
> killed. Reason: All masters are unresponsive! Giving up.
> 14/12/16 12:53:36 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 14/12/16 12:53:36 INFO TaskSchedulerImpl: Cancelling stage 0
> 14/12/16 12:53:36 INFO DAGScheduler: Failed to run collect at
> MySqlConnector.java:579
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: All masters are unresponsive! Giving up.
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run
>
> I have attached the Spark Master UI
>
>  Spark Master at spark://01hw294954.INDIA:7077
> URL: spark://01hw294954.INDIA:7077
> Workers: 1
> Cores: 2 Total, 0 Used
> Memory: 835.0 MB Total, 0.0 B Used
> Applications: 0 Running, 0 Completed
> Drivers: 0 Running, 0 Completed
> Status: ALIVE
> Workers
>
> Id: worker-20141216123503-01hw294954.INDIA-38962
> Address: 01hw294954.INDIA:38962
> State: ALIVE
> Cores: 2 (0 Used)
> Memory: 835.0 MB (0.0 B Used)
> Running Applications
>
> ID  Name  Cores  Memory per Node  Submitted Time  User  State  Duration
> Completed Applications
>
> ID  Name  Cores  Memory per Node  Submitted Time  User  State  Duration
>
>
> My Spark Slave is
>
>  Spark Worker at 01hw294954.INDIA:38962
> ID: worker-20141216123503-01hw294954.INDIA-38962
> Master URL: spark://01hw294954.INDIA:7077
> Cores: 2 (0 Used)
> Memory: 835.0 MB (0.0 B Used)
> Back to Master
>
> Running Executors (0)
>
> ExecutorID  Cores   State   Memory  Job Details Logs
>
>
> My Java Master Code looks like this
>
> SparkConf sparkConf = new SparkConf().setAppName("JdbcRddTest");
> sparkConf.setMaster("spark://01hw294954.INDIA:7077");
> When I tried using the same code 

Re: MLLib: Saving and loading a model

2014-12-16 Thread Jaonary Rabarisoa
Hi,

There is ongoing work on model export:
https://www.github.com/apache/spark/pull/3062

For now, since the linear regression model is serializable, you can save it as an
object file:

sc.parallelize(Seq(model), 1).saveAsObjectFile("path")

then

val model = sc.objectFile[LinearRegressionModel]("path").first()

model.predict(...)
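A slightly more complete sketch of that round trip (the toy data and the path are purely illustrative; the object saved and restored is the trained LinearRegressionModel):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}

// toy training data, just to keep the sketch self-contained
val trainingData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.5)),
  LabeledPoint(2.0, Vectors.dense(2.0, 1.0))
))

val model = LinearRegressionWithSGD.train(trainingData, 100)

// save: wrap the serializable model in a one-element RDD and write it out
sc.parallelize(Seq(model), 1).saveAsObjectFile("hdfs:///models/linreg")

// load it back later and use it for prediction
val restored = sc.objectFile[LinearRegressionModel]("hdfs:///models/linreg").first()
restored.predict(Vectors.dense(1.5, 0.75))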




On Mon, Dec 15, 2014 at 11:21 PM, Sameer Tilak  wrote:
>
> Hi All,
> Resending this:
>
> I am using LinearRegressionWithSGD and then I save the model weights and
> intercept.
> File that contains weights have this format:
>
> 1.20455
> 0.1356
> 0.000456
> ..
>
> Intercept is 0 since I am using train without setting the intercept, so it can
> be ignored for the moment. I would now like to initialize a new model
> object using these saved weights from the above file. We are using CDH
> 5.1.
>
> Something along these lines:
>
> val weights = sc.textFile("linear-weights");
> val model = new LinearRegressionWithSGD(weights);
>
> then use is as:
>
> val valuesAndPreds = testData.map { point =>
>   val prediction = model.predict(point.features)
>   (point.label, prediction)
> }
>
>
> Any pointers to how do I do that?
>
>


GC problem while filtering

2014-12-16 Thread Batselem
Hi, I am trying to filter a large table with 3 columns. My goal is to filter
this big table using multiple clauses. I filtered the big table 3 times, but the first
filtering took about 50 seconds to complete whereas the second and third
filter transformations took about 5 seconds. I wonder if this is because of
lazy evaluation. However, I already evaluated my RDD by parsing it when I first read
the data using sc.textFile and then counting it. I got the following result:

Running times:
t1 => 50 seconds
t2 => 5 seconds
t3 => 4 seconds
***CODE***
val clause = List(
  ("", ""),
  ("", "?Z"),
  ("", "?Y")
)

val bcastedSubj: Broadcast[String] = sc.broadcast("?X")
val bcastedCls: Broadcast[List[(String, String)]] = sc.broadcast(clause)
var n = clause.length

val t0 = System.currentTimeMillis()

val subgraph1 = bigtable.mapPartitions(
  iterator => {
    val bcls = bcastedCls.value
    val bsubj = bcastedSubj.value
    n = bcls.length
    for ((s, grp) <- iterator;
         if {
           val flag =
             if (!bsubj.startsWith("?") && !bsubj.equals(s)) false
             else {
               var k = 0
               val m = grp.length
               var flag1 = true

               while (k < n) {
                 var flag2 = false
                 var l = 0
                 while (l < m) {
                   if (grp(l)._1.equals(bcls(k)._1) && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._1.startsWith("?") && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                   else if (bcls(k)._2.startsWith("?") && grp(l)._1.equals(bcls(k)._1)) flag2 = true
                   l += 1
                 }
                 if (!flag2) flag1 = false
                 k += 1
               }

               flag1
             }

           flag
         }
    ) yield (s, grp)
  }, preservesPartitioning = true).cache()
val num1 = subgraph1.count()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GC-problem-while-filtering-tp20705.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Reply: Fetch Failed caused job failed.

2014-12-16 Thread Ma,Xi
Hi Das,

Thanks for your advice.

I'm not sure what the point of setting memoryFraction to 1 is. I've tried to
rerun the test again with the following parameters in spark_default.conf, but it
failed again:

spark.rdd.compress  true
spark.akka.frameSize  50
spark.storage.memoryFraction 0.8
spark.core.connection.ack.wait.timeout 6000

14/12/16 16:45:08 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in 
_read_with_length
length = read_int(stream)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in 
read_int
raise EOFError
EOFError
 at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
 at 
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)

I suspect that there is something wrong in the shuffle stage, but I am not sure
what the error is.

Thanks,

Mars


From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: December 16, 2014 14:57
To: Ma,Xi
Cc: u...@spark.incubator.apache.org
Subject: Re: Fetch Failed caused job failed.

You could try setting the following while creating the sparkContext


  .set("spark.rdd.compress","true")

  .set("spark.storage.memoryFraction","1")

  .set("spark.core.connection.ack.wait.timeout","600")

  .set("spark.akka.frameSize","50")


Thanks
Best Regards

On Tue, Dec 16, 2014 at 8:30 AM, Mars Max 
mailto:m...@baidu.com>> wrote:
While I was running spark MR job, there was FetchFailed(BlockManagerId(47,
xx.com, 40975, 0), shuffleId=2, mapId=5, 
reduceId=286), then there
were many retries, and the job failed finally.

And the log showed the following error, does anybody meet this error ? or is
it a known issue in Spark ? Thanks.

4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly
(crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
_read_with_length
length = read_int(stream)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
read_int
raise EOFError
EOFError

at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
at
org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed:
BlockManagerId(47, 
nmg01-taihang-d11609.nmg01.baidu.com,
 40975, 0) 2 5 286
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:68)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:78)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 

Re: Reply: Fetch Failed caused job failed.

2014-12-16 Thread Akhil Das
So the fetch failure error is gone? Can you paste the code that you are
executing? What is the size of the data and your cluster setup?

Thanks
Best Regards

On Tue, Dec 16, 2014 at 3:16 PM, Ma,Xi  wrote:
>
>  Hi Das,
>
>
>
> Thanks for your advice.
>
>
>
> I'm not sure what the point of setting memoryFraction to 1 is. I've tried
> to rerun the test again with the following parameters in
> spark_default.conf, but it failed again:
>
>
>
> spark.rdd.compress  true
>
> spark.akka.frameSize  50
>
> spark.storage.memoryFraction 0.8
>
> spark.core.connection.ack.wait.timeout 6000
>
>
>
> 14/12/16 16:45:08 ERROR PythonRDD: Python worker exited unexpectedly
> (crashed)
>
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>
>   File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
>
> command = pickleSer._read_with_length(infile)
>
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
> _read_with_length
>
> length = read_int(stream)
>
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
> read_int
>
> raise EOFError
>
> EOFError
>
>  at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>
>  at
> org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
>
>  at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>
>  at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>
>  at org.apache.spark.scheduler.Task.run(Task.scala:54)
>
>  at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>
>  at java.lang.Thread.run(Thread.java:662)
>
>
>
> I suspect that there is something wrong in the shuffle stage, but I am not sure
> what the error is.
>
>
>
> Thanks,
>
>
>
> Mars
>
>
>
>
>
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
> Sent: December 16, 2014 14:57
> To: Ma,Xi
> Cc: u...@spark.incubator.apache.org
> Subject: Re: Fetch Failed caused job failed.
>
>
>
> You could try setting the following while creating the sparkContext
>
>
>
>   .set("spark.rdd.compress","true")
>
>   .set("spark.storage.memoryFraction","1")
>
>   .set("spark.core.connection.ack.wait.timeout","600")
>
>   .set("spark.akka.frameSize","50")
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Tue, Dec 16, 2014 at 8:30 AM, Mars Max  wrote:
>
> While I was running spark MR job, there was FetchFailed(BlockManagerId(47,
> xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there
> were many retries, and the job failed finally.
>
> And the log showed the following error, does anybody meet this error ? or
> is
> it a known issue in Spark ? Thanks.
>
> 4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly
> (crashed)
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
> command = pickleSer._read_with_length(infile)
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
> _read_with_length
> length = read_int(stream)
>   File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
> read_int
> raise EOFError
> EOFError
>
> at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> at
> org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:154)
> at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed:
> BlockManagerId(47, nmg01-taihang-d11609.nmg01.baidu.com, 40975, 0) 2 5 286
> at
>
>

Spark inserting into parquet files with different schema

2014-12-16 Thread AdamPD
Hi all,

I understand that parquet allows for schema versioning automatically in the
format; however, I'm not sure whether Spark supports this.

I'm saving a SchemaRDD to a parquet file, registering it as a table, then
doing an insertInto with a SchemaRDD with an extra column.

The second SchemaRDD does in fact get inserted, but the extra column isn't
present when I try to query it with Spark SQL.

Is there anything I can do to get this working the way I'm hoping?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-inserting-into-parquet-files-with-different-schema-tp20706.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Locality level and Kryo

2014-12-16 Thread aecc
Hi guys,

It happens to me quite often that when the locality level of a task goes
beyond LOCAL (NODE, RACK, etc.), I get some of the following
exceptions: "too many open files", "encountered unregistered class id",
"cannot cast X to Y".

I do not get any exceptions during shuffling (which suggests that Kryo works
well).

I'm running Spark 1.0.0 with the following characteristics:

- 18 executors with 30G each
- YARN client mode
- ulimit is set to 500k
- Input data: an HDFS file with 1000 partitions and 10 GB in size

Any hint would be appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Locality-level-and-Kryo-tp20708.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Locality level and Kryo

2014-12-16 Thread Akhil Das
Make sure ulimit is set system-wide across the cluster (ulimit -a). Also
reduce the number of partitions to a smaller number (say 200-500) to get
rid of the "too many open files" errors.

Thanks
Best Regards

On Tue, Dec 16, 2014 at 3:54 PM, aecc  wrote:
>
> Hi guys,
>
> It happens to me quite often that when the locality level of a task goes
> further than LOCAL (NODE, RACK, etc), I get some of the following
> exceptions: "too many files open", "encountered unregistered class id",
> "cannot cast X to Y".
>
> I do not get any exceptions during shuffling (which means that kryo works
> well).
>
> I'm running Spark 1.0.0 with the following characteristics:
>
> - 18 executors with 30G each
> - Yarn client mode
> - ulimit is defined in 500k
> - Input data: hdfs file with 1000 partitions and 10 GB of size
>
> Please any hint would be appreciated
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Locality-level-and-Kryo-tp20708.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: protobuf error running spark on hadoop 2.4

2014-12-16 Thread RodrigoB
Hi,

I'm currently having this issue as well, while using Spark with Akka 2.3.4.

Did Debasish's suggestion work for you?

I've already built Spark with Hadoop 2.3.0 but I am still getting the error.

tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/protobuf-error-running-spark-on-hadoop-2-4-tp15975p20709.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RDD "toarray","first" behavior

2014-12-16 Thread buring
Hi
Recently I have had some problems with RDD behavior. It's about the
"RDD.first" and "RDD.toArray" methods when an RDD has only one element. I can't get
the correct element from the RDD. I will give more detail after the code.
My code is as follows:
// get an RDD with just one row: RDD[(Long, Array[Byte])]
val alsresult =
  sc.sequenceFile(args(0)+"/als", classOf[LongWritable], classOf[BytesWritable]).map { case (uid, sessions) =>
    sessions.setCapacity(sessions.getLength)
    (uid.get(), sessions.getBytes)
  }.filter { line =>
    line._1 == userindex.value // specified from arguments
  }

// log information that really surprised me
logger.info("alsInformation:%d".format(alsresult.count()))
alsresult.toArray().foreach(e => logger.info("alstoarray:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(1).foreach(e => logger.info("take1result:%d\t%s".format(e._1, e._2.mkString(" "))))
logger.info("firstInformation:%d\t%s".format(alsresult.first()._1, alsresult.first()._2.mkString(" ")))
alsresult.collect().foreach(e => logger.info("alscollectresult:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(3).foreach(e => logger.info("alstake3result:%d\t%s".format(e._1, e._2.mkString(" ")))) // 3 is bigger than rdd.count()

I get an RDD which has just one element, but using different methods I
get different elements. My printed information is as follows:

                     userindex.value = 28116855                 userindex.value = 123456
alsInformation       1                                          1
alstoarray           28116855  16 32 0 22 13 49 19...           123456  16 32 0 22 13 49 19...
take1result          28116855  16 52 31 42 29 36 14...          123456  39 39 21 34 25 49 51 ...
firstInformation     28116855  16 52 31 42 29 36 14...          123456  39 39 21 34 25 49 51 ...
alscollectresult     28116855  16 32 0 22 13 49 19...           123456  16 32 0 22 13 49 19...
alstake3result       28116855  16 32 0 22 13 49 19...           123456  16 32 0 22 13 49 19...
I filter the RDD and guarantee that RDD.count() equals 1. I think different
"userindex.value" arguments should give different alsresult values,
but "RDD.toArray", "RDD.collect" and "RDD.take(3)" return the same result, and
under the same argument "toArray", "take(1)" and "take(3)"
return different results. This really surprised me.

Can anyone explain it or give me some reference?

Thanks 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-toarray-first-behavior-tp20710.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: MLLib /ALS : java.lang.OutOfMemoryError: Java heap space

2014-12-16 Thread buring
you can try to decrease the rank value.
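For example, a minimal sketch using MLlib's ALS API (the input path/format and the rank, iteration and lambda values are illustrative) -- a smaller rank shrinks the user and product factor matrices that have to be held in memory:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// parse "user,product,rating" lines; the path and format are illustrative
val ratings = sc.textFile("hdfs:///path/to/ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  Rating(user.toInt, product.toInt, rating.toDouble)
}

// e.g. rank 10 instead of a larger value (iterations = 10, lambda = 0.01)
val model = ALS.train(ratings, 10, 10, 0.01)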



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20711.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Hi,
I've built spark successfully with maven but when I try to run spark-shell I 
get the following errors:
Spark assembly has been built with Hive, including Datanucleus jars on classpath

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/spark/deploy/SparkSubmit

Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.SparkSubmit

at java.net.URLClassLoader$1.run(URLClassLoader.java:217)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:205)

at java.lang.ClassLoader.loadClass(ClassLoader.java:323)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.

What can be wrong?

Thanks,
Daniel

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Akhil Das
print the CLASSPATH and make sure the spark assembly jar is there in the
classpath

Thanks
Best Regards

On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv  wrote:
>
> Hi,
> I've built spark successfully with maven but when I try to run spark-shell
> I get the following errors:
>
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/deploy/SparkSubmit
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.deploy.SparkSubmit
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
> What can be wrong?
>
> Thanks,
> Daniel
>


Re: Data Loss - Spark streaming

2014-12-16 Thread Gerard Maas
Hi Jeniba,

The second part of this meetup recording has a very good answer to your
question.  TD explains the current behavior and the on-going work in Spark
Streaming to fix HA.
https://www.youtube.com/watch?v=jcJq3ZalXD8


-kr, Gerard.

On Tue, Dec 16, 2014 at 11:32 AM, Jeniba Johnson <
jeniba.john...@lntinfotech.com> wrote:
>
> Hi,
>
> I need a clarification: while running the streaming examples, suppose the
> batch interval is set to 5 minutes, and data is collected from the
> input source (Flume) and processed for those 5 minutes.
> What will happen to the data which is flowing continuously from the input
> source to Spark Streaming? Will that data be stored somewhere, or will the
> data be lost?
> Otherwise, what is the solution to capture every record without any
> loss in Spark Streaming?
>
> Awaiting for your kind reply.
>
>
> Regards,
> Jeniba Johnson
>
>
> 
> The contents of this e-mail and any attachment(s) may contain confidential
> or privileged information for the intended recipient(s). Unintended
> recipients are prohibited from taking action on the basis of information in
> this e-mail and using or disseminating the information, and must notify the
> sender and delete it from their system. L&T Infotech will not accept
> responsibility or liability for the accuracy or completeness of, or the
> presence of any virus or disabling code in this e-mail"
>


Re: KafkaUtils explicit acks

2014-12-16 Thread Mukesh Jha
I agree that this is not a trivial task, as in this approach the Kafka acks
will be done by the Spark tasks; that means a pluggable means to ack your
input data source, i.e. changes in core.

From my limited experience with Kafka + Spark, what I've seen is that if Spark
tasks take longer than the batch interval, the next batch waits for
the previous one to finish, so I was wondering if offset management can be
done by Spark too.

I'm just trying to figure out whether this seems to be a worthwhile addition to
have.

On Mon, Dec 15, 2014 at 11:39 AM, Shao, Saisai 
wrote:
>
>  Hi,
>
>
>
> It is not trivial work to acknowledge the offsets when an RDD is fully
> processed. From my understanding, only modifying KafkaUtils is not
> enough to meet your requirement; you need to add metadata management
> for each block/RDD, and track them on both the executor and driver side, and
> many other things should also be taken care of.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* mukh@gmail.com [mailto:mukh@gmail.com] *On Behalf Of *Mukesh
> Jha
> *Sent:* Monday, December 15, 2014 1:31 PM
> *To:* Tathagata Das
> *Cc:* francois.garil...@typesafe.com; user@spark.apache.org
> *Subject:* Re: KafkaUtils explicit acks
>
>
>
> Thanks TD & Francois for the explanation & documentation. I'm curious if
> we have any performance benchmark with & without WAL for
> spark-streaming-kafka.
>
>
>
> Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge
> logs), on top of the WAL can we modify KafkaUtils to acknowledge the offsets
> only when the RDDs are fully processed and are getting evicted out of
> Spark memory? That way we can be 100 percent sure that all the records are
> getting processed in the system.
>
> I was thinking it might be good to have the Kafka offset information of each
> batch as part of the RDDs' metadata and commit the offsets once the RDD lineage
> is complete.
>
>
>
> On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
> I am updating the docs right now. Here is a staged copy that you can
> have sneak peek of. This will be part of the Spark 1.2.
>
>
> http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html
>
> The updated fault-tolerance section tries to simplify the explanation
> of when and what data can be lost, and how to prevent that using the
> new experimental feature of write ahead logs.
> Any feedback will be much appreciated.
>
> TD
>
>
> On Wed, Dec 10, 2014 at 2:42 AM,   wrote:
> > [sorry for the botched half-message]
> >
> > Hi Mukesh,
> >
> > There's been some great work on Spark Streaming reliability lately.
> > https://www.youtube.com/watch?v=jcJq3ZalXD8
> > Look at the links from:
> > https://issues.apache.org/jira/browse/SPARK-3129
> >
> > I'm not aware of any doc yet (did I miss something ?) but you can look at
> > the ReliableKafkaReceiver's test suite:
> >
> >
> external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
> >
> > --
> > FG
> >
> >
> > On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
> > wrote:
> >>
> >> Hello Guys,
> >>
> >> Any insights on this??
> >> If I'm not clear enough my question is how can I use kafka consumer and
> >> not loose any data in cases of failures with spark-streaming.
> >>
> >> On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
> >> wrote:
> >>>
> >>> Hello Experts,
> >>>
> >>> I'm working on a spark app which reads data from kafka & persists it in
> >>> hbase.
> >>>
> >>> Spark documentation states the below [1] that in case of worker failure
> >>> we can loose some data. If not how can I make my kafka stream more
> reliable?
> >>> I have seen there is a simple consumer [2] but I'm not sure if it has
> >>> been used/tested extensively.
> >>>
> >>> I was wondering if there is a way to explicitly acknowledge the kafka
> >>> offsets once they are replicated in memory of other worker nodes (if
> it's
> >>> not already done) to tackle this issue.
> >>>
> >>> Any help is appreciated in advance.
> >>>
> >>>
> >>> Using any input source that receives data through a network - For
> >>> network-based data sources like Kafka and Flume, the received input
> data is
> >>> replicated in memory between nodes of the cluster (default replication
> >>> factor is 2). So if a worker node fails, then the system can recompute
> the
> >>> lost from the the left over copy of the input data. However, if the
> worker
> >>> node where a network receiver was running fails, then a tiny bit of
> data may
> >>> be lost, that is, the data received by the system but not yet
> replicated to
> >>> other node(s). The receiver will be started on a different node and it
> will
> >>> continue to receive data.
> >>> https://github.com/dibbhatt/kafka-spark-consumer
> >>>
> >>> Txz,
> >>>
> >>> Mukesh Jha
> >>
> >>
> >>
> >>
> >> --
> >>
> >>
> >> Thanks & Regards,
> >>
> >> Mukesh Jha
> >
> >
>
>
>
>
> --
>
>
>
>
>
> Thanks & Regards,
>
> *Mukesh Jha *
>


-- 


Thanks & Regards,

*Mukesh Jha *


Why so many tasks?

2014-12-16 Thread bethesda
Our job is creating what appears to be an inordinate number of very small
tasks, which blow out our os inode and file limits.  Rather than continually
upping those limits, we are seeking to understand whether our real problem
is that too many tasks are running, perhaps because we are mis-configured or
we are coding incorrectly.

Rather than posting our actual code I have re-created the essence of the
matter in the shell with a directory of files simulating the data we deal
with.  We have three servers, each with 8G RAM.

Given 1,000 files, each containing a string of 100 characters, in the
myfiles directory:

val data = sc.textFile("/user/foo/myfiles/*")

val c = data.count

The count operation produces 1,000 tasks.  Is this normal?

val cart = data.cartesian(data)
cart.count

The cartesian operation produces 1M tasks.  I understand that the cartesian
product of 1,000 items against itself is 1M, however, it seems the overhead
of all this task creation and file I/O of all these tiny files outweighs the
gains of distributed computing.  What am I missing here?

Below is the truncated output of the count operation, if this helps indicate
a configuration problem.

Thank you.

scala> data.count
14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
14/12/16 07:40:47 INFO SparkContext: Starting job: count at :15
14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at :15) with
1000 output partitions (allowLocal=false)
14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
:15)
14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
(/user/ds/randomfiles/* MappedRDD[3] at textFile at :12), which has
no missing parents
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
curMem=507154, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 2.3 KB, free 264.7 MB)
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
curMem=509554, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 1813.0 B, free 264.7 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12)
14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
tasks
14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-3.abc.cloud/10.40.13.192:36133]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-2.abc.cloud/10.40.13.195:35716]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-1.abc.cloud/10.40.13.194:33728]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-1.abc.cloud/10.40.13.194:49458]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-3.abc.cloud/10.40.13.192:58579]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-2.abc.cloud/10.40.13.195:52502]
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piec

Pyspark 1.1.1 error with large number of records - serializer.dump_stream(func(split_index, iterator), outfile)

2014-12-16 Thread mj
I've got a simple pyspark program that generates two CSV files and then
carries out a leftOuterJoin (a fact RDD joined to a dimension RDD). The
program works fine for smaller volumes of records, but when it goes beyond 3
million records for the fact dataset, I get the error below. I'm running
PySpark via PyCharm and the information for my environment is:

OS: Windows 7
Python version: 2.7.9
Spark version: 1.1.1
Java version: 1.8

I've also included the py file I am using. I'd appreciate any help you can
give me, 

MJ.


ERROR MESSAGE
C:\Python27\python.exe "C:/Users/Mark
Jones/PycharmProjects/spark_test/spark_error_sample.py"
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/12/16 12:48:26 INFO SecurityManager: Changing view acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: Changing modify acls to: Mark Jones,
14/12/16 12:48:26 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
users with modify permissions: Set(Mark Jones, )
14/12/16 12:48:26 INFO Slf4jLogger: Slf4jLogger started
14/12/16 12:48:27 INFO Remoting: Starting remoting
14/12/16 12:48:27 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@192.168.19.83:51387]
14/12/16 12:48:27 INFO Utils: Successfully started service 'sparkDriver' on
port 51387.
14/12/16 12:48:27 INFO SparkEnv: Registering MapOutputTracker
14/12/16 12:48:27 INFO SparkEnv: Registering BlockManagerMaster
14/12/16 12:48:27 INFO DiskBlockManager: Created local directory at
C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141216124827-11ef
14/12/16 12:48:27 INFO Utils: Successfully started service 'Connection
manager for block manager' on port 51390.
14/12/16 12:48:27 INFO ConnectionManager: Bound socket to port 51390 with id
= ConnectionManagerId(192.168.19.83,51390)
14/12/16 12:48:27 INFO MemoryStore: MemoryStore started with capacity 265.1
MB
14/12/16 12:48:27 INFO BlockManagerMaster: Trying to register BlockManager
14/12/16 12:48:27 INFO BlockManagerMasterActor: Registering block manager
192.168.19.83:51390 with 265.1 MB RAM
14/12/16 12:48:27 INFO BlockManagerMaster: Registered BlockManager
14/12/16 12:48:27 INFO HttpFileServer: HTTP File server directory is
C:\Users\MARKJO~1\AppData\Local\Temp\spark-3b772ca1-dbf7-4eaa-b62c-be5e73036f5d
14/12/16 12:48:27 INFO HttpServer: Starting HTTP Server
14/12/16 12:48:27 INFO Utils: Successfully started service 'HTTP file
server' on port 51391.
14/12/16 12:48:27 INFO Utils: Successfully started service 'SparkUI' on port
4040.
14/12/16 12:48:27 INFO SparkUI: Started SparkUI at http://192.168.19.83:4040
14/12/16 12:48:27 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the
hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in
the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.(Shell.java:326)
at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.(Groups.java:77)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at
org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
at 
org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36)
at
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109)
at 
org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala)
at org.apache.spark.SparkContext.(SparkContext.scala:228)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.ja

Re: Why so many tasks?

2014-12-16 Thread Koert Kuipers
sc.textFile uses a Hadoop input format. Hadoop input formats by default
create one task per file, and they are not very suitable for many very
small files. Can you turn your 1,000 files into one larger text file?

otherwise maybe try:
val data = sc.textFile("/user/foo/myfiles/*").coalesce(100)

On Tue, Dec 16, 2014 at 7:51 AM, bethesda  wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at :15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at :15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> :15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40

Re: Why so many tasks?

2014-12-16 Thread Akhil Das
Try to repartition the data like:

val data = sc.textFile("/user/foo/myfiles/*").repartition(100)

Since the file size is small it shouldn't be a problem.


Thanks
Best Regards

On Tue, Dec 16, 2014 at 6:21 PM, bethesda  wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at :15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at :15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> :15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> 

Re: MLLib /ALS : java.lang.OutOfMemoryError: Java heap space

2014-12-16 Thread Gen
Hi,
How many clients and how many products do you have?
Cheers,
Gen
jaykatukuri wrote
> Hi all, I am running into an out of memory error while running ALS using
> MLLIB on a reasonably small data set consisting of around 6 Million
> ratings. The stack trace is below:
> java.lang.OutOfMemoryError: Java heap space at org.jblas.DoubleMatrix.<init>(DoubleMatrix.java:323)   at
> org.jblas.DoubleMatrix.zeros(DoubleMatrix.java:471)   at
> org.jblas.DoubleMatrix.zeros(DoubleMatrix.java:476)   at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$21.apply(ALS.scala:465)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$21.apply(ALS.scala:465)
> at scala.Array$.fill(Array.scala:267) at
> org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:465)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:445)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:444)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:156)
> at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)   at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)  at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)   at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)  at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)  at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)   at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)  at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.Task.run(Task.scala:51) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> I am using 2GB for executor memory. I tried with 100 executors. Can someone
> please point me in the right direction? Thanks, Jay





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Why so many tasks?

2014-12-16 Thread Gerard Maas
Creating an RDD from a wildcard like this:
val data = sc.textFile("/user/foo/myfiles/*")

Will create 1 partition for each file found. 1000 files = 1000 partitions.
A task is the work of one stage (a sequence of pipelined transformations) applied to
a single partition, so 1000 partitions = 1000 tasks per stage.

You can reduce the amount of partitions at any time with rdd.coalesce:
val coalescedRDD = data.coalesce(10)  // 10 partitions

-kr, Gerard.
@maasg




On Tue, Dec 16, 2014 at 1:51 PM, bethesda  wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at :15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at :15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> :15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [d

Re: Why so many tasks?

2014-12-16 Thread Gen
Hi,

As you have 1,000 files, the RDD created by textFile will have 1,000
partitions. That is normal. In fact, following the same principle as HDFS, it is
better to store the data as a smaller number of larger files.

You can use data.coalesce(10) to solve this problem (it reduces the number of
partitions).
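
For example, a minimal sketch in the spark-shell (the path and the target
partition count are illustrative):

val data = sc.textFile("/user/foo/myfiles/*")   // 1,000 files -> 1,000 partitions
val compact = data.coalesce(10)                 // merge down to 10 partitions, no shuffle
println(compact.partitions.size)                // 10
println(compact.count())                        // same count, far fewer tasks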

Cheers
Gen



bethesda wrote
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually upping those limits, we are seeking to understand whether our
> real problem is that too many tasks are running, perhaps because we are
> mis-configured or we are coding incorrectly.
> 
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
> 
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
> 
> val data = sc.textFile("/user/foo/myfiles/*")
> 
> val c = data.count
> 
> The count operation produces 1,000 tasks.  Is this normal?
> 
> val cart = data.cartesian(data)
> cart.count
> 
> The cartesian operation produces 1M tasks.  I understand that the
> cartesian product of 1,000 items against itself is 1M, however, it seems
> the overhead of all this task creation and file I/O of all these tiny
> files outweighs the gains of distributed computing.  What am I missing
> here?
> 
> Below is the truncated output of the count operation, if this helps
> indicate a configuration problem.
> 
> Thank you.
> 
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at :15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at :15) with 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at :15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12), which has no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at :12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver

Error when Applying schema to a dictionary with a Tuple as key

2014-12-16 Thread sahanbull

Hi Guys,

I'm running a Spark cluster in AWS with Spark 1.1.0 on EC2.

I am trying to convert an RDD of tuples

(u'string', int, {(int, int): int, (int, int): int})

to a schema rdd using the schema:

fields = [StructField('field1', StringType(), True),
          StructField('field2', IntegerType(), True),
          StructField('field3',
                      MapType(StructType([StructField('field31', IntegerType(), True),
                                          StructField('field32', IntegerType(), True)]),
                              IntegerType(), True),
                      True)]

schema = StructType(fields)
# generate the schemaRDD with the defined schema 
schemaRDD = sqc.applySchema(RDD, schema)

But when I add "field3" to the schema, it throws an exception:

Traceback (most recent call last):
  File "", line 1, in 
  File "/root/spark/python/pyspark/rdd.py", line 1153, in take
res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/root/spark/python/pyspark/context.py", line 770, in runJob
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
javaPartitions, allowLocal)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage
28.0 (TID 710, ip-172-31-29-120.ec2.internal):
net.razorvine.pickle.PickleException: couldn't introspect javabean:
java.lang.IllegalArgumentException: wrong number of arguments
net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_map(Pickler.java:321)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
net.razorvine.pickle.Pickler.save(Pickler.java:125)
net.razorvine.pickle.Pickler.dump(Pickler.java:95)
net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
   
org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
   
org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:331)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
   
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.

Re: Passing Spark Configuration from Driver (Master) to all of the Slave nodes

2014-12-16 Thread Gerard Maas
Hi Demi,

Thanks for sharing.

What we usually do is let the driver read the configuration for the job and
pass the config object to the actual job as a serializable object. That
avoids the need for a centralized config-sharing point that needs to be
accessed from the workers, as you have defined in your solution.
We use Chef to write the configuration for the job in the environment it
belongs to (dev, prod, ...) when the job gets deployed to a host node. That
config file is used to instantiate the job.

We can maintain any number of different environments in that way.

kr, Gerard.
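
A minimal sketch of that pattern (the config class, its fields and the values
are illustrative only; in practice they would be parsed from the file written
out at deployment time):

import org.apache.spark.{SparkConf, SparkContext}

// Built on the driver, then simply captured in closures; Spark serializes it
// together with the tasks, so every executor sees the same values.
case class JobConfig(kafkaBrokers: String, outputTable: String) extends Serializable

val jobConfig = JobConfig("broker1:9092,broker2:9092", "events")
val sc = new SparkContext(
  new SparkConf().setAppName("config-example").setMaster("local[2]"))  // or your cluster master

val tagged = sc.parallelize(Seq("a", "b", "c")).map { record =>
  // jobConfig travels to the executors inside the serialized closure
  (jobConfig.outputTable, record)
}
println(tagged.collect().mkString(", "))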


On Fri, Dec 12, 2014 at 6:38 PM, Demi Ben-Ari  wrote:
>
> Hi to all,
>
> Our problem was passing configuration from Spark Driver to the Slaves.
> After a lot of time spent figuring out how things work, this is the
> solution I came up with.
> Hope this will be helpful for others as well.
>
>
> You can read about it in my Blog Post
> 
>
> --
> Enjoy,
> Demi Ben-Ari
> Senior Software Engineer
> Windward LTD.
>


Re: KafkaUtils explicit acks

2014-12-16 Thread Cody Koeninger
Do you actually need spark streaming per se for your use case?  If you're
just trying to read data out of kafka into hbase, would something like this
non-streaming rdd work for you:

https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka

Note that if you're trying to get exactly-once semantics out of kafka, you
need either idempotent writes, or a transactional relationship between the
place you're storing data and the place you're storing offsets.  Using
normal batch rdds instead of streaming makes the second approach pretty
trivial actually.
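
To make the second option concrete, here is a rough sketch (not tied to any
particular Kafka RDD API) of committing a batch's results and its ending
offset in one transaction against a relational store; the connection URL,
table names, and the results / endOffset values are hypothetical and would
come from processing one batch:

import java.sql.DriverManager

val results: Seq[(String, String)] = Seq(("k1", "v1"), ("k2", "v2"))  // collected batch output
val endOffset: Long = 42L                                             // last offset in the batch

val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb", "user", "pass")
conn.setAutoCommit(false)
try {
  val insert = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
  results.foreach { case (k, v) =>
    insert.setString(1, k); insert.setString(2, v); insert.executeUpdate()
  }
  val offsets = conn.prepareStatement(
    "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
  offsets.setLong(1, endOffset); offsets.setString(2, "mytopic"); offsets.setInt(3, 0)
  offsets.executeUpdate()
  conn.commit()   // data and offsets become visible together, so replays stay consistent
} catch {
  case e: Exception => conn.rollback(); throw e
} finally {
  conn.close()
}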

On Tue, Dec 16, 2014 at 6:34 AM, Mukesh Jha  wrote:
>
> I agree that this is not a trivial task, as in this approach the Kafka
> acks will be done by the Spark tasks, which means a pluggable way to ack
> your input data source, i.e. changes in core.
>
> From my limited experience with Kafka + Spark, what I've seen is that if Spark
> tasks take longer than the batch interval, the next batch waits for
> the previous one to finish, so I was wondering if offset management can be
> done by Spark too.
>
> I'm just trying to figure out if this seems to be a worthwhile addition to
> have?
>
> On Mon, Dec 15, 2014 at 11:39 AM, Shao, Saisai 
> wrote:
>>
>>  Hi,
>>
>>
>>
>> It is not trivial work to acknowledge the offsets when an RDD is fully
>> processed. From my understanding, only modifying KafkaUtils is not
>> enough to meet your requirement; you need to add metadata management
>> for each block/RDD, track them on both the executor and driver side, and
>> many other things should also be taken care of :).
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* mukh@gmail.com [mailto:mukh@gmail.com] *On Behalf Of *Mukesh
>> Jha
>> *Sent:* Monday, December 15, 2014 1:31 PM
>> *To:* Tathagata Das
>> *Cc:* francois.garil...@typesafe.com; user@spark.apache.org
>> *Subject:* Re: KafkaUtils explicit acks
>>
>>
>>
>> Thanks TD & Francois for the explanation & documentation. I'm curious if
>> we have any performance benchmark with & without WAL for
>> spark-streaming-kafka.
>>
>>
>>
>> Also In spark-streaming-kafka (as kafka provides a way to acknowledge
>> logs) on top of WAL can we modify KafkaUtils to acknowledge the offsets
>> only when the RRDs are fully processed and are getting evicted out of the
>> Spark memory thus we can be cent percent sure that all the records are
>> getting processed in the system.
>>
>> I was thinking if it's good to have the kafka offset information of each
>> batch as part of RDDs metadata and commit the offsets once the RDDs lineage
>> is complete.
>>
>>
>>
>> On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>> I am updating the docs right now. Here is a staged copy that you can
>> have sneak peek of. This will be part of the Spark 1.2.
>>
>>
>> http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html
>>
>> The updated fault-tolerance section tries to simplify the explanation
>> of when and what data can be lost, and how to prevent that using the
>> new experimental feature of write ahead logs.
>> Any feedback will be much appreciated.
>>
>> TD
>>
>>
>> On Wed, Dec 10, 2014 at 2:42 AM,   wrote:
>> > [sorry for the botched half-message]
>> >
>> > Hi Mukesh,
>> >
>> > There’s been some great work on Spark Streaming reliability lately.
>> > https://www.youtube.com/watch?v=jcJq3ZalXD8
>> > Look at the links from:
>> > https://issues.apache.org/jira/browse/SPARK-3129
>> >
>> > I’m not aware of any doc yet (did I miss something ?) but you can look
>> at
>> > the ReliableKafkaReceiver’s test suite:
>> >
>> >
>> external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
>> >
>> > —
>> > FG
>> >
>> >
>> > On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
>> > wrote:
>> >>
>> >> Hello Guys,
>> >>
>> >> Any insights on this??
>> >> If I'm not clear enough my question is how can I use kafka consumer and
>> >> not loose any data in cases of failures with spark-streaming.
>> >>
>> >> On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
>> >> wrote:
>> >>>
>> >>> Hello Experts,
>> >>>
>> >>> I'm working on a spark app which reads data from kafka & persists it
>> in
>> >>> hbase.
>> >>>
>> >>> Spark documentation states the below [1] that in case of worker
>> failure
>> >>> we can loose some data. If not how can I make my kafka stream more
>> reliable?
>> >>> I have seen there is a simple consumer [2] but I'm not sure if it has
>> >>> been used/tested extensively.
>> >>>
>> >>> I was wondering if there is a way to explicitly acknowledge the kafka
>> >>> offsets once they are replicated in memory of other worker nodes (if
>> it's
>> >>> not already done) to tackle this issue.
>> >>>
>> >>> Any help is appreciated in advance.
>> >>>
>> >>>
>> >>> Using any input source that receives data through a network - For
>> >>> network-based data sources like Kafka and Flume, the received input
>> data is
>> >>> replicated in memory

Streaming | Partition count mismatch exception while saving data in RDD

2014-12-16 Thread Aniket Bhatnagar
I am using spark 1.1.0 running a streaming job that uses updateStateByKey
and then (after a bunch of maps/flatMaps) does a foreachRDD to save data in
each RDD by making HTTP calls. The issue is that each time I attempt to
save the RDD (using foreach on RDD), it gives me the following exception:

org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[467] at apply
at List.scala:318(0) has different number of partitions than original RDD
MapPartitionsRDD[461] at mapPartitions at StateDStream.scala:71(56)

I read through the code but couldn't understand why this exception is
happening. Any help would be appreciated!

Thanks,
Aniket


Re: Why so many tasks?

2014-12-16 Thread bethesda
Thank you!  I had known about the small-files problem in HDFS but didn't
realize that it affected sc.textFile().



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Appending an incrental value to each RDD record

2014-12-16 Thread bethesda
I think this is sort of a newbie question, but I've checked the api closely
and don't see an obvious answer:

Given an RDD, how would I create a new RDD of Tuples where the first Tuple
value is an incremented Int e.g. 1,2,3 ... and the second value of the Tuple
is the original RDD record?  I'm trying to simply assign a unique ID to each
record in my RDD.  (I want to stay in RDD land, and not convert to a List
and back to RDD, since that seems unnecessary and probably bad form.)

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



NullPointerException on cluster mode when using foreachPartition

2014-12-16 Thread richiesgr
Hi

This time I need an expert.
On 1.1.1, and only in cluster mode (standalone or EC2),
when I use this code:

countersPublishers.foreachRDD(rdd => {
rdd.foreachPartition(partitionRecords => {
  partitionRecords.foreach(record => {
//dbActorUpdater ! updateDBMessage(record)
println(record)
  })
})
  })

I get an NPE (when I run this locally all is OK)

If I use this 
  countersPublishers.foreachRDD(rdd => rdd.collect().foreach(r =>
dbActorUpdater ! updateDBMessage(r)))

There is no problem. I think something is misconfigured.
Thanks for the help.
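
Not a definitive fix, but a common pattern when a driver-side reference (such
as an actor) is captured in the closure is to build the side-effecting client
inside foreachPartition, on the executor; a rough sketch with a hypothetical
DbUpdater class:

countersPublishers.foreachRDD(rdd => {
  rdd.foreachPartition(partitionRecords => {
    val updater = new DbUpdater()   // hypothetical: constructed per partition, on the executor
    partitionRecords.foreach(record => updater.update(record))
    updater.close()
  })
})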




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-cluster-mode-when-using-foreachPartition-tp20719.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Appending an incrental value to each RDD record

2014-12-16 Thread Gerard Maas
You would do:

rdd.zipWithIndex gives you an RDD[(Original, Long)] where the second
element is the index.
To have an (index, original) tuple, you will need to map that previous RDD to
the desired shape:
rdd.zipWithIndex.map(_.swap)
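
A tiny runnable example of the above in the shell:

val rdd = sc.parallelize(Seq("a", "b", "c"))
val indexed = rdd.zipWithIndex().map(_.swap)   // RDD[(Long, String)]
indexed.collect().foreach(println)             // (0,a), (1,b), (2,c)

If globally unique (but not necessarily consecutive) IDs are enough,
rdd.zipWithUniqueId() avoids the extra job that zipWithIndex runs to compute
partition sizes when the RDD has more than one partition.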

-kr, Gerard.




On Tue, Dec 16, 2014 at 4:12 PM, bethesda  wrote:
>
> I think this is sort of a newbie question, but I've checked the api closely
> and don't see an obvious answer:
>
> Given an RDD, how would I create a new RDD of Tuples where the first Tuple
> value is an incremented Int e.g. 1,2,3 ... and the second value of the
> Tuple
> is the original RDD record?  I'm trying to simply assign a unique ID to
> each
> record in my RDD.  (I want to stay in RDD land, and not convert to a List
> and back to RDD, since that seems unnecessary and probably bad form.)
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Appending an incrental value to each RDD record

2014-12-16 Thread mj
You could try using zipWithIndex (links below to API docs). For example, in
Python:

items =['a','b','c']
items2= sc.parallelize(items)

print(items2.first())

items3=items2.map(lambda x: (x, x+"!"))

print(items3.first())

items4=items3.zipWithIndex()

print(items4.first())

items5=items4.map(lambda x: (x[1], x[0]))
print(items5.first())


This will give you an output of (0, ('a', 'a!')) - where the 0 is the index.
You could also use a map to increment them up by a value (e.g. if you wanted
to count from 1).

Links
http://spark.apache.org/docs/latest/api/python/index.html
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718p20720.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
That's the first thing I tried... still the same error:
hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/deploy/SparkSubmit
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.deploy.SparkSubmit
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.SparkSubmit. Program
will exit.

Thanks,
Daniel

On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das 
wrote:
>
> print the CLASSPATH and make sure the spark assembly jar is there in the
> classpath
>
> Thanks
> Best Regards
>
> On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv 
> wrote:
>>
>> Hi,
>> I've built spark successfully with maven but when I try to run
>> spark-shell I get the following errors:
>>
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/deploy/SparkSubmit
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.SparkSubmit
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>>
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)at
>> java.lang.ClassLoader.loadClass(ClassLoader.java:268)Could not find the
>> main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
>> What can be wrong?
>>
>> Thanks,
>> Daniel
>>
>


Re: Why so many tasks?

2014-12-16 Thread Ashish Rangole
Take a look at CombineFileInputFormat. Repartition or coalesce could
introduce shuffle I/O overhead.
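
A rough sketch of that route (untested here; it assumes Hadoop 2.x's
CombineTextInputFormat is on the classpath, and the 128 MB max split size is
just an illustrative value):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
// pack many small files into combined splits of up to ~128 MB each
conf.set("mapreduce.input.fileinputformat.split.maxsize", (128L * 1024 * 1024).toString)

val data = sc.newAPIHadoopFile("/user/foo/myfiles/*",
  classOf[CombineTextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map(_._2.toString)

println(data.partitions.size)   // far fewer partitions than input files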
On Dec 16, 2014 7:09 AM, "bethesda"  wrote:

> Thank you!  I had known about the small-files problem in HDFS but didn't
> realize that it affected sc.textFile().
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20717.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.2 + Avro does not work in HDP2.2

2014-12-16 Thread manasdebashiskar
Hi All,
 I saw some help online about forcing avro-mapred to hadoop2 using
classifiers.

 Now my configuration is thus
 val avro= "org.apache.avro" % "avro-mapred" % V.avro classifier
"hadoop2" 

However, I still get java.lang.IncompatibleClassChangeError. I think I am
not building Spark correctly. Clearly the following step is missing
something Avro related.

( mvn -Pyarn -Dhadoop.version=2.6.0 -Dyarn.version=2.6.0 -Phadoop-2.3
-Phive -DskipTests clean package )


Can someone please help me build Spark 1.2 for either CDH5.2 or HDP2.2 +
Hive + Avro?

Thanks





-
Manas Kar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Avro-does-not-work-in-HDP2-2-tp20667p20721.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming | Partition count mismatch exception while saving data in RDD

2014-12-16 Thread Aniket Bhatnagar
It turns out that this happens when the checkpoint is set to a local directory
path. I have opened JIRA SPARK-4862 for Spark Streaming to output a better
error message.

Thanks,
Aniket
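
For reference, a minimal sketch of pointing the checkpoint at a distributed
filesystem instead of a local path (the app name, master, batch interval and
HDFS URL are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("checkpoint-example")
  .setMaster("local[2]")   // or your cluster master
val ssc = new StreamingContext(sparkConf, Seconds(10))
// a directory visible to every executor (HDFS, S3, ...), not a local /tmp path
ssc.checkpoint("hdfs://namenode:8020/user/foo/checkpoints")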

On Tue Dec 16 2014 at 20:08:13 Aniket Bhatnagar 
wrote:

> I am using spark 1.1.0 running a streaming job that uses updateStateByKey
> and then (after a bunch of maps/flatMaps) does a foreachRDD to save data in
> each RDD by making HTTP calls. The issue is that each time I attempt to
> save the RDD (using foreach on RDD), it gives me the following exception:
>
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[467] at
> apply at List.scala:318(0) has different number of partitions than original
> RDD MapPartitionsRDD[461] at mapPartitions at StateDStream.scala:71(56)
>
> I read through the code but couldn't understand why this exception is
> happening. Any help would be appreciated!
>
> Thanks,
> Aniket
>


Re: Spark 1.2 + Avro does not work in HDP2.2

2014-12-16 Thread Sean Owen
Given that the error is

java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

...this usually means there is a Hadoop version problem.

But in particular it's
https://issues.apache.org/jira/browse/SPARK-3039 which affects
assembly using the Hive code.

There's a workaround there but I'm not sure what the resolution is.

You may have luck building for hadoop.version=2.5.0-cdh5.2.1, if your
cluster is on CDH. The packaging probably harmonizes this correctly even
with the Hive profile, but I have not tested this myself.

On Tue, Dec 16, 2014 at 4:19 PM, manasdebashiskar
 wrote:
> Hi All,
>  I saw some helps online about forcing avro-mapred to hadoop2 using
> classifiers.
>
>  Now my configuration is thus
>  val avro= "org.apache.avro" % "avro-mapred" % V.avro classifier
> "hadoop2"
>
> How ever I still get java.lang.IncompatibleClassChangeError. I think I am
> not building spark correctly. Clearly the following steps is missing
> something avro related.
>
> /( mvn -Pyarn -Dhadoop.version=2.6.0 -Dyarn.version=2.6.0 -Phadoop-2.3
> -Phive -DskipTests clean package)/
>
>
> *Can someone please help me build spark1.2 for either CDH5.2 or HDP2.2  +
> Hive + Avro *
>
> Thanks
>
>
>
>
>
> -
> Manas Kar
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Avro-does-not-work-in-HDP2-2-tp20667p20721.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Akhil Das
This is how it looks on my machine.

[image: Inline image 1]

Thanks
Best Regards

On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv  wrote:
>
> That's the first thing I tried... still the same error:
> hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
> hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
> hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell
>
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/deploy/SparkSubmit
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.deploy.SparkSubmit
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> Could not find the main class: org.apache.spark.deploy.SparkSubmit.
> Program will exit.
>
> Thanks,
> Daniel
>
> On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das 
> wrote:
>>
>> print the CLASSPATH and make sure the spark assembly jar is there in the
>> classpath
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv 
>> wrote:
>>>
>>> Hi,
>>> I've built spark successfully with maven but when I try to run
>>> spark-shell I get the following errors:
>>>
>>> Spark assembly has been built with Hive, including Datanucleus jars on
>>> classpath
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/deploy/SparkSubmit
>>>
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.deploy.SparkSubmit
>>>
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>>>
>>> at java.security.AccessController.doPrivileged(Native Method)
>>>
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>>>
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>>>
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)at
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:268)Could not find the
>>> main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
>>> What can be wrong?
>>>
>>> Thanks,
>>> Daniel
>>>
>>


Kafka Receiver not recreated after executor died

2014-12-16 Thread Luis Ángel Vicente Sánchez
Dear spark community,


We were testing a spark failure scenario where the executor that is running
a Kafka Receiver dies.

We are running our streaming jobs on top of mesos and we killed the mesos
slave that was running the executor; a new executor was created on another
mesos slave, but according to the Driver UI, the Kafka receiver location is
still on the dead slave.

It seems that if the executor where the receiver was running dies, spark on
top of mesos is not able to create a new receiver on a different executor
to continue working.

Is that a known issue? Is there a way we could ensure that the receiver
would be recreated?

Kind regards,

Luis Vicente


Re: Kafka Receiver not recreated after executor died

2014-12-16 Thread Luis Ángel Vicente Sánchez
It seems to be slightly related to this:

https://issues.apache.org/jira/browse/SPARK-1340

But in this case, it's not the Task that is failing but the entire executor
where the Kafka Receiver resides.

2014-12-16 16:53 GMT+00:00 Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com>:
>
> Dear spark community,
>
>
> We were testing a spark failure scenario where the executor that is
> running a Kafka Receiver dies.
>
> We are running our streaming jobs on top of mesos and we killed the mesos
> slave that was running the executor ; a new executor was created on another
> mesos-slave but according to the Driver UI, the Kafka receiver location is
> still on the dead slave.
>
> It seems that if the executor where the receiver was running dies, spark
> on top of mesos is not able to create a new receiver on a different
> executor to continue working.
>
> Is that a known issue? Is there a way we could ensure that the receiver
> would be recreated?
>
> Kind regards,
>
> Luis Vicente
>


Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Same here...
# jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
SparkSubmit.class
org/apache/spark/deploy/SparkSubmit.class






On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das 
wrote:
>
> This is how it looks on my machine.
>
> [image: Inline image 1]
>
> Thanks
> Best Regards
>
> On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv 
> wrote:
>>
>> That's the first thing I tried... still the same error:
>> hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
>> hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
>> hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell
>>
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/deploy/SparkSubmit
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.SparkSubmit
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
>> Could not find the main class: org.apache.spark.deploy.SparkSubmit.
>> Program will exit.
>>
>> Thanks,
>> Daniel
>>
>> On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das 
>> wrote:
>>>
>>> print the CLASSPATH and make sure the spark assembly jar is there in the
>>> classpath
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv 
>>> wrote:

 Hi,
 I've built spark successfully with maven but when I try to run
 spark-shell I get the following errors:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Exception in thread "main" java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkSubmit

 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.deploy.SparkSubmit

 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:323)

 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)at
 java.lang.ClassLoader.loadClass(ClassLoader.java:268)Could not find
 the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
 What can be wrong?

 Thanks,
 Daniel

>>>


Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Akhil Das
And this is how my classpath looks like

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar

Thanks
Best Regards

On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv 
wrote:
>
> Same here...
> # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
> SparkSubmit.class
> *org/apache/spark/deploy/SparkSubmit.class*
>
>
>
>
>
>
> On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das 
> wrote:
>>
>> This is how it looks on my machine.
>>
>> [image: Inline image 1]
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv 
>> wrote:
>>>
>>> That's the first thing I tried... still the same error:
>>> hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
>>> hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
>>> hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell
>>>
>>> Spark assembly has been built with Hive, including Datanucleus jars on
>>> classpath
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> org/apache/spark/deploy/SparkSubmit
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.spark.deploy.SparkSubmit
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
>>> Could not find the main class: org.apache.spark.deploy.SparkSubmit.
>>> Program will exit.
>>>
>>> Thanks,
>>> Daniel
>>>
>>> On Tue, Dec 16, 2014 at 1:53 PM, Akhil Das 
>>> wrote:

 print the CLASSPATH and make sure the spark assembly jar is there in
 the classpath

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 5:04 PM, Daniel Haviv 
 wrote:
>
> Hi,
> I've built spark successfully with maven but when I try to run
> spark-shell I get the following errors:
>
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/deploy/SparkSubmit
>
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.deploy.SparkSubmit
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)at
> java.lang.ClassLoader.loadClass(ClassLoader.java:268)Could not find
> the main class: org.apache.spark.deploy.SparkSubmit. Program will exit.
> What can be wrong?
>
> Thanks,
> Daniel
>



Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
I've added every jar in the lib dir to my classpath and still no luck:

CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar




On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das 
wrote:
>
> And this is how my classpath looks like
>
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
>
> ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
>
> Thanks
> Best Regards
>
> On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv 
> wrote:
>>
>> Same here...
>> # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
>> SparkSubmit.class
>> *org/apache/spark/deploy/SparkSubmit.class*
>>
>>
>>
>>
>>
>>
>> On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das 
>> wrote:
>>>
>>> This is how it looks on my machine.
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv 
>>> wrote:

 That's the first thing I tried... still the same error:
 hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
 hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
 hdfs@ams-rsrv01:/tmp/spark/spark-branch-1.1$ ./bin/spark-shell

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread "main" java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/SparkS

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Akhil Das
Can you open the file bin/spark-class, put an echo $CLASSPATH below
the place where it exports it, and see what the contents are?
On 16 Dec 2014 22:46, "Daniel Haviv"  wrote:

> I've added every jar in the lib dir to my classpath and still no luck:
>
>
> CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar
>
>
>
>
> On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das 
> wrote:
>>
>> And this is how my classpath looks like
>>
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>>
>> ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv 
>> wrote:
>>>
>>> Same here...
>>> # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
>>> SparkSubmit.class
>>> *org/apache/spark/deploy/SparkSubmit.class*
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das 
>>> wrote:

 This is how it looks on my machine.

 [image: Inline image 1]

 Thanks
 Best Regards

 On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv 
 wrote:
>
> That's the first thing I tried... still the same error:
> hdfs@ams-rsrv01:~$ export CLASSPATH=/tmp/spark/spark-branch-1.1/lib
> hdfs@ams-rsrv01:~$ cd /tmp/spark/spark-branch-1.1
> hdfs@ams-rsrv01:/

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
Completely different from the one I set:
Classpath is
 
::/tmp/spark/spark-branch-1.1/conf:/tmp/spark/spark-branch-1.1/assembly/target/scala-2.10/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar



On Tue, Dec 16, 2014 at 7:18 PM, Akhil Das 
wrote:
>
> Can you open the file bin/spark-class and then put an echo $CLASSPATH
> below the place where they exports it and see what are the contents?
> On 16 Dec 2014 22:46, "Daniel Haviv"  wrote:
>
>> I've added every jar in the lib dir to my classpath and still no luck:
>>
>>
>> CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar
>>
>>
>>
>>
>> On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das 
>> wrote:
>>>
>>> And this is how my classpath looks like
>>>
>>> Spark assembly has been built with Hive, including Datanucleus jars on
>>> classpath
>>>
>>> ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv 
>>> wrote:

 Same here...
 # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep
 SparkSubmit.class
 *org/apache/spark/deploy/Spark

Re: Could not find the main class: org.apache.spark.deploy.SparkSubmit

2014-12-16 Thread Daniel Haviv
I'm using CDH5 that was installed via Cloudera Manager.
Does it matter?


Thanks,
Daniel

> On 16 בדצמ׳ 2014, at 19:18, Akhil Das  wrote:
> 
> Can you open the file bin/spark-class and then put an echo $CLASSPATH below 
> the place where they exports it and see what are the contents?
> 
>> On 16 Dec 2014 22:46, "Daniel Haviv"  wrote:
>> I've added every jar in the lib dir to my classpath and still no luck:
>> 
>> CLASSPATH=/tmp/spark/spark-branch-1.1/lib/datanucleus-api-jdo-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-core-3.2.2.jar:/tmp/spark/spark-branch-1.1/lib/datanucleus-rdbms-3.2.1.jar:/tmp/spark/spark-branch-1.1/lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-bagel_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-catalyst_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-core_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples-1.1.2-SNAPSHOT-hadoop2.3.0.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-examples_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-graphx_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-hive-thriftserver_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-mllib_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-repl_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-sql_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming_2.10-1.1.2-SNAPSHOT-tests.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-flume-sink_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-kafka_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-mqtt_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-twitter_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-streaming-zeromq_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-tools_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT.jar:/tmp/spark/spark-branch-1.1/lib/spark-yarn_2.10-1.1.2-SNAPSHOT-sources.jar:/tmp/spark/spark-branch-1.1/lib/TestSerDe.jar
>> 
>> 
>> 
>> 
>>> On Tue, Dec 16, 2014 at 7:05 PM, Akhil Das  
>>> wrote:
>>> And this is how my classpath looks like
>>> 
>>> Spark assembly has been built with Hive, including Datanucleus jars on 
>>> classpath
>>> ::/home/akhld/mobi/localcluster/spark-1/conf:/home/akhld/mobi/localcluster/spark-1/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-core-3.2.2.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-rdbms-3.2.1.jar:/home/akhld/mobi/localcluster/spark-1/lib/datanucleus-api-jdo-3.2.1.jar
>>> 
>>> Thanks
>>> Best Regards
>>> 
 On Tue, Dec 16, 2014 at 10:33 PM, Daniel Haviv  
 wrote:
 Same here...
 # jar tf lib/spark-assembly-1.1.2-SNAPSHOT-hadoop2.3.0.jar  | grep 
 SparkSubmit.class
 org/apache/spark/deploy/SparkSubmit.class
 
 
 
 
 
 
> On Tue, Dec 16, 2014 at 6:50 PM, Akhil Das  
> wrote:
> This is how it looks on my machine.
> 
> 
> 
> Thanks
> Best Regards
> 
>> On Tue, Dec 16, 2014 at 9:33 PM, Daniel Haviv  
>> wrote:
>> That's the first thing I tried...

Re: Appending an incrental value to each RDD record

2014-12-16 Thread bethesda
Thanks! zipWithIndex() works well. I had overlooked it because the name
'zip' is rather odd.
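
For reference, a minimal sketch of the zipWithIndex() approach mentioned above,
assuming a SparkContext named sc and an RDD of strings (the names are
illustrative):

val records = sc.parallelize(Seq("a", "b", "c"))
// zipWithIndex pairs each element with its 0-based position in the RDD
val indexed = records.zipWithIndex()   // RDD[(String, Long)]
indexed.collect().foreach { case (value, idx) => println(s"$idx\t$value") }

If the incremental value should come first in the pair, indexed.map(_.swap)
flips it.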



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718p20722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: integrating long-running Spark jobs with Thriftserver

2014-12-16 Thread Tim Schweichler
To ask a related question, if I use Zookeeper for table locking, will this 
affect all attempts to access the Hive tables (including those from my Spark 
applications) or only those made through the Thriftserver? In other words, does 
Zookeeper provide concurrency for the Hive metastore in general or only for 
Hiveserver2/Spark's Thriftserver?

Thanks!

From: Tim Schweichler 
mailto:tim.schweich...@healthination.com>>
Date: Monday, December 15, 2014 at 10:56 AM
To: "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: integrating long-running Spark jobs with Thriftserver

Hi everybody,

I apologize if the answer to my question is obvious but I haven't been able to 
find a straightforward solution anywhere on the internet.

I have a number of Spark jobs written using the python API that do things like 
read in data from Amazon S3 to a main table in the Hive metastore, perform 
intensive calculations on that data to build derived/aggregated tables, etc. I 
also have Tableau set up to read those tables via the Spark Thriftserver.

My question is how best to integrate those two sides of Spark. I want to have 
the Thriftserver constantly running so that Tableau can update its extracts on 
a scheduled basis and users can manually query those tables as needed, but I 
also need to run those python jobs on a scheduled basis as well. What's the 
best way to do that? The options I'm considering are as follows:


  1.  Simply call the python jobs via spark-submit, scheduled by cron. My 
concern here is concurrency issues if Tableau or a user tries to read from a 
table at the same time that a job is rebuilding/updating that table. To my 
understanding the Thriftserver is designed to handle concurrency, but Spark in 
general is not if two different Spark contexts are attempting to access the 
same data (as would be the case with this approach.) Am I correct in that 
thinking or is there actually no problem with this method?
  2.  Call the python jobs through the Spark Thriftserver so that the same 
Spark context is used. My question here is how to do that. I know one can call 
a python script as part of a HiveQL query using TRANSFORM, but that seems to be 
designed more for performing quick calculations on existing data as part of a 
query rather than building tables in the first place or calling long-running 
jobs that don't return anything (again, am I correct in this thinking or would 
this actually be a viable solution?) Is there a different way to call 
long-running Spark jobs via the Thriftserver?

Are either of these good approaches or is there a better way that I'm missing?

Thanks!


No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Okay,

I have an rdd that I want to run an aggregate over but it insists on
spilling to disk even though I structured the processing to only require a
single pass.

In other words, I can do all of my processing one entry in the rdd at a time
without persisting anything.

I set rdd.persist(StorageLevel.NONE) and it had no effect. When I run
locally I get my /tmp directory filled with transient rdd data even though I
never need the data again after the row's been processed. Is there a way to
turn this off?

Thanks
Jim
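
For reference, a minimal sketch of a single-pass aggregate, assuming an
RDD[String] named rdd (the names and the computation are illustrative):

// seqOp is applied to each record exactly once within its partition;
// combOp then merges the per-partition results. Only the running accumulator
// value is kept per partition, not the per-row seqOp outputs.
val totalChars = rdd.aggregate(0L)(
  (acc, line) => acc + line.length,   // seqOp
  (a, b) => a + b                     // combOp
)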




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2 + Avro file does not work in HDP2.2

2014-12-16 Thread Zhan Zhang
Hi Manas,

There is a small patch needed for HDP2.2. You can refer to this PR
https://github.com/apache/spark/pull/3409

There are some other issues compiling against hadoop2.6. But we will fully 
support it very soon. You can ping me, if you want.

Thanks.

Zhan Zhang

On Dec 12, 2014, at 11:38 AM, Manas Kar  wrote:

> Hi Experts, 
>  I have recently installed HDP2.2(Depends on hadoop 2.6).
>  My spark 1.2 is built with hadoop 2.4 profile.
> 
>  My program has following dependencies
> val avro= "org.apache.avro" % "avro-mapred" %"1.7.7"
> val spark   = "org.apache.spark" % "spark-core_2.10" % "1.2.0" % 
> "provided"
> 
> My program to read avro files fails with the following error. What am I doing 
> wrong?
> 
> 
> java.lang.IncompatibleClassChangeError: Found interface 
> org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
>   at 
> org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
>   at 
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:133)
>   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>   at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)


-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.


Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
In case a little more information is helpful:

the RDD is constructed using sc.textFile(fileUri) where the fileUri is to a
".gz" file (that's too big to fit on my disk).

I do an rdd.persist(StorageLevel.NONE) and it seems to have no effect.

This rdd is what I'm calling aggregate on and I expect to only use it once.
Each row in the rdd never has to be revisited. The aggregate seqOp is
modifying a "current state" and returning it so there's no need to store the
results of the seqOp on a row-by-row basis, and give the fact that there's
one partition the comboOp doesn't even need to be called (since there would
be nothing to combine across partitions).

Thanks for any help.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20724.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No disk single pass RDD aggregation

2014-12-16 Thread Jim Carroll
Nvm. I'm going to post another question since this has to do with the way
spark handles sc.textFile with a file://.gz



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20725.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Understanding disk usage with Accumulators

2014-12-16 Thread Ganelin, Ilya
Hi all – I’m running a long running batch-processing job with Spark through 
Yarn. I am doing the following

Batch Process

val resultsArr = 
sc.accumulableCollection(mutable.ArrayBuffer[ListenableFuture[Result]]())

InMemoryArray.forEach{
1) Using a thread pool, generate callable jobs that operate on an RDD
1a) These callable jobs perform an operation combining that RDD and a 
broadcasted array and store the result of that computation as an Array (Result)
2) Store the results of this operation (upon resolution) in the 
accumulableCollection
}

sc.parallelize(resultsArr).saveAsObjectFile (about 1gb of data), happens a 
total of about 4 times during execution over the course of several hours.

My immediate problem is that during this execution two things happen.

Firstly, on my driver node I eventually run out of memory, and start swapping 
to disk (which causes slowdown). However, each Batch can be processed entirely 
within the available memory on the driver, so basically this memory is somehow 
not being released between runs (even though I leave the context of the 
function running the Batch process)

Secondly, during execution, things are being written to HDFS and I am running 
out of space on the local partitions on the node. Note, this is NOT the 
explicit saveAsObjectFile call that I am making, but appears to be something 
going on with Spark internally.


Can anyone speak to what is going on under the hood here and what I can do to 
resolve this?
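
For reference, a minimal sketch of accumulableCollection on its own, independent
of the job described above (sc is a SparkContext; the element type is simplified
to String and the names are illustrative):

import scala.collection.mutable

// Tasks add elements with +=; the additions are merged back into the driver-side
// collection as tasks finish, so everything accumulated here lives in driver memory.
val results = sc.accumulableCollection(mutable.ArrayBuffer[String]())
sc.parallelize(1 to 100, 4).foreach { i => results += s"result-$i" }
println(results.value.size)   // 100, readable on the driver once the action completes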


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Understanding disk usage with Accumulators

2014-12-16 Thread Ganelin, Ilya
Also, this may be related to this issue 
https://issues.apache.org/jira/browse/SPARK-3885.

Further, to clarify, data is being written to Hadoop on the data nodes.

Would really appreciate any help. Thanks!

From: , "Ganelin, Ilya" 
mailto:ilya.gane...@capitalone.com>>
Date: Tuesday, December 16, 2014 at 10:23 AM
To: "'user@spark.apache.org'" 
mailto:user@spark.apache.org>>
Subject: Understanding disk usage with Accumulators

Hi all – I’m running a long running batch-processing job with Spark through 
Yarn. I am doing the following

Batch Process

val resultsArr = 
sc.accumulableCollection(mutable.ArrayBuffer[ListenableFuture[Result]]())

InMemoryArray.forEach{
1) Using a thread pool, generate callable jobs that operate on an RDD
1a) These callable jobs perform an operation combining that RDD and a 
broadcasted array and store the result of that computation as an Array (Result)
2) Store the results of this operation (upon resolution) in the 
accumulableCollection
}

sc.parallelize(resultsArr).saveAsObjectFile (about 1gb of data), happens a 
total of about 4 times during execution over the course of several hours.

My immediate problem is that during this execution two things happen.

Firstly, on my driver node I eventually run out of memory, and start swapping 
to disk (which causes slowdown). However, each Batch can be processed entirely 
within the available memory on the driver, so basically this memory is somehow 
not being released between runs (even though I leave the context of the 
function running the Batch process)

Secondly, during execution, things are being written to HDFS and I am running 
out of space on the local partitions on the node. Note, this is NOT the 
explicit saveAsObjectFile call that I am making, but appears to be something 
going on with Spark internally.


Can anyone speak to what is going on under the hood here and what I can do to 
resolve this?



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.




Re: Spark 1.1.0 does not spawn more than 6 executors in yarn-client mode and ignores --num-executors

2014-12-16 Thread Aniket Bhatnagar
Hi guys

I am hoping someone might have a clue on why this is happening. Otherwise I
will have to delve into the YARN module's source code to better understand the
issue.

On Wed, Dec 10, 2014, 11:54 PM Aniket Bhatnagar 
wrote:

> I am running spark 1.1.0 on AWS EMR and I am running a batch job that
> seems to be highly parallelizable in yarn-client mode. But spark
> stops spawning any more executors after spawning 6 executors even though
> YARN cluster has 15 healthy m1.large nodes. I even tried providing
> '--num-executors 60' argument during spark-submit but even that doesn't
> help. A quick look at spark admin UI suggests there are active stages whose
> tasks have not been started yet and even then spark doesn't start more
> executors. I am not sure why. Any help on this would be greatly appreciated.
>
> Here is link to screen shots that I took of spark admin and yarn admin -
> http://imgur.com/a/ztjr7
>
> Thanks,
> Aniket
>
>


Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim Carroll
Is there a way to get Spark to NOT repartition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a "transformation" and not
an "action" (if the analogy is apt). There is no need to fully create and
fill out an intermediate RDD with the expanded data when it can be done one
row at a time.
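
For what it's worth, a gzipped file is not splittable, so sc.textFile over a
single .gz normally produces one partition and records are streamed through the
decompressor one at a time. A minimal sketch (the path is illustrative):

val lines = sc.textFile("file:///data/big.gz")
println(lines.partitions.length)   // typically 1 for a single non-splittable .gz
// A plain map/filter/count pipeline over this RDD does not materialize the
// decompressed file; an explicit repartition() (or any shuffle-producing step)
// is what forces the expanded data to be written out.
val nonEmpty = lines.filter(_.nonEmpty).count()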




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Running Spark Job on Yarn from Java Code

2014-12-16 Thread Rahul Swaminathan
Hi all,

I am trying to run a simple Spark_Pi application through Yarn from Java code. I 
have the Spark_Pi class and everything works fine if I run on Spark. However, 
when I set master to "yarn-client" and set yarn mode to true, I keep getting 
exceptions. I suspect this has something to do with the wrong jars being in my 
class path, but I can't seem to find any documentation on exactly what I need. 
Can someone post an example of running a Spark Job from Java code on Yarn? What 
config properties and jars were necessary?

Thanks!
Rahul


Re: Multiple Filter Effiency

2014-12-16 Thread Imran Rashid
I think accumulators do exactly what you want.

(Scala syntax below, I'm just not familiar with the Java equivalent ...)

val f1counts = sc.accumulator(0)
val f2counts = sc.accumulator(0)
val f3counts = sc.accumulator(0)

textFile.foreach { s =>
  if (f1matches) f1counts += 1
  ...
}

Note that you could also do a normal map reduce even though a record might
match more than one filter.  In the Scala API you can use flatMap to output
zero or more records:

textFile.flatMap { s =>
  Seq(
    (if (f1matches) Some("f1" -> 1) else None),
    ...
  ).flatten
}.reduceByKey { _ + _ }
On Dec 16, 2014 2:07 AM, "zkidkid"  wrote:

> Hi,
> Currently I am trying to count on a document with multiple filter.
> Let say, here is my document:
>
> //user field1 field2 field3
> user1 0 0 1
> user2 0 1 0
> user3 0 0 0
>
> I want to count on user.log for some filters like this:
>
> Filter1: field1 == 0 & field 2 = 0
> Filter2: field1 == 0 & field 3 = 1
> Filter3: field1 == 0 & field 3 = 0
> ...
> and total line.
>
> I have tried and I found that I couldn't use "group by" or "map then
> reduce"
> because a line could match two or more filter.
>
> My idea now is "foreach" line and then maintain a outsite counter service.
>
> Forexample:
>
> JavaRDD textFile = sc.textFile(hdfs, 10);
> long start = System.currentTimeMillis();
>
> textFile.foreach(new VoidFunction() {
>
> public void call(String s) {
>foreach(MyFilter filter: MyFilters){
>if(filter.match(s)) filter.increaseOwnCounter();
>}
> }
> });
>
>
> I would happy if there have another way to do it, any help is appreciate.
> Thanks in advance.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Filter-Effiency-tp20701.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Sql on Yarn using python

2014-12-16 Thread Sam Flint
I have tested my python script by using the pyspark shell.  I run into an
error because of memory limits on the name node.

I am wondering how I run the script on Spark on YARN.  I am not familiar with
this at all.

Any help would be greatly appreciated.


Thanks,

-- 

*MAGNE**+**I**C*

*Sam Flint* | *Lead Developer, Data Analytics*


RE: Spark SQL API Doc & IsCached as SQL command

2014-12-16 Thread Judy Nash
Thanks Cheng. Tried it out and saw the InMemoryColumnarTableScan word in the 
physical plan.

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Friday, December 12, 2014 11:37 PM
To: Judy Nash; user@spark.apache.org
Subject: Re: Spark SQL API Doc & IsCached as SQL command


There isn’t a SQL statement that directly maps SQLContext.isCached, but you can 
use EXPLAIN EXTENDED to check whether the underlying physical plan is a 
InMemoryColumnarTableScan.

On 12/13/14 7:14 AM, Judy Nash wrote:
Hello,

Few questions on Spark SQL:


1)  Does Spark SQL support equivalent SQL Query for Scala command: 
IsCached() ?


2)  Is there a documentation spec I can reference for question like this?



Closest doc I can find is this one: 
https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#deploying-in-existing-hive-warehouses


Thanks,
Judy
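
For reference, a minimal sketch of the two checks discussed above, assuming a
SQLContext (or HiveContext) named sqlContext and a registered table named
"events" (the table name is illustrative):

sqlContext.cacheTable("events")
// Programmatic check from Scala:
println(sqlContext.isCached("events"))
// SQL-side check, per Cheng's suggestion: inspect the physical plan and look
// for InMemoryColumnarTableScan.
sqlContext.sql("EXPLAIN EXTENDED SELECT * FROM events").collect().foreach(println)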


Re: streaming linear regression is not building the model

2014-12-16 Thread tsu-wt
I am having the same issue, and it still does not update for me. I am trying
to execute the example by using bin/run-example



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-linear-regression-is-not-building-the-model-tp18522p20727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cannot parse ListBuffer - StreamingLinearRegression

2014-12-16 Thread tsu-wt
Hi,

I am trying to run the StreamingLinearRegression example on a single node without
Hadoop installed. I keep getting the following error and cannot find any
documentation about the issue:

/14/12/16 13:26:50 ERROR JobScheduler: Error running job streaming job
141875801 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0
(TID 1, localhost): org.apache.spark.SparkException: Cannot parse
ListBuffer([D@3e164f33)./

Please let me know if you have a solution to this issue.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-parse-ListBuffer-StreamingLinearRegression-tp20728.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NumberFormatException

2014-12-16 Thread Imran Rashid
wow, really weird.  My intuition is the same as everyone else's, some
unprintable character.  Here's a couple more debugging tricks I've used in
the past:

//set up an accumulator to catch the bad rows as a side-effect
val nBadRows = sc.accumulator(0)
val nGoodRows = sc.accumulator(0)
val badRows =
sc.accumulableCollection(scala.collection.mutable.Set[String]())

//flatMap so that you can skip the bad rows

datastream.flatMap{ str =>
  try {
val strArray = str.trim().split(",")
val result = (strArray(0).toInt, strArray(1).toInt)
nGoodRows += 1
Some(result)
  }  catch {
    case _: NumberFormatException =>
  nBadRows += 1
  badRows += str
  None
  }
}.saveAsTextFile(...)


if (badRows.value.nonEmpty) {
  println(" BAD ROWS *")
  badRows.value.foreach{str =>
//look at a bit more info from each string ... print out length & each
character one by one
println(str)
println(str.length)
str.foreach{println}
println()
  }
}

// if it is some data corruption, that you just have to live with, you
might leave the flatMap / try
// even when you're running it for real.  But then you might want to add a
little check that there aren't
// too many bad rows.  Note that the accumulator[Set] will run out of
mem if there are really
// a ton of bad rows, in which case you might switch to a reservoir sample

val badFrac = nBadRows.value / (nGoodRows.value + nBadRows.value.toDouble)
println(s"${nBadRows.value} bad rows; ${nGoodRows.value} good rows;
($badFrac) bad fraction")
if (badFrac > maxAllowedBadRows) {
  throw new RuntimeException("too many bad rows! " + badFrac)
}




On Mon, Dec 15, 2014 at 3:49 PM, yu  wrote:
>
> Hello, everyone
>
> I know 'NumberFormatException' is due to the reason that String can not be
> parsed properly, but I really can not find any mistakes for my code. I hope
> someone may kindly help me.
> My hdfs file is as follows:
> 8,22
> 3,11
> 40,10
> 49,47
> 48,29
> 24,28
> 50,30
> 33,56
> 4,20
> 30,38
> ...
>
> So each line contains an integer + "," + an integer + "\n"
> My code is as follows:
> object StreamMonitor {
>   def main(args: Array[String]): Unit = {
> val myFunc = (str: String) => {
>   val strArray = str.trim().split(",")
>   (strArray(0).toInt, strArray(1).toInt)
> }
> val conf = new SparkConf().setAppName("StreamMonitor");
> val ssc = new StreamingContext(conf, Seconds(30));
> val datastream = ssc.textFileStream("/user/yu/streaminput");
> val newstream = datastream.map(myFunc)
> newstream.saveAsTextFiles("output/", "");
> ssc.start()
> ssc.awaitTermination()
>   }
>
> }
>
> The exception info is:
> 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
> (TID 0, h3): java.lang.NumberFormatException: For input string: "8"
>
>
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> java.lang.Integer.parseInt(Integer.java:492)
> java.lang.Integer.parseInt(Integer.java:527)
>
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
> StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
> So based on the above info, "8" is the first number in the file and I think
> it should be parsed to integer without any problems.
> I know it may be a very stupid question and the answer may be very easy.
> But
> I really can not find the reason. I am thankful to anyone who helps!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Control default partition when load a RDD from HDFS

2014-12-16 Thread Shuai Zheng
Hi All,

 

My application loads 1000 files, each from 200MB to a few GB, and combines
them with other data to do calculations.

Some pre-calculation must be done at the file level; after that, the
results need to be combined for further calculation.

In Hadoop this is simple because I can turn off file splitting for the input
format (to ensure each file goes to the same mapper), then do the
file-level calculation in the mapper and pass the result to the reducer. But in
Spark, how can I do it?

Basically, I want to make sure that after I load these files into an RDD, it is
partitioned by file (no splitting of files and no merging either), so I can call
mapPartitions. Is there any way I can control the default partitioning when I
load the RDD?

This might already be the default behavior (partitioned by file when the RDD is
first loaded), but I can't find any documentation to support my guess. If not,
can I enforce this kind of partitioning? Because the total file size is large, I
don't want to repartition in the code.

 

Regards,

 

Shuai
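
One approach sometimes used for this kind of per-file processing (not from this
thread, and only workable when a single file fits in a task's memory, which may
rule it out for the multi-GB files mentioned above) is sc.wholeTextFiles, which
yields one (path, content) record per file:

// Hypothetical sketch; the path and the per-file computation are placeholders.
val perFile = sc.wholeTextFiles("hdfs:///data/input/*")   // RDD[(path, fullContent)]
val perFileResults = perFile.map { case (path, content) =>
  val lines = content.split("\n")
  // ... file-level pre-calculation goes here ...
  (path, lines.length)                                    // placeholder result
}
// Combine the per-file results afterwards, e.g. with reduceByKey or a join.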



Re: Data Loss - Spark streaming

2014-12-16 Thread Ryan Williams
TD's portion seems to start at 27:24: http://youtu.be/jcJq3ZalXD8?t=27m24s

On Tue Dec 16 2014 at 7:13:43 AM Gerard Maas  wrote:

> Hi Jeniba,
>
> The second part of this meetup recording has a very good answer to your
> question.  TD explains the current behavior and the on-going work in Spark
> Streaming to fix HA.
> https://www.youtube.com/watch?v=jcJq3ZalXD8
>
>
> -kr, Gerard.
>
> On Tue, Dec 16, 2014 at 11:32 AM, Jeniba Johnson <
> jeniba.john...@lntinfotech.com> wrote:
>>
>> Hi,
>>
>> I need a clarification, while running streaming examples, suppose the
>> batch interval is set to 5 minutes, after collecting the data from the
>> input source(FLUME) and  processing till 5 minutes.
>> What will happen to the data which is flowing continuously from the input
>> source to spark streaming ? Will that data be stored somewhere or else the
>> data will be lost ?
>> Or else what is the solution to capture each and every data without any
>> loss in Spark streaming.
>>
>> Awaiting for your kind reply.
>>
>>
>> Regards,
>> Jeniba Johnson
>>
>>
>> 
>> The contents of this e-mail and any attachment(s) may contain
>> confidential or privileged information for the intended recipient(s).
>> Unintended recipients are prohibited from taking action on the basis of
>> information in this e-mail and using or disseminating the information, and
>> must notify the sender and delete it from their system. L&T Infotech will
>> not accept responsibility or liability for the accuracy or completeness of,
>> or the presence of any virus or disabling code in this e-mail"
>>
>


Re: Error when Applying schema to a dictionary with a Tuple as key

2014-12-16 Thread Davies Liu
It's a bug, could you file a JIRA for this? thanks!

On Tue, Dec 16, 2014 at 5:49 AM, sahanbull  wrote:
>
> Hi Guys,
>
> Im running a spark cluster in AWS with Spark 1.1.0 in EC2
>
> I am trying to convert a an RDD with tuple
>
> (u'string', int , {(int, int): int, (int, int): int})
>
> to a schema rdd using the schema:
>
> fields = [StructField('field1',StringType(),True),
> StructField('field2',IntegerType(),True),
>
> StructField('field3',MapType(StructType([StructField('field31',IntegerType(),True),
> 
> StructField('field32',IntegerType(),True)]),IntegerType(),True),True)
> ]
>
> schema = StructType(fields)
> # generate the schemaRDD with the defined schema
> schemaRDD = sqc.applySchema(RDD, schema)
>
> But when I add "field3" to the schema, it throws an execption:
>
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/root/spark/python/pyspark/rdd.py", line 1153, in take
> res = self.context.runJob(self, takeUpToNumLeft, p, True)
>   File "/root/spark/python/pyspark/context.py", line 770, in runJob
> it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
> javaPartitions, allowLocal)
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 28.0 (TID 710, ip-172-31-29-120.ec2.internal):
> net.razorvine.pickle.PickleException: couldn't introspect javabean:
> java.lang.IllegalArgumentException: wrong number of arguments
> net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
> net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
> net.razorvine.pickle.Pickler.save(Pickler.java:125)
> net.razorvine.pickle.Pickler.put_map(Pickler.java:321)
> net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
> net.razorvine.pickle.Pickler.save(Pickler.java:125)
> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
> net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
> net.razorvine.pickle.Pickler.save(Pickler.java:125)
> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
> net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
> net.razorvine.pickle.Pickler.save(Pickler.java:125)
> net.razorvine.pickle.Pickler.dump(Pickler.java:95)
> net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>
> org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
>
> org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:331)
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.r

Spark eating exceptions in multi-threaded local mode

2014-12-16 Thread Corey Nolet
I've been running a job in local mode using --master local[*] and I've
noticed that, for some reason, exceptions appear to get eaten; that is, I
don't see them. If I debug in my IDE, I'll see that an exception was thrown
when I step through the code, but if I just run the application, it appears
everything completed, even though I know a bunch of my jobs never actually
got run.

The exception is happening in a map stage.

Is there a special way this is supposed to be handled? Am I missing a
property somewhere that allows these to be bubbled up?


Re: Error when Applying schema to a dictionary with a Tuple as key

2014-12-16 Thread Davies Liu
I had created https://issues.apache.org/jira/browse/SPARK-4866, it
will be fixed by https://github.com/apache/spark/pull/3714.

Thank you for reporting this.

Davies

On Tue, Dec 16, 2014 at 12:44 PM, Davies Liu  wrote:
> It's a bug, could you file a JIRA for this? thanks!
>
> On Tue, Dec 16, 2014 at 5:49 AM, sahanbull  wrote:
>>
>> Hi Guys,
>>
>> Im running a spark cluster in AWS with Spark 1.1.0 in EC2
>>
>> I am trying to convert a an RDD with tuple
>>
>> (u'string', int , {(int, int): int, (int, int): int})
>>
>> to a schema rdd using the schema:
>>
>> fields = [StructField('field1',StringType(),True),
>> StructField('field2',IntegerType(),True),
>>
>> StructField('field3',MapType(StructType([StructField('field31',IntegerType(),True),
>> 
>> StructField('field32',IntegerType(),True)]),IntegerType(),True),True)
>> ]
>>
>> schema = StructType(fields)
>> # generate the schemaRDD with the defined schema
>> schemaRDD = sqc.applySchema(RDD, schema)
>>
>> But when I add "field3" to the schema, it throws an execption:
>>
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "/root/spark/python/pyspark/rdd.py", line 1153, in take
>> res = self.context.runJob(self, takeUpToNumLeft, p, True)
>>   File "/root/spark/python/pyspark/context.py", line 770, in runJob
>> it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
>> javaPartitions, allowLocal)
>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> line 538, in __call__
>>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
>> 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 28.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 28.0 (TID 710, ip-172-31-29-120.ec2.internal):
>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>> java.lang.IllegalArgumentException: wrong number of arguments
>> net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
>> net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
>> net.razorvine.pickle.Pickler.save(Pickler.java:125)
>> net.razorvine.pickle.Pickler.put_map(Pickler.java:321)
>> net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
>> net.razorvine.pickle.Pickler.save(Pickler.java:125)
>> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
>> net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>> net.razorvine.pickle.Pickler.save(Pickler.java:125)
>> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:412)
>> net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>> net.razorvine.pickle.Pickler.save(Pickler.java:125)
>> net.razorvine.pickle.Pickler.dump(Pickler.java:95)
>> net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>>
>> org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
>>
>> org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$2.apply(SchemaRDD.scala:417)
>> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>
>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:331)
>>
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
>>
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>>
>> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
>>
>> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
>> Driver stacktrace:
>> at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>> at
>> org.apache.spark.scheduler.DAG

Re: Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Harry Brundage
Are you certain that's happening Jim? Why? What happens if you just do
sc.textFile(fileUri).count() ? If I'm not mistaken the Hadoop InputFormat
for gzip and the RDD wrapper around it already has the "streaming"
behaviour you wish for. but I could be wrong. Also, are you in pyspark or
scala Spark?

On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll  wrote:
>
> Is there a way to get Spark to NOT reparition/shuffle/expand a
> sc.textFile(fileUri) when the URI is a gzipped file?
>
> Expanding a gzipped file should be thought of as a "transformation" and not
> an "action" (if the analogy is apt). There is no need to fully create and
> fill out an intermediate RDD with the expanded data when it can be done one
> row at a time.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: pyspark exception catch

2014-12-16 Thread cfregly
hey igor!

a few ways to work around this depending on the level of exception-handling
granularity you're willing to accept:
1) use mapPartitions() to wrap the entire partition handling code in a
try/catch -- this is fairly coarse-grained, however, and will fail the
entire partition.
2) modify your transformation code to wrap a try-catch around the individual
record handler -- return either "None" (or some other well-known empty
value) for input records that fail and the actual value for records that
succeed.  use a filter() to filter out the "None" values.
3) same as #2, but use empty array for a failure and a single-element array
for a success.

hope that helps!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-exception-catch-tp20483p20730.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Pyspark 1.1.1 error with large number of records - serializer.dump_stream(func(split_index, iterator), outfile)

2014-12-16 Thread Sebastián Ramírez
Your Spark is trying to load a hadoop library "winutils.exe", which you
don't have in your Windows:

14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the
hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in
the Hadoop binaries.
at org.apache.hadoop.util.Shell.
getQualifiedBinPath(Shell.java:318)
...


It's a known bug: https://issues.apache.org/jira/browse/SPARK-2356

That issue references this email thread:
http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

And that email thread references this blog post:
https://social.msdn.microsoft.com/forums/azure/en-US/28a57efb-082b-424b-8d9e-731b1fe135de/please-read-if-experiencing-job-failures?forum=hdinsight


I had the same problem before, you may temporarily solve it by using the
distribution from the summit:
https://databricks-training.s3.amazonaws.com/getting-started.html
Or you may want to try that other solution.

...but in my case, I ended up running Spark from a Linux machine in a VM
after I got other errors.
I have the impression that development for Windows is not currently a big
priority, since the bug is from version 1.0...

I hope that helps.

Best,


*Sebastián Ramírez*
Diseñador de Algoritmos

 

 Tel: (+571) 795 7950 ext: 1012
 Cel: (+57) 300 370 77 10
 Calle 73 No 7 - 06  Piso 4
 Linkedin: co.linkedin.com/in/tiangolo/
 Twitter: @tiangolo 
 Email: sebastian.rami...@senseta.com
 www.senseta.com

On Tue, Dec 16, 2014 at 8:04 AM, mj  wrote:
>
> I've got a simple pyspark program that generates two CSV files and then
> carries out a leftOuterJoin (a fact RDD joined to a dimension RDD). The
> program works fine for smaller volumes of records, but when it goes beyond
> 3
> million records for the fact dataset, I get the error below. I'm running
> PySpark via PyCharm and the information for my environment is:
>
> OS: Windows 7
> Python version: 2.7.9
> Spark version: 1.1.1
> Java version: 1.8
>
> I've also included the py file I am using. I'd appreciate any help you can
> give me,
>
> MJ.
>
>
> ERROR MESSAGE
> C:\Python27\python.exe "C:/Users/Mark
> Jones/PycharmProjects/spark_test/spark_error_sample.py"
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 14/12/16 12:48:26 INFO SecurityManager: Changing view acls to: Mark Jones,
> 14/12/16 12:48:26 INFO SecurityManager: Changing modify acls to: Mark
> Jones,
> 14/12/16 12:48:26 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(Mark Jones, );
> users with modify permissions: Set(Mark Jones, )
> 14/12/16 12:48:26 INFO Slf4jLogger: Slf4jLogger started
> 14/12/16 12:48:27 INFO Remoting: Starting remoting
> 14/12/16 12:48:27 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@192.168.19.83:51387]
> 14/12/16 12:48:27 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://sparkDriver@192.168.19.83:51387]
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'sparkDriver' on
> port 51387.
> 14/12/16 12:48:27 INFO SparkEnv: Registering MapOutputTracker
> 14/12/16 12:48:27 INFO SparkEnv: Registering BlockManagerMaster
> 14/12/16 12:48:27 INFO DiskBlockManager: Created local directory at
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-local-20141216124827-11ef
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'Connection
> manager for block manager' on port 51390.
> 14/12/16 12:48:27 INFO ConnectionManager: Bound socket to port 51390 with
> id
> = ConnectionManagerId(192.168.19.83,51390)
> 14/12/16 12:48:27 INFO MemoryStore: MemoryStore started with capacity 265.1
> MB
> 14/12/16 12:48:27 INFO BlockManagerMaster: Trying to register BlockManager
> 14/12/16 12:48:27 INFO BlockManagerMasterActor: Registering block manager
> 192.168.19.83:51390 with 265.1 MB RAM
> 14/12/16 12:48:27 INFO BlockManagerMaster: Registered BlockManager
> 14/12/16 12:48:27 INFO HttpFileServer: HTTP File server directory is
>
> C:\Users\MARKJO~1\AppData\Local\Temp\spark-3b772ca1-dbf7-4eaa-b62c-be5e73036f5d
> 14/12/16 12:48:27 INFO HttpServer: Starting HTTP Server
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'HTTP file
> server' on port 51391.
> 14/12/16 12:48:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> 4040.
> 14/12/16 12:48:27 INFO SparkUI: Started SparkUI at
> http://192.168.19.83:4040
> 14/12/16 12:48:27 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/12/16 12:48:28 ERROR Shell: Failed to locate the winutils binary in the
> hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in
> the Hadoop binaries.
> at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
> at org.apache.hadoop.util.Shell.

Re: Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim


Hi Harry,

Thanks for your response.

I'm working in scala. When I do a "count" call it expands the RDD in the 
count (since it's an action). You can see the call stack that results in 
the failure of the job here:


 ERROR DiskBlockObjectWriter - Uncaught exception while reverting 
partial writes to file 
/tmp/spark-local-20141216170458-964a/1d/temp_shuffle_4f46af09-5521-4fc6-adb1-c72839520560

java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
at 
org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)

at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at 
org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:263)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)

at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718)
at 
org.apache.spark.serializer.JavaSerializationStream.flush(JavaSerializer.scala:51)
at 
org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:173)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:774)
at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:773)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
org.apache.spark.util.collection.ExternalSorter.stop(ExternalSorter.scala:773)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:93)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:74)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Notice that the task run (this is now doing a "count") results in a shuffle, 
during which it writes the intermediate RDD to disk (and fails when the 
disk is full). This intermediate RDD/disk write is unnecessary.


I even implemented a "Seq[String]" in terms of streaming the file and 
called sc.parallelize(mySequence, 1), and THIS results in a call to 
"toArray" on my sequence. Since the data won't fit on disk, it certainly 
won't fit in an array in memory.
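
For what it's worth, here is a minimal sketch of the two behaviours I'm 
describing; the file path and the repartition(8) call are illustrative 
assumptions, not taken from my actual job:

import org.apache.spark.{SparkConf, SparkContext}

object GzCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("gz-count"))

    // A gzipped file is not splittable, so this RDD has a single partition and
    // count() streams through it without writing intermediate data to disk.
    val lines = sc.textFile("file:///data/big.gz")
    println(lines.count())

    // A repartition (or any other wide dependency) is what introduces the
    // ShuffleMapTask / ExternalSorter spill seen in the stack trace above.
    println(lines.repartition(8).count())

    sc.stop()
  }
}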


Thanks for taking the time to respond.

Jim

On 12/16/2014 04:57 PM, Harry Brundage wrote:
Are you certain that's happening Jim? Why? What happens if you just do 
sc.textFile(fileUri).count() ? If I'm not mistaken the Hadoop 
InputFormat for gzip and the RDD wrapper around it already has the 
"streaming" behaviour you wish for. but I could be wrong. Also, are 
you in pyspark or scala Spark?


On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll wrote:


Is there a way to get Spark to NOT repartition/shuffle/expand a
sc.textFile(fileUri) when the URI is a gzipped file?

Expanding a gzipped file should be thought of as a
"transformation" and not
an "action" (if the analogy is apt). There is no need to fully
create and
fill out an intermediate RDD with the expanded data when it can be
done one
row at a time.




--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-handling-of-a-file--gz-Uri-tp20726.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org






Re: pyspark sc.textFile uses only 4 out of 32 threads per node

2014-12-16 Thread Sebastián Ramírez
Are you reading the file from your driver (main / master) program?

Is your file in a distributed system like HDFS? available to all your nodes?

It might be due to the laziness of transformations:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

"Transformations" are lazy, and aren't applied until they are needed by an
"action" (and, to me, it happend for readings too some time ago).
You can try calling a .first() in your RDD from once in a while to force it
to load the RDD to your cluster (but it might not be the cleanest way to do
it).


*Sebastián Ramírez*
Algorithm Designer

 

 Tel: (+571) 795 7950 ext: 1012
 Cel: (+57) 300 370 77 10
 Calle 73 No 7 - 06  Piso 4
 Linkedin: co.linkedin.com/in/tiangolo/
 Twitter: @tiangolo 
 Email: sebastian.rami...@senseta.com
 www.senseta.com

On Tue, Dec 9, 2014 at 1:59 PM, Gautham  wrote:
>
> I am having an issue with pyspark launched in ec2 (using spark-ec2) with 5
> r3.4xlarge machines where each has 32 threads and 240GB of RAM. When I do
> sc.textFile to load data from a number of gz files, it does not progress as
> fast as expected. When I log-in to a child node and run top, I see only 4
> threads at 100 cpu. All remaining 28 cores were idle. This is not an issue
> when processing the strings after loading, when all the cores are used to
> process the data.
>
> Please help me with this? What setting can be changed to get the CPU usage
> back up to full?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-sc-textFile-uses-only-4-out-of-32-threads-per-node-tp20595.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-- 
**
*This e-mail transmission, including any attachments, is intended only for 
the named recipient(s) and may contain information that is privileged, 
confidential and/or exempt from disclosure under applicable law. If you 
have received this transmission in error, or are not the named 
recipient(s), please notify Senseta immediately by return e-mail and 
permanently delete this transmission, including any attachments.*


S3 globbing

2014-12-16 Thread durga
Hi All,

I need help with regex in my sc.textFile()

I have lots of files with epoch millisecond timestamps in their names.

ex:abc_1418759383723.json

Now I need to consume last one hour files using the epoch time stamp as
mentioned above.

I tried a couple of options, but nothing seems to work for me.

If any of you have faced this issue and found a solution, please help me. 

Appreciating your help,

Thanks,
D





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-globbing-tp20731.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll

I've been trying to figure out how to use Spark to do a simple aggregation
without repartitioning and essentially creating fully instantiated
intermediate RDDs, and it seems virtually impossible.

I've now gone as far as writing my own single-partition RDD that wraps an
Iterator[String] and calling aggregate() on it. Before any of my aggregation
code executes, the entire Iterator is unwound and multiple partitions are
created to be given to my aggregation.

The Task execution call stack includes:
   ShuffleMap.runTask
   SortShuffleWriter.write
   ExternalSorter.insertAll
  ... which is iterating over my entire RDD, repartitioning it and
collecting it into spill files. 

How do I prevent this from happening? There's no need to do this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I stop the automatic partitioning of my RDD?

2014-12-16 Thread Jim Carroll
Wow. I just realized what was happening and it's all my fault. I have a
library method that I wrote that presents the RDD, and I was actually
repartitioning it myself.

I feel pretty dumb. Sorry about that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-stop-the-automatic-partitioning-of-my-RDD-tp20732p20735.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



"toArray","first" get the different result from one element RDD

2014-12-16 Thread buring
Hi
Recently I have had some problems with RDD behavior. It's about the
"RDD.first" and "RDD.toArray" methods when the RDD has only one element.
I get different results from different methods on a one-element RDD,
where I should get the same result. I will give more detail after the code.
My code is as follows:
// get an RDD with just one row, RDD[(Long, Array[Byte])]
val alsresult =
sc.sequenceFile(args(0)+"/als",classOf[LongWritable],classOf[BytesWritable]).map{case(uid,sessions)=>
  sessions.setCapacity(sessions.getLength)
  (uid.get(),sessions.getBytes)
}.filter{line=>
  line._1 == userindex.value //specified from arguments
}
//log information really surprised me
logger.info("alsInformation:%d".format(alsresult.count()))
   
alsresult.toArray().foreach(e => logger.info("alstoarray:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(1).foreach(e => logger.info("take1result:%d\t%s".format(e._1, e._2.mkString(" "))))
logger.info("firstInformation:%d\t%s".format(alsresult.first()._1, alsresult.first()._2.mkString(" ")))
alsresult.collect().foreach(e => logger.info("alscollectresult:%d\t%s".format(e._1, e._2.mkString(" "))))
alsresult.take(3).foreach(e => logger.info("alstake3result:%d\t%s".format(e._1, e._2.mkString(" ")))) // 3 is bigger than rdd.count()

I get an RDD which has just one element. But using different methods, I
got different elements. My print information is as follows:

argument userindex  33057172
281168553814772 
3209314
alsInformation  1   
1   
1   1 
alstoarray  1612242 0 22 47 37 6 19...  3337442 16 32 0 
22 13 49... 
3697319 16 32 0 22 13 49...  3   0 22 47 37 6 19...
take1result 1612242 21 24 3 56 16 27... 3337442 16 52 
31 42 29 36 ...
3697319 39 21 34 56 3 37...  3   34 10 18 28 38 11...
firstInformation1612242 21 24 3 56 16 27... 3337442 16 52 31 42 29 
36 ...
3697319 39 21 34 56 3 37...  3   34 10 18 28 38 11...
alscollectresult1612242 0 22 47 37 6 19...  3337442 16 32 0 22 13 
49... 
3697319 16 32 0 22 13 49...  3   0 22 47 37 6 19...
alstake3result  1612242 0 22 47 37 6 19...  3337442 16 32 0 
22 13 49... 
3697319 16 32 0 22 13 49...  3   0 22 47 37 6 19...
I filter the RDD and guarantee that RDD.count() equals 1. I think different
"userindex.value" arguments should give different alsresult values,
but "RDD.toArray", "RDD.collect" and "RDD.take(3)" have the same result, and
under the same argument "toArray", "take(1)" and "take(3)"
have different results, which really surprised me. The arguments are
specified randomly.

Can anyone explain it or give me some reference?

Thanks 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/toArray-first-get-the-different-result-from-one-element-RDD-tp20734.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor memory

2014-12-16 Thread Pala M Muthaia
Thanks for the clarifications. I misunderstood what the number on UI meant.
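
For anyone else who finds this thread, here is a rough sketch of where that 
UI number comes from; the exact safety factors vary between Spark versions, 
so treat it only as an approximation:

// Approximate the per-executor "memory" figure shown on the UI: the JVM heap
// that is actually reachable, scaled by spark.storage.memoryFraction (0.6 default).
val maxHeapBytes = Runtime.getRuntime.maxMemory       // a bit less than --executor-memory
val storageFraction = 0.6                             // spark.storage.memoryFraction
val reportedBytes = (maxHeapBytes * storageFraction).toLong
// e.g. --executor-memory 4g  =>  roughly 0.6 * ~3.8 GB, about the 2.3 GB shown on the UI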

On Mon, Dec 15, 2014 at 7:00 PM, Sean Owen  wrote:

> I believe this corresponds to the 0.6 of the whole heap that is
> allocated for caching partitions. See spark.storage.memoryFraction on
> http://spark.apache.org/docs/latest/configuration.html 0.6 of 4GB is
> about 2.3GB.
>
> The note there is important, that you probably don't want to exceed
> the JVM old generation size with this parameter.
>
> On Tue, Dec 16, 2014 at 12:53 AM, Pala M Muthaia
>  wrote:
> > Hi,
> >
> > Running Spark 1.0.1 on Yarn 2.5
> >
> > When i specify --executor-memory 4g, the spark UI shows each executor as
> > having only 2.3 GB, and similarly for 8g, only 4.6 GB.
> >
> > I am guessing that the executor memory corresponds to the container
> memory,
> > and that the task JVM gets only a percentage of the container total
> memory.
> > Is there a yarn or spark parameter to tune this so that my task JVM
> actually
> > gets 6GB out of the 8GB for example?
> >
> >
> > Thanks.
> >
> >
>


when will the spark 1.3.0 be released?

2014-12-16 Thread 张建轶
Hi !

when will the spark 1.3.0 be released?
I want to use new LDA feature.
Thank you!

Re: when will the spark 1.3.0 be released?

2014-12-16 Thread Marco Shaw
When it is ready. 



> On Dec 16, 2014, at 11:43 PM, 张建轶  wrote:
> 
> Hi!
> 
> when will the spark 1.3.0 be released?
> I want to use new LDA feature.
> Thank you!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDDs being cleaned too fast

2014-12-16 Thread Harihar Nahak
RDD.persist() can be useful here.
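
A minimal sketch of what that looks like (the path and storage level are just
placeholders):

import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///data/events")      // illustrative input
// MEMORY_AND_DISK spills partitions that don't fit in memory to local disk
// instead of dropping them outright.
events.persist(StorageLevel.MEMORY_AND_DISK)
events.count()   // materialize once so later jobs reuse the cached blocks

As far as I know, the ContextCleaner only removes blocks for RDDs that are no
longer reachable on the driver, so keeping a live reference to the persisted
RDD matters as much as the storage level.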

On 11 December 2014 at 14:34, ankits [via Apache Spark User List] <
ml-node+s1001560n20613...@n3.nabble.com> wrote:
>
> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too
> fast. How can i inspect the size of RDD in memory and get more information
> about why it was cleaned up. There should be more than enough memory
> available on the cluster to store them, and by default, the
> spark.cleaner.ttl is infinite, so I want more information about why this is
> happening and how to prevent it.
>
> Spark just logs this when removing RDDs:
>
> [2014-12-11 01:19:34,006] INFO  spark.storage.BlockManager [] [] -
> Removing RDD 33
> [2014-12-11 01:19:34,010] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
> [2014-12-11 01:19:34,012] INFO  spark.storage.BlockManager [] [] -
> Removing RDD 33
> [2014-12-11 01:19:34,016] INFO  pache.spark.ContextCleaner []
> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html
>  To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
> NAML
> 
>


-- 
Regards,
Harihar Nahak
BigData Developer
Wynyard
Email:hna...@wynyardgroup.com | Extn: 8019




-
--Harihar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613p20738.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Re: Fetch Failed caused job failed.

2014-12-16 Thread Ma,Xi
Actually there was still a fetch failure. However, after I upgraded Spark to 
1.1.1, this error did not occur again.

Thanks,
Mars


From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: December 16, 2014 17:52
To: Ma,Xi
Cc: u...@spark.incubator.apache.org
Subject: Re: Re: Fetch Failed caused job failed.

So the fetch failure error is gone? Can you paste the code that you are 
executing? What is the size of the data and your cluster setup?

Thanks
Best Regards

On Tue, Dec 16, 2014 at 3:16 PM, Ma,Xi  wrote:
Hi Das,

Thanks for your advice.

I'm not sure what the point of setting memoryFraction to 1 is. I've tried to 
rerun the test with the following parameters in spark_default.conf, but it 
failed again:

spark.rdd.compress  true
spark.akka.frameSize  50
spark.storage.memoryFraction 0.8
spark.core.connection.ack.wait.timeout 6000

14/12/16 16:45:08 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in 
_read_with_length
length = read_int(stream)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in 
read_int
raise EOFError
EOFError
 at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
 at 
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)

I suspect that there is something wrong in the shuffle stage, but I'm not sure 
what the error is.

Thanks,

Mars


From: Akhil Das 
[mailto:ak...@sigmoidanalytics.com]
Sent: December 16, 2014 14:57
To: Ma,Xi
Cc: u...@spark.incubator.apache.org
Subject: Re: Fetch Failed caused job failed.

You could try setting the following while creating the sparkContext


  .set("spark.rdd.compress","true")

  .set("spark.storage.memoryFraction","1")

  .set("spark.core.connection.ack.wait.timeout","600")

  .set("spark.akka.frameSize","50")


Thanks
Best Regards

On Tue, Dec 16, 2014 at 8:30 AM, Mars Max  wrote:
While I was running a Spark MR job, there was FetchFailed(BlockManagerId(47,
xx.com, 40975, 0), shuffleId=2, mapId=5, reduceId=286), then there
were many retries, and the job finally failed.

The log showed the following error. Has anybody met this error, or is
it a known issue in Spark? Thanks.

4/12/16 10:43:43 ERROR PythonRDD: Python worker exited unexpectedly
(crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/home/spark/spark-1.1/python/pyspark/worker.py", line 75, in main
command = pickleSer._read_with_length(infile)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 146, in
_read_with_length
length = read_int(stream)
  File "/home/spark/spark-1.1/python/pyspark/serializers.py", line 464, in
read_int
raise EOFError
EOFError

at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:265)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.spark.shuffle.FetchFailedException: Fetch failed:
Bloc

Spark SQL DSL for joins?

2014-12-16 Thread Jerry Raj

Hi,
I'm using the Scala DSL for Spark SQL, but I'm not able to do joins. I 
have two tables (backed by Parquet files) and I need to do a join across 
them using a common field (user_id). This works fine using standard SQL 
but not using the language-integrated DSL: neither


t1.join(t2, on = 't1.user_id == t2.user_id)

nor

t1.join(t2, on = Some('t1.user_id == t2.user_id))

work, or even compile. I could not find any examples of how to perform a 
join using the DSL. Any pointers will be appreciated :)


Thanks
-Jerry

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: when will the spark 1.3.0 be released?

2014-12-16 Thread Andrew Ash
Releases are roughly every 3 months, so you should expect around March if the
pace stays steady.

2014-12-16 22:56 GMT-05:00 Marco Shaw :
>
> When it is ready.
>
>
>
> > On Dec 16, 2014, at 11:43 PM, 张建轶  wrote:
> >
> > Hi!
> >
> > when will the spark 1.3.0 be released?
> > I want to use new LDA feature.
> > Thank you!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Running Spark Job on Yarn from Java Code

2014-12-16 Thread Kyle Lin
Hi there

I also got an exception when running the Pi example on YARN

Spark version: spark-1.1.1-bin-hadoop2.4

My environment: Hortonworks HDP 2.2

My command:
./bin/spark-submit --master yarn-cluster --class
org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10

Output logs:
14/12/17 14:06:32 INFO yarn.Client: Application report from ResourceManager:
 application identifier: application_1418282263779_0024
 appId: 24
 clientToAMToken: null
 appDiagnostics: Application application_1418282263779_0024 failed 2 times
due to AM Container for appattempt_1418282263779_0024_02 exited with
 exitCode: 1
For more detailed output, check application tracking page:
http://sparkvm.localdomain:8088/proxy/application_1418282263779_0024/Then,
click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1418282263779_0024_02_01
Exit code: 1
Exception message:
/hadoop/yarn/local/usercache/root/appcache/application_1418282263779_0024/container_1418282263779_0024_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:$PWD/__app__.jar:$PWD/*:
bad substitution

Stack trace: ExitCodeException exitCode=1:
/hadoop/yarn/local/usercache/root/appcache/application_1418282263779_0024/container_1418282263779_0024_02_01/launch_container.sh:
line 27:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:$PWD/__app__.jar:$PWD/*:
bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
 appMasterHost: N/A
 appQueue: default
 appMasterRpcPort: -1
 appStartTime: 1418796384166
 yarnAppState: FAILED
 distributedFinalState: FAILED
 appTrackingUrl:
http://sparkvm.localdomain:8088/cluster/app/application_1418282263779_0024
 appUser: root
Exception in thread "main" org.apache.spark.SparkException: Application is
not successful
at org.apache.spark.deploy.yarn.Client.run(Client.scala:98)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:176)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Kyle


2014-12-17 2:45 GMT+08:00 Rahul Swaminathan :
>
>  Hi all,
>
>  I am trying to run a simple Spark_Pi application through Yarn from Java
> code. I have the Spark_Pi class and everything works fine if I run on
> Spark. However, when I set mast

RE: Control default partition when load a RDD from HDFS

2014-12-16 Thread Sun, Rui
Hi, Shuai,

How did you turn off the file split in Hadoop? I guess you might have 
implemented a customized FileInputFormat which overrides isSplitable() to 
return FALSE. If you do have such a FileInputFormat, you can simply pass it as a 
constructor parameter to HadoopRDD or NewHadoopRDD in Spark, as in the sketch below.
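
A minimal sketch of that approach, shown with sc.hadoopFile for convenience
(the class name and input path are assumptions, not from your job):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// A TextInputFormat that refuses to split files, so each input file becomes
// exactly one partition of the resulting RDD.
class WholeFileTextInputFormat extends TextInputFormat {
  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = false
}

// sc.hadoopFile builds a HadoopRDD with the given InputFormat under the covers.
val perFileRdd = sc.hadoopFile[LongWritable, Text, WholeFileTextInputFormat]("hdfs:///data/input/*")

// Each partition now corresponds to one file, so the per-file pre-calculation
// can run inside mapPartitions without any shuffle.
val preCalculated = perFileRdd.mapPartitions(iter => iter /* per-file work goes here */,
  preservesPartitioning = true)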

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Wednesday, December 17, 2014 4:16 AM
To: user@spark.apache.org
Subject: Control default partition when load a RDD from HDFS

Hi All,

My application loads 1000 files, each from 200MB to a few GB, and combines 
them with other data to do a calculation.
Some pre-calculation must be done at the file level; after that, the 
results need to be combined for further calculation.
In Hadoop it is simple, because I can turn off the file split for the input format 
(to enforce that each file goes to the same mapper), then do the file-level 
calculation in the mapper and pass the result to the reducer. But how can I do this in Spark?
Basically I want to make sure that after I load these files into an RDD, it is 
partitioned by file (no splitting of files and also no merging), so I can call 
mapPartitions. Is there any way I can control the default partitioning when I load 
the RDD?
This might be the default behavior of Spark's partitioning (partitioned by 
file when the RDD is first loaded), but I can't find any documentation to support my 
guess; if not, can I enforce this kind of partitioning? Because the total file 
size is large, I don't want to re-partition in the code.

Regards,

Shuai


Re: Spark SQL DSL for joins?

2014-12-16 Thread Jerry Raj

Another problem with the DSL:

t1.where('term == "dmin").count() returns zero. But
sqlCtx.sql("select * from t1 where term = 'dmin').count() returns 700, 
which I know is correct from the data. Is there something wrong with how 
I'm using the DSL?


Thanks


On 17/12/14 11:13 am, Jerry Raj wrote:

Hi,
I'm using the Scala DSL for Spark SQL, but I'm not able to do joins. I
have two tables (backed by Parquet files) and I need to do a join across
them using a common field (user_id). This works fine using standard SQL
but not using the language-integrated DSL neither

t1.join(t2, on = 't1.user_id == t2.user_id)

nor

t1.join(t2, on = Some('t1.user_id == t2.user_id))

work, or even compile. I could not find any examples of how to perform a
join using the DSL. Any pointers will be appreciated :)

Thanks
-Jerry

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: pyspark sc.textFile uses only 4 out of 32 threads per node

2014-12-16 Thread Sun, Rui
Gautham,

How many gz files do you have? Maybe the reason is that a gz file is 
compressed in a format that can't be split for processing by MapReduce. A single gz 
file can only be processed by a single mapper, so the CPU threads can't be 
fully utilized.
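
A common workaround (sketched here in Scala for illustration; the same
repartition call exists in PySpark, and the path is just a placeholder) is to
accept the single-threaded decompression per file and immediately spread the
decompressed records out before the heavy processing:

// Each .gz file comes in as one unsplittable partition, so the number of busy
// cores during the load is capped by the number of files.
val raw = sc.textFile("s3n://my-bucket/logs/*.gz")

// Redistribute the records across many partitions so that the downstream
// stages use all of the available cores.
val spread = raw.repartition(160)
val parsed = spread.map(_.split('\t'))   // placeholder for the real per-record work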

-Original Message-
From: Gautham [mailto:gautham.a...@gmail.com] 
Sent: Wednesday, December 10, 2014 3:00 AM
To: u...@spark.incubator.apache.org
Subject: pyspark sc.textFile uses only 4 out of 32 threads per node

I am having an issue with pyspark launched in ec2 (using spark-ec2) with 5 
r3.4xlarge machines where each has 32 threads and 240GB of RAM. When I do 
sc.textFile to load data from a number of gz files, it does not progress as 
fast as expected. When I log-in to a child node and run top, I see only 4 
threads at 100 cpu. All remaining 28 cores were idle. This is not an issue when 
processing the strings after loading, when all the cores are used to process 
the data.

Please help me with this? What setting can be changed to get the CPU usage back 
up to full?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-sc-textFile-uses-only-4-out-of-32-threads-per-node-tp20595.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Rolling upgrade Spark cluster

2014-12-16 Thread Kenichi Maehashi
Hi,

I have a Spark cluster using standalone mode. The Spark Master is
configured in High Availability mode.
Now I am going to upgrade Spark from 1.0 to 1.1, but don't want to
interrupt the currently running jobs.

(1) Is there any way to perform a rolling upgrade (while a job is running)?
(2) If not, when using YARN as the cluster manager, can I perform a
rolling upgrade?

Thanks,

Kenichi

-- 
Kenichi Maehashi 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL DSL for joins?

2014-12-16 Thread Tobias Pfeiffer
Jerry,

On Wed, Dec 17, 2014 at 3:35 PM, Jerry Raj  wrote:
>
> Another problem with the DSL:
>
> t1.where('term == "dmin").count() returns zero.


Looks like you need ===:
https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
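
In case a concrete example helps, here is a small sketch of how the conditions
look with the 1.1 DSL; I have not compiled this against your schema, and the
alias/column names are only placeholders:

// Equality in the DSL is ===; a plain == just compares the Scala objects and
// never builds a Catalyst expression, which is why the count comes back as 0.
val matching = t1.where('term === "dmin")
matching.count()

// For the join, aliasing both sides lets the condition refer to the columns
// unambiguously even though they share the name user_id.
val joined = t1.as('a).join(t2.as('b),
  on = Some(Symbol("a.user_id") === Symbol("b.user_id")))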

Tobias


Re: "toArray","first" get the different result from one element RDD

2014-12-16 Thread buring
I got the key point. The problem is in sc.sequenceFile: from the API description,
the "RDD will create many references to the same object". So I revised the code
from "sessions.getBytes" to "sessions.getBytes.clone",
and it seems to work.
Thanks.
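
For anyone hitting the same thing, a small sketch of the pattern (the path is
illustrative):

import org.apache.hadoop.io.{BytesWritable, LongWritable}

// sequenceFile reuses the same Writable objects for every record, so the bytes
// must be copied out before the records are cached, collected or compared.
val alsresult = sc.sequenceFile("hdfs:///data/als", classOf[LongWritable], classOf[BytesWritable])
  .map { case (uid, sessions) =>
    sessions.setCapacity(sessions.getLength)
    (uid.get(), sessions.getBytes.clone())   // clone() breaks the shared reference
  }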



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/toArray-first-get-the-different-result-from-one-element-RDD-tp20734p20739.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: S3 globbing

2014-12-16 Thread Akhil Das
Did you try something like:

// Get the epoch-millisecond timestamp from one hour ago
val d = System.currentTimeMillis() - 3600 * 1000L
// Keep the leading digits of the timestamp fixed and glob the rest
val ex = "abc_" + d.toString.substring(0, 7) + "*.json"



Thanks
Best Regards

On Wed, Dec 17, 2014 at 5:05 AM, durga  wrote:
>
> Hi All,
>
> I need help with regex in my sc.textFile()
>
> I have lots of files with with epoch millisecond timestamp.
>
> ex:abc_1418759383723.json
>
> Now I need to consume last one hour files using the epoch time stamp as
> mentioned above.
>
> I tried couple of options , nothing seems working for me.
>
> If any one of you face this issue and got a solution , please help me.
>
> Appreciating your help,
>
> Thanks,
> D
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/S3-globbing-tp20731.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2014-12-16 Thread Kyle Lin
I also got the same problem..

2014-12-09 22:58 GMT+08:00 Daniel Haviv :
>
> Hi,
> I've built Spark 1.3 with Hadoop 2.6, but when I start up the spark-shell I
> get the following exception:
>
> 14/12/09 06:54:24 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI'
> on port 4040.
> 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040
> 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address:
> http://0.0.0.0:8188/ws/v1/timeline/
> java.lang.NoClassDefFoundError:
> org/codehaus/jackson/map/deser/std/StdDeserializer
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> Any idea why ?
>
> Thanks,
> Daniel
>
>
>


Re: NullPointerException on cluster mode when using foreachPartition

2014-12-16 Thread Shixiong Zhu
Could you post the stack trace?


Best Regards,
Shixiong Zhu

2014-12-16 23:21 GMT+08:00 richiesgr :
>
> Hi
>
> This time I need an expert.
> On 1.1.1, and only in cluster mode (standalone or EC2),
> when I use this code:
>
> countersPublishers.foreachRDD(rdd => {
> rdd.foreachPartition(partitionRecords => {
>   partitionRecords.foreach(record => {
> //dbActorUpdater ! updateDBMessage(record)
> println(record)
>   })
> })
>   })
>
> I get an NPE (when I run this locally, all is OK)
>
> If I use this
>   countersPublishers.foreachRDD(rdd => rdd.collect().foreach(r =>
> dbActorUpdater ! updateDBMessage(r)))
>
> There is no problem. I think something is misconfigured.
> Thanks for the help
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-cluster-mode-when-using-foreachPartition-tp20719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Locality Level Kryo

2014-12-16 Thread aecc
Hi guys,

I get Kryo exceptions of the type "unregistered class id" and "cannot cast
to class" when the locality level of the tasks goes beyond LOCAL.
However, I get no Kryo exceptions during shuffling operations.

If the locality level never goes beyond LOCAL, everything works fine.

Is there a special reason for this? How does Kryo handle the tasks that are
not in the same locality?

Cheers



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Locality-Level-Kryo-tp20740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org