Check your cluster UI to ensure that workers are registered and have sufficient memory

2014-05-05 Thread Sai Prasanna
I executed the following commands to launch a Spark app in yarn-client
mode. I have Hadoop 2.3.0, Spark 0.8.1 and Scala 2.9.3:

SPARK_HADOOP_VERSION=2.3.0 SPARK_YARN=true sbt/sbt assembly

SPARK_YARN_MODE=true \
SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.3.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar \
MASTER=yarn-client ./spark-shell

The Spark context in the interactive shell is set up properly, but afterwards,
when I submit jobs, it reports that the application has not accepted any
resources.

LOGS:
DAGScheduler: Submitting 4 missing tasks from Stage 0 (MappedRDD[1] at
textFile at :12)
YarnClientClusterScheduler: Adding task set 0.0 with 4 tasks
WARN YarnClientClusterScheduler: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory

What have I missed? I did start the Spark master and worker, and I have
configured SPARK_MEM.

Any help would be great!
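For reference, in the 0.8/0.9-era yarn-client mode the executor resources were requested through environment variables rather than through the standalone master/worker settings. The variable names below are taken from the running-on-yarn docs of that era and should be verified against the exact Spark version in use; this is a hedged sketch, not part of the original mail:

```shell
# Hedged sketch: resource-request knobs for yarn-client mode circa Spark 0.8/0.9.
# If YARN cannot grant what is asked for, the shell starts but jobs keep
# reporting "Initial job has not accepted any resources".
export SPARK_WORKER_INSTANCES=2   # number of executors to request
export SPARK_WORKER_MEMORY=1g     # per-executor memory; must fit YARN's container limit
export SPARK_WORKER_CORES=1       # cores per executor
```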


unsubscribe

2014-05-05 Thread Konstantin Kudryavtsev
unsubscribe

Thank you,
Konstantin Kudryavtsev


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
How can I do iteration? Because persist is lazy and recomputation may be
required, the whole lineage of the iteration will be kept; is there no way to
escape the memory overflow?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5359.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


unsubscribe

2014-05-05 Thread Shubhabrata Roy

unsubscribe


Re: what's the meaning of "primitive" in "gradient descent primitive"?

2014-05-05 Thread Sean Owen
I understood it to mean module, unit of functionality, subroutine.

On May 5, 2014 3:50 AM, "phoenix bai"  wrote:
>
> Hi all,
>
> I am reading the doc of spark (
http://spark.apache.org/docs/0.9.0/mllib-guide.html#gradient-descent-primitive).
I am trying to translate the doc into Chinese, and it talks about the
gradient descent primitive, but I am not quite sure what is meant by
"primitive"?
>
> I know gradient descent, but I am not sure what is being referred to by
"gradient descent primitive"? Does it mean the most basic form of gradient
descent?
>
> please enlighten me. thank you in advance.
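In that reading, the "primitive" is the basic reusable routine that higher-level algorithms (logistic regression, SVM, etc.) are built on. A self-contained sketch of that routine, stripped of all Spark/MLlib specifics (the function names here are illustrative, not MLlib's API):

```scala
// Plain gradient descent as a reusable "primitive": it only needs a gradient
// function, a starting point, a step size, and an iteration count.
def gradientDescent(
    grad: Array[Double] => Array[Double],
    init: Array[Double],
    stepSize: Double,
    numIterations: Int): Array[Double] = {
  var w = init.clone()
  for (_ <- 1 to numIterations) {
    val g = grad(w)
    // Move against the gradient, component by component.
    w = w.zip(g).map { case (wi, gi) => wi - stepSize * gi }
  }
  w
}

// Example: minimize f(w) = (w - 3)^2, whose gradient is 2(w - 3).
val result = gradientDescent(w => Array(2.0 * (w(0) - 3.0)), Array(0.0), 0.1, 100)
println(result(0))  // converges toward 3.0
```

MLlib exposes essentially this routine in distributed, mini-batch form so that new algorithms can plug in their own gradient and updater.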


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
.set("spark.cleaner.ttl", "120") drops broadcast_0, which causes the exception
below. This is strange, because broadcast_0 is no longer needed (I have broadcast_3
instead) and the recent RDD is persisted, so there should be no need for
recomputation. What is the problem? I need help.


~~~
14/05/05 17:03:12 INFO storage.MemoryStore: ensureFreeSpace(52474562) called
with curMem=145126640, maxMem=1145359564
14/05/05 17:03:12 INFO storage.MemoryStore: Block broadcast_3 stored as
values to memory (estimated size 50.0 MB, free 903.9 MB)
14/05/05 17:03:12 INFO scheduler.DAGScheduler: shuffleToMapStage 0 --> 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToStage 0 --> 0
14/05/05 17:03:12 INFO scheduler.DAGScheduler: stageIdToJobIds 0 --> 0
~

Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
Caused by: org.apache.spark.SparkException: Job aborted: Task 9.0:48 failed 4 times (most recent failure: Exception failure: java.io.FileNotFoundException: http://192.168.7.41:3503/broadcast_0)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Any ideas on an architecture based on Spark + Spray + Akka

2014-05-05 Thread Quintus Zhou
Hi Yi,

Your project sounds interesting to me. I'm also working in the 3G/4G
communication domain, and I've done a tiny project based on Hadoop which
analyzes execution logs. Recently I have planned to pick it up again. So, if
you don't mind, may I have an introduction to your log-analyzing project?

Regards,
Yuding


Sent from my iPhone

On 2014-5-5, at 11:37, ZhangYi  wrote:

> Hi all,
> 
> Currently, our project is planning to adopt Spark as its big data platform.
> On the client side, we have decided to expose a REST API based on Spray. Our
> domain is the communication field, processing data analysis and statistics
> for 3G and 4G users. Spark + Spray is brand new for us, and we can't find
> any best practices via Google.
> 
> In our opinion, an event-driven architecture may be a good choice for our
> project. However, more ideas are welcome. Thanks.
> 
> -- 
> ZhangYi (张逸)
> Developer
> tel: 15023157626
> blog: agiledon.github.com
> weibo: tw张逸
> Sent with Sparrow
> 


unsubscribe

2014-05-05 Thread Chhaya Vishwakarma
unsubscribe

Regards,
Chhaya Vishwakarma





java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Francis . Hu
Hi all,

We run a Spark cluster with three workers. We created a Spark Streaming
application, then ran the project using the command below:

shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo

We looked at the workers' web UI; the jobs failed without any error or info,
but a FileNotFoundException occurred in the workers' log file, as below.
Is this a known Spark issue?

--- in workers' logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out ---

 

14/05/05 02:39:39 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-&executorId=2&logType=stdout
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.(FileInputStream.java:138)
        at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
        at org.eclipse.jetty.server.Server.handle(Server.java:363)
        at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
        at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
        at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
        at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
        at java.lang.Thread.run(Thread.java:722)

14/05/05 02:39:41 WARN AbstractHttpConnection: /logPage/?appId=app-20140505053550-&executorId=9&logType=stderr
java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.(FileInputStream.java:138)
        at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
        at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
        at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
        at org.eclipse.jetty.server.Server.handle(Server.java:363)
        at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
        at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
        at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
        at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
        at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
        at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)

Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Using checkpoint. It removes the dependencies :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
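For later readers, the pattern being discussed can be sketched like this: periodically checkpointing the iterated RDD truncates its lineage, so the ever-growing dependency chain of the loop is not kept around. This is a hedged sketch assuming a SparkContext `sc` and an HDFS checkpoint directory, not code from the thread:

```scala
// Hedged sketch: iterative computation with periodic checkpointing to cut lineage.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // illustrative path

var state = sc.parallelize(1 to 1000000).map(i => (i, 0.0))
for (iter <- 1 to 100) {
  state = state.mapValues(_ + 1.0).persist()
  if (iter % 10 == 0) {
    state.checkpoint()  // mark for checkpointing; drops the dependency chain once materialized
    state.count()       // force an action so the checkpoint actually happens
  }
}
```

Without the checkpoint, every iteration's RDD stays reachable through the lineage of the latest one, which is the memory growth Earthson describes.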


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for broadcast
cleaning. Maybe broadcasts could be removed automatically when they have no
dependencies.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5369.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Cheng Lian
Have you tried Broadcast.unpersist()?


On Mon, May 5, 2014 at 6:34 PM, Earthson  wrote:

> RDD.checkpoint works fine. But spark.cleaner.ttl is really ugly for
> broadcast
> cleaning. May be it could be removed automatically when no dependences.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cache-issue-for-iteration-with-broadcast-tp5350p5369.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
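A hedged sketch of what Cheng Lian is suggesting: re-broadcast the fresh state each iteration and unpersist the stale broadcast, instead of waiting for spark.cleaner.ttl to sweep it away. `Broadcast.unpersist()` is available from Spark 1.0; `data`, `step`, and `update` below are hypothetical placeholders, not code from the thread:

```scala
// Re-broadcast per iteration; explicitly drop the previous broadcast's blocks.
var weights = Array.fill(10)(0.0)
var bc = sc.broadcast(weights)
for (iter <- 1 to 20) {
  val delta = data.map(p => step(bc.value, p)).reduce(_ + _)  // hypothetical per-iteration step
  weights = update(weights, delta)                            // hypothetical update rule
  val stale = bc
  bc = sc.broadcast(weights)
  stale.unpersist()  // remove the old broadcast's blocks from the executors
}
```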


Spark Streaming and JMS

2014-05-05 Thread Patrick McGloin
Hi all,

Is there a "best practice" for subscribing to JMS with Spark Streaming?  I
have searched but not found anything conclusive.

In the absence of a standard practice the solution I was thinking of was to
use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark
Streaming Custom Receiver.  So the actor would look something like this:

class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with Consumer {
  // e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
  def endpointUri = jmsURI
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
  }

  def receive = {
    case msg: CamelMessage => { blockGenerator += msg.body }
    case _ => { /* ... */ }
  }

  protected override def onStop() {
    blockGenerator.stop
  }
}

And then in the main application create receivers like this:

val ssc = new StreamingContext(...)
object tascQueue extends JmsReceiver[String](ssc) {
  override def getReceiver(): JmsReceiver[String] = {
    new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
  }
}
ssc.registerInputStream(tascQueue)

Is this the best way to go?

Best regards,
Patrick


Re: master attempted to re-register the worker and then took all workers as unregistered

2014-05-05 Thread Nan Zhu
Ah, I think this should be fixed in 0.9.1?  

Did you see the exception thrown on the worker side?

Best, 

-- 
Nan Zhu


On Sunday, May 4, 2014 at 10:15 PM, Cheney Sun wrote:

> Hi Nan, 
> 
> Have you found a way to fix the issue? Now I run into the same problem with
> version 0.9.1.
> 
> Thanks,
> Cheney
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/master-attempted-to-re-register-the-worker-and-then-took-all-workers-as-unregistered-tp553p5341.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com 
> (http://Nabble.com).
> 
> 




Re: Shark on cloudera CDH5 error

2014-05-05 Thread manas Kar
No replies yet. I guess everyone who had this problem knew the obvious reason
the error occurred.
It took me some time to figure out the workaround, though.

It seems Shark depends on
/var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core/hadoop-core.jar
for client-server communication.

On CDH5 it should instead rely on hadoop-core-2.3.0-mr1-cdh5.0.0.jar:

1) Grab it from another CDH module (I chose Hadoop) and take this jar from its
library.
2) Remove the jar in
/var/lib/spark/shark-0.9.1/lib_managed/jars/org.apache.hadoop/hadoop-core
3) Place the jar from step 1 in the hadoop-core folder of step 2.

Hope this saves some time for someone who has a similar problem.
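The three steps can be sketched as shell commands. The paths below are illustrative (the swap is simulated on temporary directories so the sequence can be dry-run); the real Shark and CDH locations vary by install:

```shell
# Simulated jar swap: replace Shark's bundled hadoop-core jar with the CDH5 one.
SHARK_HC=$(mktemp -d)/hadoop-core    # stands in for .../lib_managed/jars/org.apache.hadoop/hadoop-core
mkdir -p "$SHARK_HC"
touch "$SHARK_HC/hadoop-core.jar"    # the jar Shark shipped with
CDH_JAR=$(mktemp -d)/hadoop-core-2.3.0-mr1-cdh5.0.0.jar
touch "$CDH_JAR"                     # step 1: the jar grabbed from a CDH module
rm "$SHARK_HC"/hadoop-core*.jar      # step 2: remove the bundled jar
cp "$CDH_JAR" "$SHARK_HC"/           # step 3: place the CDH jar in its folder
ls "$SHARK_HC"
```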

..Manas




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shark-on-cloudera-CDH5-error-tp5226p5374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: configure spark history server for running on Yarn

2014-05-05 Thread Tom Graves
Since 1.0 is still in development you can pick up the latest docs in git: 
https://github.com/apache/spark/tree/branch-1.0/docs

I didn't see anywhere that you said you started the Spark history server?

There are multiple things that need to happen for the Spark history server to
work:

1) configure your application to save the history logs - see the eventLog 
settings here 
https://github.com/apache/spark/blob/branch-1.0/docs/configuration.md

2) On yarn -  know the host/port where you are going to start the spark history 
server and configure: spark.yarn.historyServer.address to point to it.  Note 
that this purely makes the link from the ResourceManager UI properly point to 
the Spark History Server Daemon.

3) Start the spark history server pointing to the same directory as specified 
in your application (spark.eventLog.dir)

4) run your application. once it finishes then you can either go to the RM UI 
to link to the spark history UI or go directly to the spark history server ui.
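Concretely, steps 1 to 3 might look like the following (host name and log directory are illustrative):

```
# conf/spark-defaults.conf -- step 1 (application logging) and step 2 (YARN link-back)
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///user/spark/applicationHistory
spark.yarn.historyServer.address master:18080

# step 3: start the daemon against the same log directory (Spark 1.0's script
# takes it as the first argument)
./sbin/start-history-server.sh hdfs:///user/spark/applicationHistory
```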

Tom
On Thursday, May 1, 2014 7:09 PM, Jenny Zhao  wrote:
 
Hi,

I have installed spark 1.0 from the branch-1.0, build went fine, and I have 
tried running the example on Yarn client mode, here is my command: 

/home/hadoop/spark-branch-1.0/bin/spark-submit 
/home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar
 --master yarn --deploy-mode client --executor-memory 6g --executor-cores 3 
--driver-memory 3g --name SparkPi --num-executors 2 --class 
org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from Yarn's web UI, though I
tried to specify the history server in spark-env.sh:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.yarn.historyServer.address=master:18080"

I also tried to specify it in spark-defaults.conf, which doesn't work either. I
would appreciate it if someone could tell me the way to specify it, either
in spark-env.sh or spark-defaults.conf, so that this option is applied to
any Spark application.


Another thing I found: the usage output for spark-submit is not complete / not
in sync with the online documentation. I hope this is addressed in the formal
release.

and is this the latest documentation for spark 1.0? 
http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you! 

Spark GCE Script

2014-05-05 Thread Akhil Das
Hi Sparkers,

We have created a quick spark_gce script which can launch a spark cluster
in the Google Cloud. I'm sharing it because it might be helpful for someone
using the Google Cloud for deployment rather than AWS.

Here's the link to the script

https://github.com/sigmoidanalytics/spark_gce

Feel free to use it and suggest any feedback around it.

In short here's what it does:

Just like the spark_ec2 script, this one reads certain command-line
arguments (see the GitHub page for more details), such as the cluster name,
then starts the machines in the Google cloud, sets up the network, adds a
500GB empty disk to all machines, generates SSH keys on the master and
transfers them to all slaves, installs Java, and downloads and configures
Spark/Shark/Hadoop. It also starts the Shark server automatically. Currently
the version is 0.9.1, but I'm happy to add/support more versions if anyone
is interested.


Cheers.


Thanks
Best Regards


Re: Using google cloud storage for spark big data

2014-05-05 Thread Akhil Das
Hi Aureliano,

You might want to check this script out,
https://github.com/sigmoidanalytics/spark_gce
Let me know if you need any help around that.

Thanks
Best Regards


On Tue, Apr 22, 2014 at 7:12 PM, Aureliano Buendia wrote:

>
>
>
> On Tue, Apr 22, 2014 at 10:50 AM, Andras Nemeth <
> andras.nem...@lynxanalytics.com> wrote:
>
>> We don't have anything fancy. It's basically some very thin layer of
>> google specifics on top of a stand alone cluster. We basically created two
>> disk snapshots, one for the master and one for the workers. The snapshots
>> contain initialization scripts so that the master/worker daemons are
>> started on boot. So if I want a cluster I just create a new instance (with
>> a fixed name) using the master snapshot for the master. When it is up I
>> start as many slave instances as I need using the slave snapshot. By the
>> time the machines are up the cluster is ready to be used.
>>
>>
> This sounds like being a lot simpler than the existing spark-ec2 script.
> Does google compute engine api makes this happen in a simple way, when
> compared to ec2 api? Does your script do everything spark-ec2 does?
>
> Also, any plans to make this open source?
>
>
>> Andras
>>
>>
>>
>> On Mon, Apr 21, 2014 at 10:04 PM, Mayur Rustagi 
>> wrote:
>>
>>> Okay just commented on another thread :)
>>> I have one that I use internally. Can give it out but will need some
>>> support from you to fix bugs etc. Let me know if you are interested.
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Fri, Apr 18, 2014 at 9:08 PM, Aureliano Buendia >> > wrote:
>>>
 Thanks, Andras. What approach did you use to setup a spark cluster on
 google compute engine? Currently, there is no production-ready official
 support for an equivalent of spark-ec2 on gce. Did you roll your own?


 On Thu, Apr 17, 2014 at 10:24 AM, Andras Nemeth <
 andras.nem...@lynxanalytics.com> wrote:

> Hello!
>
> On Wed, Apr 16, 2014 at 7:59 PM, Aureliano Buendia <
> buendia...@gmail.com> wrote:
>
>> Hi,
>>
>> Google has publisheed a new connector for hadoop: google cloud
>> storage, which is an equivalent of amazon s3:
>>
>>
>> googlecloudplatform.blogspot.com/2014/04/google-bigquery-and-datastore-connectors-for-hadoop.html
>>
> This is actually about Cloud Datastore and not Cloud Storage (yeah,
> quite confusing naming ;) ). But they do already have for a while a cloud
> storage connector, also linked from your article:
> https://developers.google.com/hadoop/google-cloud-storage-connector
>
>
>>
>>
>> How can spark be configured to use this connector?
>>
> Yes, it can, but in a somewhat hacky way. The problem is that for some
> reason Google does not officially publish the library jar alone, you get 
> it
> installed as part of a Hadoop on Google Cloud installation. So, the
> official way would be (we did not try that) to have a Hadoop on Google
> Cloud installation and run spark on top of that.
>
> The other option - that we did try and which works fine for us - is to
> snatch the jar:
> https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-1.2.4.jar,
> make sure it's shipped to your workers (e.g. with setJars on SparkConf 
> when
> you create your SparkContext). Then create a core-site.xml file which you
> make sure is on the classpath both in your driver and your cluster (e.g.
> you can make sure it ends up in one of the jars you send with setJars
> above) with this content (with YOUR_* replaced):
> 
> <configuration>
>   <property>
>     <name>fs.gs.impl</name>
>     <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
>   </property>
>   <property>
>     <name>fs.gs.project.id</name>
>     <value>YOUR_PROJECT_ID</value>
>   </property>
>   <property>
>     <name>fs.gs.system.bucket</name>
>     <value>YOUR_FAVORITE_BUCKET</value>
>   </property>
> </configuration>
>
> From this point on you can simply use gs://... filenames to read/write
> data on Cloud Storage.
>
> Note that you should run your cluster and driver program on Google
> Compute Engine for this to work as is. Probably it's possible to configure
> access from the outside too but we didn't do that.
>
> Hope this helps,
> Andras
>
>
>
>
>

>>>
>>
>
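Pulling the thread's pieces together, the driver-side setup might look like this. This is a hedged sketch: the jar paths and bucket name are illustrative, and it assumes the core-site.xml described above has been packaged into one of the shipped jars:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Ship the snatched gcs-connector jar (and a jar carrying core-site.xml) to the workers.
val conf = new SparkConf()
  .setAppName("gcs-example")
  .setJars(Seq("/path/to/gcs-connector-1.2.4.jar",
               "/path/to/app-with-core-site.jar"))
val sc = new SparkContext(conf)

// With fs.gs.impl registered, gs:// URIs behave like any other Hadoop filesystem.
val lines = sc.textFile("gs://YOUR_FAVORITE_BUCKET/input/*.txt")
println(lines.count())
```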


Re: performance improvement on second operation...without caching?

2014-05-05 Thread Ethan Jewett
Thanks Patrick and Matei for the clarification. I actually have to update
some code now, as I was apparently relying on the fact that the output
files are being re-used. Explains some edge-case behavior that I've seen.

For me, at least, I read the guide, did some tests on fairly extensive RDD
dependency graphs, saw that tasks earlier in the dependency graphs were not
being regenerated and assumed (very much incorrectly I just found out!)
that it was because the RDDs themselves were being cached. I wonder if
there is a way to explain this distinction concisely in the programming
guide. Or maybe I'm the only one that went down this incorrect learning
path :-)

Ethan


On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia wrote:

> Yes, this happens as long as you use the same RDD. For example say you do
> the following:
>
> data1 = sc.textFile(…).map(…).reduceByKey(…)
> data1.count()
> data1.filter(…).count()
>
> The first count() causes outputs of the map/reduce pair in there to be
> written out to shuffle files. Next time you do a count, on either this RDD
> or a child (e.g. after the filter), we notice that output files were
> already generated for this shuffle so we don’t rerun the map stage. Note
> that the output does get read again over the network, which is kind of
> wasteful (if you really wanted to reuse this as quickly as possible you’d
> use cache()).
>
> Matei
>
> On May 3, 2014, at 8:44 PM, Koert Kuipers  wrote:
>
> Hey Matei,
> Not sure i understand that. These are 2 separate jobs. So the second job
> takes advantage of the fact that there is map output left somewhere on disk
> from the first job, and re-uses that?
>
>
> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia wrote:
>
>> Hi Diana,
>>
>> Apart from these reasons, in a multi-stage job, Spark saves the map
>> output files from map stages to the filesystem, so it only needs to rerun
>> the last reduce stage. This is why you only saw one stage executing. These
>> files are saved for fault recovery but they speed up subsequent runs.
>>
>> Matei
>>
>> On May 3, 2014, at 5:21 PM, Patrick Wendell  wrote:
>>
>> Ethan,
>>
>> What you said is actually not true, Spark won't cache RDD's unless you
>> ask it to.
>>
>> The observation here - that running the same job can speed up
>> substantially even without caching - is common. This is because other
>> components in the stack are performing caching and optimizations. Two that
>> can make a huge difference are:
>>
>> 1. The OS buffer cache. Which will keep recently read disk blocks in
>> memory.
>> 2. The Java just-in-time compiler (JIT) which will use runtime profiling
>> to significantly speed up execution speed.
>>
>> These can make a huge difference if you are running the same job
>> over-and-over. And there are other things like the OS network stack
>> increasing TCP windows and so fourth. These will all improve response time
>> as a spark program executes.
>>
>>
>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett  wrote:
>>
>>> I believe Spark caches RDDs it has memory for regardless of whether you
>>> actually call the 'cache' method on the RDD. The 'cache' method just tips
>>> off Spark that the RDD should have higher priority. At least, that is my
>>> experience and it seems to correspond with your experience and with my
>>> recollection of other discussions on this topic on the list. However, going
>>> back and looking at the programming guide, this is not the way the
>>> cache/persist behavior is described. Does the guide need to be updated?
>>>
>>>
>>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll wrote:
>>>
 I'm just Posty McPostalot this week, sorry folks! :-)

 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end
 of the message):
 It reads in a bunch of XML files, parses them, extracts some data in a
 map, counts (using reduce), and then sorts.   All stages are executed when
 I do a final operation (take).  The first stage is the most expensive: on
 first run it takes 30s to a minute.

 I'm not caching anything.

 When I re-execute that take at the end, I expected it to re-execute all
 the same stages, and take approximately the same amount of time, but it
 didn't.  The second "take" executes only a single stage which collectively
 run very fast: the whole operation takes less than 1 second (down from 5
 minutes!)

 While this is awesome (!) I don't understand it.  If I'm not caching
 data, why would I see such a marked performance improvement on subsequent
 execution?

 (or is this related to the known .9.1 bug about sortByKey executing an
 action when it shouldn't?)

 Thanks,
 Diana
 

 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree

 # Given a partition containing multi-line XML, parse the contents.
 # Retu

Caused by: java.lang.OutOfMemoryError: unable to create new native thread

2014-05-05 Thread Soumya Simanta
I just upgraded my Spark version to 1.0.0_SNAPSHOT.


commit f25ebed9f4552bc2c88a96aef06729d9fc2ee5b3

Author: witgo 

Date:   Fri May 2 12:40:27 2014 -0700


I'm running a standalone cluster with 3 workers.

   - *Workers:* 3
   - *Cores:* 48 Total, 0 Used
   - *Memory:* 469.8 GB Total, 0.0 B Used

However, when I try to run bin/spark-shell I get the following error after some
time, even if I don't perform any operations in the Spark shell.




14/05/05 10:20:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:622)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:256)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:54)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:679)
        at java.lang.UNIXProcess$1.run(UNIXProcess.java:157)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.lang.UNIXProcess.(UNIXProcess.java:119)
        at java.lang.ProcessImpl.start(ProcessImpl.java:81)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:470)
        at java.lang.Runtime.exec(Runtime.java:612)
        at java.lang.Runtime.exec(Runtime.java:485)
        at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:178)
        at scala.tools.jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:168)
        at scala.tools.jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:163)
        at scala.tools.jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:67)
        at scala.tools.jline.internal.TerminalLineSettings.getProperty(TerminalLineSettings.java:87)
        at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:127)
        at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
        at org.apache.spark.repl.SparkJLineReader$JLineConsoleReader.readOneKey(SparkJLineReader.scala:54)
        at org.apache.spark.repl.SparkJLineReader.readOneKey(SparkJLineReader.scala:81)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:29)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1$$anonfun$org$apache$spark$repl$SparkILoop$$anonfun$$fn$1$1.apply$mcZ$sp(SparkILoop.scala:576)
        at scala.tools.nsc.interpreter.InteractiveReader$class.readYesOrNo(InteractiveReader.scala:32)
        at org.apache.spark.repl.SparkJLineReader.readYesOrNo(SparkJLineReader.scala:25)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.org$apache$spark$repl$SparkILoop$$anonfun$$fn$1(SparkILoop.scala:576)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:579)
        at org.apache.spark.repl.SparkILoop$$anonfun$1.applyOrElse(SparkILoop.scala:566)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply$mcZL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcZL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:983)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        ... 7 more


RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
I’ve encountered this issue again and am able to reproduce it about 10% of the 
time.

1. Here is the input:
RDD[ (a, 126232566, 1), (a, 126232566, 2) ]
RDD[ (a, 126232566, 1), (a, 126232566, 3) ]
RDD[ (a, 126232566, 3) ]
RDD[ (a, 126232566, 4) ]
RDD[ (a, 126232566, 2) ]
RDD[ (a, 126232566, 5), (a, 126232566, 5) ]

2. Here are the actual results (printed DStream – each line is a new RDD with 
RDD Id being the last number on each line):
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(10,5,ArrayBuffer())),26)   <-empty elements 
Seq[V]
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)
(((a,126232566),StateClass(26,9,ArrayBuffer())),53)  <-empty elements Seq[V]
(((a,126232566),StateClass(26,9,ArrayBuffer())),59)  <-empty elements Seq[V]

3. Here are the expected results: (all tuples from #2 except those with empty 
Seq[V] )
(((a,126232566),StateClass(3,2,ArrayBuffer(1.0, 2.0))),6)
(((a,126232566),StateClass(7,4,ArrayBuffer(1.0, 3.0))),13)
(((a,126232566),StateClass(10,5,ArrayBuffer(3.0))),20)
(((a,126232566),StateClass(14,6,ArrayBuffer(4.0))),33)
(((a,126232566),StateClass(16,7,ArrayBuffer(2.0))),40)
(((a,126232566),StateClass(26,9,ArrayBuffer(5.0, 5.0))),47)

4. Here is the code:
case class StateClass(sum:Integer, count:Integer, elements:Seq[Double])

val updateSumFunc = (values: Seq[(String, Long, Int)], state: 
Option[StateClass]) => {
//  if (values.isEmpty) {
//// if RDD cannot find values for this key (which is from prev RDD,
//// the tuple will not be shown in this RDD w values of 0
//None
//  } else {
val previousState = state.getOrElse(StateClass(0, 0, Seq()))
val currentCount = values.size + previousState.count
var currentSum=0
for (newValue <- values) yield ({
  currentSum = currentSum + newValue._3
})
currentSum= currentSum +previousState.sum
val elements = for (newValues <- values) yield ({
  newValues._3.toDouble
})
Some(StateClass(currentSum, currentCount, elements))
//  }
}

val partialResultSums= inputStream.map((x:(String, Long, Int)) =>((x._1), 
(x._1, x._2, x._3)))  //re map
.updateStateByKey[StateClass](updateSumFunc)  //update state
.transform(rdd=>rdd.map(t=>(t,rdd.id)))   //add RDD ID to RDD tuples

partialResultSums.print()

Now this is how I generate the RDDs and I suspect the delay is why the issue 
surfaces:

rddQueue += ssc.sparkContext.makeRDD(smallWindow1)  // smallWindow1 = 
List[(String, Long, Int)]( (a, 126232566, 1), (a, 126232566, 2) )

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow2)   // smallWindow2= 
List[(String, Long, Int)]((a, 126232566, 1), (a, 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow3)   // smallWindow3= 
List[(String, Long, Int)]((a, 126232566, 3))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow4)   // smallWindow4= 
List[(String, Long, Int)]((a, 126232566, 4))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow5)   // smallWindow5= 
List[(String, Long, Int)]((a, 126232566, 2))

Thread.sleep(1100)
rddQueue += ssc.sparkContext.makeRDD(smallWindow6)   // smallWindow6= 
List[(String, Long, Int)]((a, 126232566, 5), (a, 126232566, 5))

Thread.sleep(3100)
//ssc.awaitTermination()
ssc.stop()

In my use case, when I detect an empty Seq[V] in the updateStateByKey function I
return None so I can filter those tuples out. However, Spark calling the
updateStateByKey function with an empty Seq[V] when it should not messes up my
logic.
I wonder how to bypass this bug/feature of Spark.
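
For reference, the filtering approach just described (return None on an empty
Seq[V] so the key is dropped) can be sketched as a plain Scala function,
checkable outside Spark. This mirrors the StateClass and field layout of the
code above, with Int instead of Integer for simplicity:

```scala
case class StateClass(sum: Int, count: Int, elements: Seq[Double])

// Same shape as the updateStateByKey update function above, but with the
// empty-values branch enabled so stale keys are removed from the state.
def updateSum(values: Seq[(String, Long, Int)],
              state: Option[StateClass]): Option[StateClass] = {
  if (values.isEmpty) {
    None  // drop keys that received no new values in this batch
  } else {
    val prev = state.getOrElse(StateClass(0, 0, Seq()))
    Some(StateClass(prev.sum + values.map(_._3).sum,
                    prev.count + values.size,
                    values.map(_._3.toDouble)))
  }
}

// Quick check without a cluster:
println(updateSum(Seq.empty, Some(StateClass(3, 2, Seq(1.0, 2.0)))))  // None
println(updateSum(Seq(("a", 126232566L, 1), ("a", 126232566L, 2)), None))
```

Within Spark, returning None here still depends on Spark only invoking the
function when there is a real change, which is exactly what the report above
questions.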

Thanks
-Adrian
From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: May-02-14 3:10 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: another updateStateByKey question


Could be a bug. Can you share a code with data that I can use to reproduce this?

TD
On May 2, 2014 9:49 AM, "Adrian Mocanu" 
mailto:amoc...@verticalscope.com>> wrote:
Has anyone else noticed that sometimes the same tuple calls update state 
function twice?
I have 2 tuples with the same key in 1 RDD part of DStream: RDD[ (a,1), (a,2) ]
When the update function is called the first time Seq[V] has data: 1, 2 which 
is correct: StateClass(3,2, ArrayBuffer(1, 2))
Then right away (in my output I see this) the same key is used and the function 
is called again but this time Seq is empty: StateClass(3,2, ArrayBuffer( ))

In the update function I also save Seq[V] to state so I can see it in the RDD. 
I also show a count

RE: another updateStateByKey question - updated w possible Spark bug

2014-05-05 Thread Adrian Mocanu
Forgot to mention my batch interval is 1 second:
val ssc = new StreamingContext(conf, Seconds(1))
hence the Thread.sleep(1100)


Re: Caused by: java.lang.OutOfMemoryError: unable to create new native thread

2014-05-05 Thread ssimanta
Thanks Wyane. 

Maybe that is what is happening. My current limits are:

$ ps -u ssimanta -L | wc -l (with Spark  and spark-shell *not* running) 
790
$ ulimit -u
1024

Once I start Spark, my thread count increases to

$ ps -u ssimanta -L | wc -l (with Spark and spark-shell running) 

982

Any recommendations on how large this limit should be? I'm assuming this
limit needs to be changed on all my Spark nodes.

thanks
-Soumya





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Caused-by-java-lang-OutOfMemoryError-unable-to-create-new-native-thread-tp5379p5383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Comprehensive Port Configuration reference?

2014-05-05 Thread Scott Clasen
Is it documented anywhere how one would go about configuring every open
port a Spark application needs?

This seems like one of the main things that makes running Spark hard in
places like EC2 where you aren't using the canned Spark scripts.

Starting an app, it looks like you'll see ports open for:

BlockManager
OutputTracker
FileServer
WebUI
a local port to get callbacks from the Mesos master

What else?

How do I configure all of these?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tp5384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
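
Not a comprehensive reference, but a couple of these ports can be pinned from
the driver side. A sketch follows; the port values are arbitrary, only
spark.driver.port and spark.ui.port are assumed here, and which other services
are configurable varies by Spark version, so verify against the configuration
page for your release:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: pin the driver-side ports that are exposed through SparkConf.
// Worker/executor-side ports were still chosen dynamically in this era.
val conf = new SparkConf()
  .setAppName("fixed-ports")
  .set("spark.driver.port", "51000") // executors call back to the driver here
  .set("spark.ui.port", "4040")      // the application web UI
val sc = new SparkContext(conf)
```

The remaining dynamic ports fall in the OS ephemeral range, which is why the
firewall discussion below centers on `net.ipv4.ip_local_port_range`.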


Re: Cache issue for iteration with broadcast

2014-05-05 Thread Earthson
Yes, I've tried.

The problem is that a new broadcast object is generated at every step, until
they eat up all of the memory.

I solved it by using RDD.checkpoint to remove dependencies on the old broadcast
objects, and spark.cleaner.ttl to clean up these broadcast objects
automatically.

If there's a more elegant way to solve this problem, please tell me :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Cache-issue-for-iteration-with-broadcast-tp5350p5385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
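
The pattern described above can be outlined as follows. This is a sketch, not
the poster's actual code: computeModel is a hypothetical stand-in for whatever
each iteration learns, and the checkpoint interval is arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object IterateWithBroadcast {
  // Hypothetical per-step model computation.
  def computeModel(r: RDD[Double]): Double = r.sum() / r.count()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("iterate-with-broadcast"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    var data = sc.parallelize(1 to 100000).map(_.toDouble)
    for (step <- 1 to 50) {
      val model = sc.broadcast(computeModel(data))
      data = data.map(x => x * 0.5 + model.value * 0.5)
      if (step % 10 == 0) {
        data.checkpoint() // cut the lineage so earlier broadcasts can be dropped
        data.count()      // force the checkpoint to materialize
      }
      // In later Spark releases, model.unpersist() frees the old broadcast
      // explicitly; on 0.9, spark.cleaner.ttl (as above) is the usual workaround.
    }
    sc.stop()
  }
}
```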


Re: Spark GCE Script

2014-05-05 Thread Matei Zaharia
Very cool! Have you thought about sending this as a pull request? We’d be happy 
to maintain it inside Spark, though it might be interesting to find a single 
Python package that can manage clusters across both EC2 and GCE.

Matei

On May 5, 2014, at 7:18 AM, Akhil Das  wrote:

> Hi Sparkers,
> 
> We have created a quick spark_gce script which can launch a spark cluster in 
> the Google Cloud. I'm sharing it because it might be helpful for someone 
> using the Google Cloud for deployment rather than AWS.
> 
> Here's the link to the script
> 
> https://github.com/sigmoidanalytics/spark_gce
> 
> Feel free to use it and suggest any feedback around it.
> 
> In short here's what it does:
> 
> Just like the spark_ec2 script, this one also reads certain command-line 
> arguments (see the GitHub page for more details) like the cluster name and 
> so on, then starts the machines in the Google Cloud, sets up the network, adds 
> a 500GB empty disk to all machines, generates the SSH keys on the master and 
> transfers them to all slaves, installs Java, and downloads and configures 
> Spark/Shark/Hadoop. Also it starts the Shark server automatically. Currently 
> the version is 0.9.1 but I'm happy to add/support more versions if anyone is 
> interested.
> 
> 
> Cheers.
> 
> 
> Thanks
> Best Regards



RE: spark-shell driver interacting with Workers in YARN mode - firewall blocking communication

2014-05-05 Thread Jacob Eisinger

Howdy Andrew,

I agree; the subnet idea is a good one...  unfortunately, it doesn't really
help to secure the network.

You mentioned that the drivers need to talk to the workers.  I think it is
slightly broader - all of the workers and the driver/shell need to be
addressable from/to each other on any dynamic port.

I would check out setting the environment variable SPARK_LOCAL_IP [1].
This seems to enable Spark to bind correctly to a private subnet.

Jacob

[1]  http://spark.apache.org/docs/latest/configuration.html

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Andrew Lee 
To: "user@spark.apache.org" 
Date:   05/04/2014 09:57 PM
Subject:RE: spark-shell driver interacting with Workers in YARN mode -
firewall blocking communication



Hi Jacob,

Taking both concerns into account, I'm actually thinking about using a
separate subnet to isolate the Spark Workers, but I need to look into how to
bind the process onto the correct interface first. This may require some
code change.
A separate subnet doesn't limit itself to a port range, so port exhaustion
should rarely happen, and it won't impact performance.

Opening up all ports between 32768-61000 is effectively the same as having no
firewall. This exposes some security concerns, but more information is needed
on whether that is critical or not.

The bottom line is that the driver needs to talk to the Workers. The way
users access the driver should be easier to solve, such as launching the Spark
(shell) driver on a specific interface.

Likewise, if you find out any interesting solutions, please let me know.
I'll share the solution once I have something up and running. Currently, it
is running OK with iptables off, but I still need to figure out how to
productionize the security part.

Subject: RE: spark-shell driver interacting with Workers in YARN mode -
firewall blocking communication
To: user@spark.apache.org
From: jeis...@us.ibm.com
Date: Fri, 2 May 2014 16:07:50 -0500

Howdy Andrew,

I think I am running into the same issue [1] as you.  It appears that Spark
opens up dynamic / ephemeral [2] ports for each job on the shell and the
workers.  As you are finding out, this makes securing and managing the
network for Spark very difficult.

> Any idea how to restrict the 'Workers' port range?
The port range can be found by running:
  $ sysctl net.ipv4.ip_local_port_range
  net.ipv4.ip_local_port_range = 32768 61000

With that being said, a couple of avenues you may try:
  1. Limit the dynamic ports [3] to a more reasonable number and open all
     of these ports on your firewall; obviously, this might have
     unintended consequences like port exhaustion.
  2. Secure the network another way, like through a private VPN; this may
     reduce Spark's performance.

If you have other workarounds, I am all ears --- please let me know!
Jacob

[1]
http://apache-spark-user-list.1001560.n3.nabble.com/Securing-Spark-s-Network-tp4832p4984.html

[2] http://en.wikipedia.org/wiki/Ephemeral_port
[3]
http://www.cyberciti.biz/tips/linux-increase-outgoing-network-sockets-range.html


Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075

Andrew Lee ---05/02/2014 03:15:42 PM---Hi Yana,  I did. I configured the
the port in spark-env.sh, the problem is not the driver port which

From: Andrew Lee 
To: "user@spark.apache.org" 
Date: 05/02/2014 03:15 PM
Subject: RE: spark-shell driver interacting with Workers in YARN mode -
firewall blocking communication



Hi Yana,

I did. I configured the port in spark-env.sh; the problem is not the
driver port, which is fixed.
It's the Workers' ports that are dynamic every time they are launched in
the YARN container. :-(

Any idea how to restrict the Workers' port range?

Date: Fri, 2 May 2014 14:49:23 -0400
Subject: Re: spark-shell driver interacting with Workers in YARN mode -
firewall blocking communication
From: yana.kadiy...@gmail.com
To: user@spark.apache.org

I think what you want to do is set spark.driver.port to a fixed port.


On Fri, May 2, 2014 at 1:52 PM, Andrew Lee  wrote:
  Hi All,

  I encountered this problem when the firewall is enabled between the
  spark-shell and the Workers.

  When I launch spark-shell in yarn-client mode, I notice that Workers
  on the YARN containers are trying to talk to the driver
  (spark-shell), however, the firewall is not opened and caused
  timeout.

  For the Workers, it tries to open listening ports on 54xxx for each
  Worker? Is the port random in such case?
  What will be the better way to predict the ports so I can configure
  the firewall correctly between the driver (spark-shell) and the
  Workers? Is there a range of ports we can specify in the
  firewall/iptables?

  Any ideas?


Re: CDH 5.0 and Spark 0.9.0

2014-05-05 Thread Paul Schooss
Hello Sean,

Thanks a bunch. I am not currently running in HA mode.

The configuration is identical to our CDH4 setup, which works perfectly fine. It's
really strange that only Spark breaks with this enabled.

On Thu, May 1, 2014 at 3:06 AM, Sean Owen  wrote:

> This codec does require native libraries to be installed, IIRC, but
> they are installed with CDH 5.
>
> The error you show does not look related though. Are you sure your HA
> setup is working and that you have configured it correctly in whatever
> config spark is seeing?
> --
> Sean Owen | Director, Data Science | London
>
>
> On Thu, May 1, 2014 at 12:44 AM, Paul Schooss 
> wrote:
> > Hello,
> >
> > So I was unable to run the following commands from the spark shell with
> CDH
> > 5.0 and spark 0.9.0, see below.
> >
> > Once I removed the property
> >
> > 
> > io.compression.codec.lzo.class
> > com.hadoop.compression.lzo.LzoCodec
> > true
> > 
> >
> > from the core-site.xml on the node, the spark commands worked. Is there a
> > specific setup I am missing?
> >
> > scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
> > 14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with
> > curMem=150115, maxMem=308713881
> > 14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to
> > memory (estimated size 77.0 KB, free 294.2 MB)
> > 14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to
> > override final parameter: mapreduce.tasktracker.cache.local.size;
> Ignoring.
> > 14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to
> override
> > final parameter: mapreduce.output.fileoutputformat.compress.type;
> Ignoring.
> > 14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to
> override
> > final parameter: mapreduce.map.output.compress.codec; Ignoring.
> > log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at
> > :12
> >
> > scala> log.count()
> > 14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no
> > longer used.
> > 14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to
> > override final parameter: mapreduce.tasktracker.cache.local.size;
> Ignoring.
> > 14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to
> override
> > final parameter: mapreduce.output.fileoutputformat.compress.type;
> Ignoring.
> > 14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to
> override
> > final parameter: mapreduce.map.output.compress.codec; Ignoring.
> > java.lang.IllegalArgumentException: java.net.UnknownHostException:
> > jobs-a-hnn1
> > at
> >
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
> > at
> >
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
> > at
> >
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
> > at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:576)
> > at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:521)
> > at
> >
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
> > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
> > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
> > at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
> > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
> > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
> > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> > at
> >
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
> > at
> >
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
> > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> > at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> > at scala.Option.getOrElse(Option.scala:120)
> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> > at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
> > at org.apache.spark.rdd.RDD.count(RDD.scala:720)
> > at $iwC$$iwC$$iwC$$iwC.(:15)
> > at $iwC$$iwC$$iwC.(:20)
> > at $iwC$$iwC.(:22)
> > at $iwC.(:24)
> > at (:26)
> > at .(:30)
> > at .()
> > at .(:7)
> > at .()
> > at $print()
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at
> org.apache.spark.repl.SparkIMain$ReadEvalPri

Problem with sharing class across worker nodes using spark-shell on Spark 1.0.0

2014-05-05 Thread Soumya Simanta
Hi,

I'm trying to run a simple Spark job that uses a 3rd party class (in this
case twitter4j.Status) in the spark-shell using spark-1.0.0_SNAPSHOT

I'm starting my bin/spark-shell with the following command.

./spark-shell \
  --driver-class-path "$LIBPATH/jodatime2.3/joda-convert-1.2.jar:$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar:$LIBPATH/twitter4j-core-3.0.5.jar" \
  --jars $LIBPATH/jodatime2.3/joda-convert-1.2.jar,$LIBPATH/jodatime2.3/joda-time-2.3/joda-time-2.3.jar,$LIBPATH/twitter4j-core-3.0.5.jar


My code was working fine in 0.9.1 when I used the following environment
variables pointing to the same jars as above:

export SPARK_CLASSPATH
export ADD_JAR

Now I'm getting a NoClassDefFoundError on each of my worker nodes

14/05/05 14:03:30 INFO TaskSetManager: Loss was due to
java.lang.NoClassDefFoundError: twitter4j/Status [duplicate 40]

14/05/05 14:03:30 INFO TaskSetManager: Starting task 0.0:26 as TID 73 on
executor 2: *worker1.xxx..* (NODE_LOCAL)


What am I missing here?


Thanks

-Soumya
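
One thing worth trying (a sketch, not a confirmed fix for this 1.0.0 snapshot)
is shipping the jar to the executors explicitly via SparkContext.addJar from
inside the shell; the path below is illustrative and should point at the
actual twitter4j jar:

```scala
// Inside spark-shell: push the third-party jar to every executor so that
// tasks submitted afterwards can resolve its classes.
sc.addJar("/path/to/twitter4j-core-3.0.5.jar")

import twitter4j.Status  // should now resolve on the workers as well
```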


Re: Spark GCE Script

2014-05-05 Thread Nicholas Chammas
I second this motion. :)

A unified "cloud deployment" tool would be absolutely great.


On Mon, May 5, 2014 at 1:34 PM, Matei Zaharia wrote:

> Very cool! Have you thought about sending this as a pull request? We’d be
> happy to maintain it inside Spark, though it might be interesting to find a
> single Python package that can manage clusters across both EC2 and GCE.
>
> Matei
>
> On May 5, 2014, at 7:18 AM, Akhil Das  wrote:
>
> Hi Sparkers,
>
> We have created a quick spark_gce script which can launch a spark cluster
> in the Google Cloud. I'm sharing it because it might be helpful for someone
> using the Google Cloud for deployment rather than AWS.
>
> Here's the link to the script
>
> https://github.com/sigmoidanalytics/spark_gce
>
> Feel free to use it and suggest any feedback around it.
>
> In short here's what it does:
>
> Just like the spark_ec2 script, this one also reads certain command-line
> arguments (See the github page 
> for
> more details) like the cluster name and all, then starts the machines in
> the google cloud, sets up the network, adds a 500GB empty disk to all
> machines, generate the ssh keys on master and transfer it to all slaves and
> install java and downloads and configures Spark/Shark/Hadoop. Also it
> starts the shark server automatically. Currently the version is 0.9.1 but
> I'm happy to add/support more versions if anyone is interested.
>
>
> Cheers.
>
>
> Thanks
> Best Regards
>
>
>


Re: Spark GCE Script

2014-05-05 Thread François Le lay
Has anyone considered using jclouds tooling to support multiple cloud 
providers? Maybe using Pallet?

François

> On May 5, 2014, at 3:22 PM, Nicholas Chammas  
> wrote:
> 
> I second this motion. :)
> 
> A unified "cloud deployment" tool would be absolutely great.
> 
> 
> On Mon, May 5, 2014 at 1:34 PM, Matei Zaharia  wrote:
>> Very cool! Have you thought about sending this as a pull request? We’d be 
>> happy to maintain it inside Spark, though it might be interesting to find a 
>> single Python package that can manage clusters across both EC2 and GCE.
>> 
>> Matei
>> 
>>> On May 5, 2014, at 7:18 AM, Akhil Das  wrote:
>>> 
>>> Hi Sparkers,
>>> 
>>> We have created a quick spark_gce script which can launch a spark cluster 
>>> in the Google Cloud. I'm sharing it because it might be helpful for someone 
>>> using the Google Cloud for deployment rather than AWS.
>>> 
>>> Here's the link to the script
>>> 
>>> https://github.com/sigmoidanalytics/spark_gce
>>> 
>>> Feel free to use it and suggest any feedback around it.
>>> 
>>> In short here's what it does:
>>> 
>>> Just like the spark_ec2 script, this one also reads certain command-line 
>>> arguments (See the github page for more details) like the cluster name and 
>>> all, then starts the machines in the google cloud, sets up the network, 
>>> adds a 500GB empty disk to all machines, generate the ssh keys on master 
>>> and transfer it to all slaves and install java and downloads and configures 
>>> Spark/Shark/Hadoop. Also it starts the shark server automatically. 
>>> Currently the version is 0.9.1 but I'm happy to add/support more versions 
>>> if anyone is interested.
>>> 
>>> 
>>> Cheers.
>>> 
>>> 
>>> Thanks
>>> Best Regards
> 


Re: sbt run with spark.ContextCleaner ERROR

2014-05-05 Thread Tathagata Das
Well, there have been more bug fixes added since RC3 as well. So it's best to try
out the current master and let us know whether you still get the scary logs.

TD


On Sun, May 4, 2014 at 3:52 AM, wxhsdp  wrote:

> Hi, TD
>
> actually, I'm not very clear about my Spark version. I checked out from
> https://github.com/apache/spark/trunk on Apr 30.
>
> Please tell me where you get the version Spark 1.0 RC3 from.
>
> I do not call sparkContext.stop. Now I add it to the end of my code.
>
> here's the log
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/metrics/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/static,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/executors/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/executors,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/environment/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/environment,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/rdd,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/pool/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/pool,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/json,null}
> 14/05/04 18:48:21 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages,null}
> 14/05/04 18:48:21 INFO ui.SparkUI: Stopped Spark web UI at
> http://ubuntu.local:4040
> 14/05/04 18:48:21 INFO scheduler.DAGScheduler: Stopping DAGScheduler
> 14/05/04 18:48:22 INFO spark.MapOutputTrackerMasterActor:
> MapOutputTrackerActor stopped!
> 14/05/04 18:48:23 INFO network.ConnectionManager: Selector thread was
> interrupted!
> 14/05/04 18:48:23 INFO network.ConnectionManager: ConnectionManager stopped
> 14/05/04 18:48:23 INFO storage.MemoryStore: MemoryStore cleared
> 14/05/04 18:48:23 INFO storage.BlockManager: BlockManager stopped
> 14/05/04 18:48:23 INFO storage.BlockManagerMasterActor: Stopping
> BlockManagerMaster
> 14/05/04 18:48:23 INFO storage.BlockManagerMaster: BlockManagerMaster
> stopped
> 14/05/04 18:48:23 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 14/05/04 18:48:23 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
> 14/05/04 18:48:23 INFO spark.SparkContext: Successfully stopped
> SparkContext
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #3
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #1
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #2
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #3
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #1
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #2
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #6
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #4
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #5
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #6
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #4
> 14/05/04 18:48:23 ERROR nio.AbstractNioSelector: Interrupted while wait for
> resources to be released #5
> 14/05/04 18:48:23 INFO Remoting: Remoting shut down
> 14/05/04 18:48:23 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
>
>
>
>
>
>
Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Tathagata Das
Do those files actually exist? Those stdout/stderr files should have the
output of Spark's executors running on the workers, and it's weird that they
don't exist. It could be a permission issue - maybe the directories/files are
not being generated because the worker cannot create them?

TD


On Mon, May 5, 2014 at 3:06 AM, Francis.Hu wrote:

>  Hi,All
>
>
>
>
>
> We run a Spark cluster with three workers.
>
> We created a Spark Streaming application,
>
> then ran the Spark project using the command below:
>
>
>
> shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo
>
>
>
> We looked at the web UI of the workers; the jobs failed without any error or
> info, but a FileNotFoundException occurred in the workers' log files, as below.
>
> Is this a known issue in Spark?
>
>
>
>
>
> -in workers'
> logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out
>
>
>
> 14/05/05 02:39:39 WARN AbstractHttpConnection:
> /logPage/?appId=app-20140505053550-&executorId=2&logType=stdout
>
> java.io.FileNotFoundException:
> /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or
> directory)
>
> at java.io.FileInputStream.open(Native Method)
>
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
>
> at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
>
> at
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>
> at
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
>
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>
> at org.eclipse.jetty.server.Server.handle(Server.java:363)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
>
> at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
>
> at
> org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>
> at
> org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
>
> at
> org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:722)
>
> 14/05/05 02:39:41 WARN AbstractHttpConnection:
> /logPage/?appId=app-20140505053550-&executorId=9&logType=stderr
>
> java.io.FileNotFoundException:
> /test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or
> directory)
>
> at java.io.FileInputStream.open(Native Method)
>
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
>
> at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
>
> at
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>
> at
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
>
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>
> at org.eclipse.jetty.server.Server.handle(Server.java:363)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
>
> at
> org.eclipse.jetty.server.Abstrac

Re: performance improvement on second operation...without caching?

2014-05-05 Thread Diana Carroll
Ethan, you're not the only one, which is why I was asking about this! :-)

Matei, thanks for your response. your answer explains the performance jump
in my code, but shows I've missed something key in my understanding of
Spark!

I was not aware until just now that map output is saved to disk (other than
when explicitly told to, using persist). It raises almost as many questions
as it answers.

Where are the shuffle files saved?  Locally on the mapper nodes?  Is it the
same location that disk-spilled cache is saved to?  Doesn't the necessity
of saving to disk result in increased i/o that would slow the job down?  I
thought part of the goal of Spark was to do everything in memory unless the
user specifically chose to persist...thereby making a choice to incur
time/disk space expense up front in return for fast failure recovery?

Not that I'm complaining, mind you, but I do think this should be made clear
to people: it not only affects performance but also, for instance, whether
the data is fresh or out of date. I had assumed that if I did not set caching,
that each time I performed an operation on an RDD, it would re-compute
based on lineage, including re-reading the files...so I didn't have to
worry about the possibility of my file content changing.  But if it's
auto-caching shuffle files, my base files won't get re-read even if the
content has changed. (Or does it check timestamps?)

Thanks,
Diana








On Mon, May 5, 2014 at 11:07 AM, Ethan Jewett  wrote:

> Thanks Patrick and Matei for the clarification. I actually have to update
> some code now, as I was apparently relying on the fact that the output
> files are being re-used. Explains some edge-case behavior that I've seen.
>
> For me, at least, I read the guide, did some tests on fairly extensive RDD
> dependency graphs, saw that tasks earlier in the dependency graphs were not
> being regenerated and assumed (very much incorrectly I just found out!)
> that it was because the RDDs themselves were being cached. I wonder if
> there is a way to explain this distinction concisely in the programming
> guide. Or maybe I'm the only one that went down this incorrect learning
> path :-)
>
> Ethan
>
>
> On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia wrote:
>
>> Yes, this happens as long as you use the same RDD. For example say you do
>> the following:
>>
>> data1 = sc.textFile(…).map(…).reduceByKey(…)
>> data1.count()
>> data1.filter(…).count()
>>
>> The first count() causes outputs of the map/reduce pair in there to be
>> written out to shuffle files. Next time you do a count, on either this RDD
>> or a child (e.g. after the filter), we notice that output files were
>> already generated for this shuffle so we don’t rerun the map stage. Note
>> that the output does get read again over the network, which is kind of
>> wasteful (if you really wanted to reuse this as quickly as possible you’d
>> use cache()).
>>
>> Matei
>>
>> On May 3, 2014, at 8:44 PM, Koert Kuipers  wrote:
>>
>> Hey Matei,
>> Not sure i understand that. These are 2 separate jobs. So the second job
>> takes advantage of the fact that there is map output left somewhere on disk
>> from the first job, and re-uses that?
>>
>>
>> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia wrote:
>>
>>> Hi Diana,
>>>
>>> Apart from these reasons, in a multi-stage job, Spark saves the map
>>> output files from map stages to the filesystem, so it only needs to rerun
>>> the last reduce stage. This is why you only saw one stage executing. These
>>> files are saved for fault recovery but they speed up subsequent runs.
>>>
>>> Matei
>>>
>>> On May 3, 2014, at 5:21 PM, Patrick Wendell  wrote:
>>>
>>> Ethan,
>>>
>>> What you said is actually not true, Spark won't cache RDD's unless you
>>> ask it to.
>>>
>>> The observation here - that running the same job can speed up
>>> substantially even without caching - is common. This is because other
>>> components in the stack are performing caching and optimizations. Two that
>>> can make a huge difference are:
>>>
>>> 1. The OS buffer cache. Which will keep recently read disk blocks in
>>> memory.
>>> 2. The Java just-in-time compiler (JIT) which will use runtime profiling
>>> to significantly speed up execution speed.
>>>
>>> These can make a huge difference if you are running the same job
>>> over-and-over. And there are other things like the OS network stack
>>> increasing TCP windows and so forth. These will all improve response time
>>> as a spark program executes.
>>>
>>>
>>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett  wrote:
>>>
 I believe Spark caches RDDs it has memory for regardless of whether you
 actually call the 'cache' method on the RDD. The 'cache' method just tips
 off Spark that the RDD should have higher priority. At least, that is my
 experience and it seems to correspond with your experience and with my
 recollection of other discussions on this topic on the list. However, going
 back and looking at the programming guide, this is not

Re: spark streaming kafka output

2014-05-05 Thread Tathagata Das
There is no built-in code in Spark Streaming to output to Kafka yet.
However, I have heard people have used Twitter's Storehaus with foreachRDD,
and Storehaus has a Kafka output. Something that you might look into.

TD
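
The foreachRDD approach TD mentions can be sketched roughly as follows, using the Kafka 0.8 Scala producer directly rather than Storehaus. The broker address, topic name, and the producer-per-partition pattern are illustrative assumptions, not details from this thread:

```scala
// Illustrative sketch only: write each micro-batch back to Kafka.
// "localhost:9092" and "out-topic" are placeholder assumptions.
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One producer per partition per batch: created on the worker,
      // so nothing non-serializable is shipped from the driver.
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      try {
        records.foreach { r =>
          producer.send(new KeyedMessage[String, String]("out-topic", r))
        }
      } finally {
        producer.close()
      }
    }
  }
}
```

Creating one producer per partition (rather than per record, or one per batch on the driver) bounds the connection cost per batch, which addresses the performance concern raised in the question.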


On Sun, May 4, 2014 at 11:45 PM, Weide Zhang  wrote:

> Hi ,
>
> Is there any code to implement a kafka output for spark streaming? My use
> case is all the output need to be dumped back to kafka cluster again after
> data is processed ?  What will be guideline to implement such function ? I
> heard foreachRDD will create one instance of producer per batch ? If so,
> will that hurt performance ?
>
> Thanks,
>
> Weide
>
>
>


Re: spark streaming question

2014-05-05 Thread Tathagata Das
One main reason why Spark Streaming can achieve higher throughput than
Storm is that Spark Streaming operates in coarser-grained batches -
second-scale massive batches - which reduce per-tuple overheads in
shuffles and other kinds of data movement.

Note that this increased throughput does not come for free: larger batches --->
larger end-to-end latency. Storm may give a lower end-to-end latency than
Spark Streaming (which has second-scale latency with second-scale batches).
However, we have observed that for a large variety of streaming use cases,
people are often okay with second-scale latencies but find it much harder to
work around at-least-once semantics (double-counting, etc.) and the lack of
built-in state management (state kept locally in a worker can get lost if the
worker dies). Plus Spark Streaming has the major advantage of a simpler,
higher-level API than Storm, and the whole Spark ecosystem (Spark SQL, MLlib,
etc.) around it that can be used to write streaming analytics applications
very easily.

Regarding Trident, we have heard from many developers that Trident gives
lower throughput than Storm due to its transactional guarantees. It's hard
to say what is behind the performance penalty without doing a very
detailed head-to-head analysis.

TD


On Sun, May 4, 2014 at 5:11 PM, Chris Fregly  wrote:

> great questions, weide.  in addition, i'd also like to hear more about how
> to horizontally scale a spark-streaming cluster.
>
> i've gone through the samples (standalone mode) and read the
> documentation, but it's still not clear to me how to scale this puppy out
> under high load.  i assume i add more receivers (kinesis, flume, etc), but
> physically how does this work?
>
> @TD:  can you comment?
>
> thanks!
>
> -chris
>
>
> On Sun, May 4, 2014 at 2:10 PM, Weide Zhang  wrote:
>
>> Hi ,
>>
>> It might be a very general question to ask here but I'm curious to know
>> why spark streaming can achieve better throughput than storm as claimed in
>> the spark streaming paper. Does it depend on certain use cases and/or data
>> source ? What drives better performance in spark streaming case or in other
>> ways, what makes storm not as performant as spark streaming ?
>>
>> Also, in order to guarantee exact-once semantics when node failure
>> happens,  spark makes replicas of RDDs and checkpoints so that data can be
>> recomputed on the fly while on Trident case, they use transactional object
>> to persist the state and result but it's not obvious to me which approach
>> is more costly and why ? Any one can provide some experience here ?
>>
>> Thanks a lot,
>>
>> Weide
>>
>
>


Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-05 Thread Gerard Maas
Hi all,

I'm currently working on creating a set of docker images to facilitate
local development with Spark/streaming on Mesos (+zk, hdfs, kafka)

After solving the initial hurdles to get things working together in docker
containers, now everything seems to start-up correctly and the mesos UI
shows slaves as they are started.

I'm trying to submit a job from IntelliJ and the jobs submissions seem to
get lost in Mesos translation. The logs are not helping me to figure out
what's wrong, so I'm posting them here in the hope that they can ring a
bell and somebody could provide me a hint on what's wrong/missing with my
setup.


 DRIVER (IntelliJ running a Job.scala main) 
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
SHUFFLE_BLOCK_MANAGER
14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older than
1399319251962
14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older
than 1399319251962
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BROADCAST_VARS
14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
BLOCK_MANAGER
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
HTTP_BROADCAST
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
MAP_OUTPUT_TRACKER
14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
SPARK_CONTEXT


 MESOS MASTER 
I0505 19:52:39.718080   388 master.cpp:690] Registering framework
201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
I0505 19:52:39.718261   388 master.cpp:493] Framework
201405051517-67113388-5050-383-6995 disconnected
I0505 19:52:39.718277   389 hierarchical_allocator_process.hpp:332] Added
framework 201405051517-67113388-5050-383-6995
I0505 19:52:39.718312   388 master.cpp:520] Giving framework
201405051517-67113388-5050-383-6995 0ns to failover
I0505 19:52:39.718431   389 hierarchical_allocator_process.hpp:408]
Deactivated framework 201405051517-67113388-5050-383-6995
W0505 19:52:39.718459   388 master.cpp:1388] Master returning resources
offered to framework 201405051517-67113388-5050-383-6995 because the
framework has terminated or is inactive
I0505 19:52:39.718567   388 master.cpp:1376] Framework failover timeout,
removing framework 201405051517-67113388-5050-383-6995



 MESOS SLAVE 
I0505 19:49:27.66201920 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6803 by master@172.17.0.4:5050
W0505 19:49:27.66207220 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6803
I0505 19:49:28.66215318 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6804 by master@172.17.0.4:5050
W0505 19:49:28.66221218 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6804
I0505 19:49:29.66219913 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6805 by master@172.17.0.4:5050
W0505 19:49:29.66225613 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6805
I0505 19:49:30.66244316 slave.cpp:1191] Asked to shut down framework
201405051517-67113388-5050-383-6806 by master@172.17.0.4:5050
W0505 19:49:30.66248916 slave.cpp:1206] Cannot shut down unknown
framework 201405051517-67113388-5050-383-6806


Thanks in advance,

Gerard.


Re: Spark Streaming and JMS

2014-05-05 Thread Tathagata Das
A few high-level suggestions.

1. I recommend using the new Receiver API in the almost-released Spark 1.0 (see
branch-1.0 / master branch on GitHub). It's a slightly better version of the
earlier NetworkReceiver, as it hides away the BlockGenerator (which needed to
be manually started and stopped, unnecessarily) and adds other lifecycle
management methods like stop, restart and reportError to deal with errors in
receiving data. It also adds the ability to write a custom receiver from Java.
Take a look at this example of writing a custom receiver in the new API. I am
updating the custom receiver guide right now
(https://github.com/apache/spark/pull/652).

2. Once you create a JMSReceiver class by extending
NetworkReceiver/Receiver, you can create DStream out of the receiver by

val jmsStream = ssc.networkStream(new JMSReceiver(""))

3. As far as I understand from the docs of akka.camel.Consumer, it is
essentially a specialized Akka actor. For Akka actors, there is
ssc.actorStream, where you can specify your own actor class. You get actor
supervision (and therefore error handling, etc.) with that. See the
AkkaWordCount example - old style using NetworkReceiver, or new style using
Receiver.

I haven't personally played around with JMS before, so I can't comment much
on JMS-specific intricacies.

TD
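
To make suggestion (1) concrete, here is a hedged sketch of Patrick's receiver reworked against the new Spark 1.0 Receiver API. The JMS connection itself is elided, and the threading and naming details are assumptions, not a reference implementation:

```scala
// Sketch: in the new API there is no manual BlockGenerator;
// store() and restart() replace it.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JmsReceiver(jmsUri: String)
    extends Receiver[String](StorageLevel.MEMORY_ONLY_SER) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns quickly.
    new Thread("JMS Receiver") {
      override def run(): Unit = receiveLoop()
    }.start()
  }

  def onStop(): Unit = {
    // Close the JMS connection/session here.
  }

  private def receiveLoop(): Unit = {
    try {
      while (!isStopped) {
        // val msg = ... blocking receive from jmsUri ...
        // store(msg)   // hands the message body to Spark
      }
    } catch {
      case e: Exception => restart("Error receiving from JMS", e)
    }
  }
}

// In Spark 1.0 the stream is created with receiverStream
// (rather than the old networkStream):
// val jmsStream = ssc.receiverStream(new JmsReceiver("jms:sonicmq://..."))
```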



On Mon, May 5, 2014 at 5:31 AM, Patrick McGloin
wrote:

> Hi all,
>
> Is there a "best practice" for subscribing to JMS with Spark Streaming?  I
> have searched but not found anything conclusive.
>
> In the absence of a standard practice the solution I was thinking of was
> to use Akka + Camel (akka.camel.Consumer) to create a subscription for a
> Spark Streaming Custom Receiver.  So the actor would look something like
> this:
>
> class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
> Consumer {
>   //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
>   def endpointUri = jmsURI
>   lazy val blockGenerator = new
> BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
>
>   protected override def onStart() {
> blockGenerator.start
>   }
>
>   def receive = {
> case msg: CamelMessage => { blockGenerator += msg.body }
> case _ => { /* ... */ }
>   }
>
>   protected override def onStop() {
> blockGenerator.stop
>   }
> }
>
> And then in the main application create receivers like this:
>
> val ssc = new StreamingContext(...)
> object tascQueue extends JmsReceiver[String](ssc) {
> override def getReceiver():JmsReceiver[String] = {
>  new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
>  }
> }
> ssc.registerInputStream(tascQueue)
>
> Is this the best way to go?
>
> Best regards,
> Patrick
>


Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-05 Thread Benjamin
Hi,

Before considering running on Mesos, did you try to submit the application
on Spark deployed without Mesos on Docker containers?

I'm currently investigating this idea to quickly deploy a complete set of
clusters with Docker, and I'm interested in your findings on sharing the
settings of Kafka and ZooKeeper across nodes. How many brokers and ZooKeeper
nodes do you use?

Regards,



On Mon, May 5, 2014 at 10:11 PM, Gerard Maas  wrote:

> Hi all,
>
> I'm currently working on creating a set of docker images to facilitate
> local development with Spark/streaming on Mesos (+zk, hdfs, kafka)
>
> After solving the initial hurdles to get things working together in docker
> containers, now everything seems to start-up correctly and the mesos UI
> shows slaves as they are started.
>
> I'm trying to submit a job from IntelliJ and the jobs submissions seem to
> get lost in Mesos translation. The logs are not helping me to figure out
> what's wrong, so I'm posting them here in the hope that they can ring a
> bell and somebdoy could provide me a hint on what's wrong/missing with my
> setup.
>
>
> [...]
>
> Thanks in advance,
>
> Gerard.
>



-- 
Benjamin Bouillé
+33 665 050 285


Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-05 Thread Gerard Maas
Hi Benjamin,

Yes, we initially used a modified version of the AMPLab docker scripts
[1]. The AMPLab docker images are a good starting point.
One of the biggest hurdles has been HDFS, which requires reverse DNS, and I
didn't want to go the dnsmasq route, to keep the containers relatively
simple to use without the need for external scripts. I ended up running a
1-node setup (namenode + datanode). I'm still looking for a better solution
for HDFS [2]

Our use case for Docker is to easily create local dev environments, both
for development and for automated functional testing (using Cucumber). My
aim is to strongly reduce the time of the develop-deploy-test cycle.
That also means we run the minimum number of instances required to
have a functionally working setup, e.g. 1 ZooKeeper, 1 Kafka broker, ...

For the actual cluster deployment we have a Chef-based devops toolchain that
puts things in place on public cloud providers.
Personally, I think Docker rocks and I would like to replace those complex
cookbooks with Dockerfiles once the technology is mature enough.

-greetz, Gerard.

[1] https://github.com/amplab/docker-scripts
[2]
http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns


On Mon, May 5, 2014 at 11:00 PM, Benjamin  wrote:

> Hi,
>
> Before considering running on Mesos, did you try to submit the application
> on Spark deployed without Mesos on Docker containers ?
>
> Currently investigating this idea to deploy quickly a complete set of
> clusters with Docker, I'm interested by your findings on sharing the
> settings of Kafka and Zookeeper across nodes. How many broker and zookeeper
> do you use ?
>
> Regards,
>
>
>
> On Mon, May 5, 2014 at 10:11 PM, Gerard Maas wrote:
>
>> Hi all,
>>
>> I'm currently working on creating a set of docker images to facilitate
>> local development with Spark/streaming on Mesos (+zk, hdfs, kafka)
>>
>> After solving the initial hurdles to get things working together in
>> docker containers, now everything seems to start-up correctly and the mesos
>> UI shows slaves as they are started.
>>
>> I'm trying to submit a job from IntelliJ and the jobs submissions seem to
>> get lost in Mesos translation. The logs are not helping me to figure out
>> what's wrong, so I'm posting them here in the hope that they can ring a
>> bell and somebdoy could provide me a hint on what's wrong/missing with my
>> setup.
>>
>>
>> [...]

Increase Stack Size Workers

2014-05-05 Thread Andrea Esposito
Hi there,

I'm doing an iterative algorithm and I sometimes end up with a
StackOverflowError, no matter whether I do checkpoints or not.

While I still don't understand why this is happening, I figured out that
increasing the stack size is a workaround.

Developing in local mode ("local[n]") I can set the stack size through the
-Xss parameter. How can I do the same for each worker in standalone mode?
Setting it as "java -Xss16m Worker" seems useless because the actual
computation is done in the CoarseGrainedExecutor.

Best,
EA
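
For the executor side, setting `SPARK_JAVA_OPTS="-Xss16m"` in conf/spark-env.sh on each worker is, if I recall correctly, how JVM flags reach the executor JVMs in 0.9.x standalone mode. Independently of launch flags, a pure-JVM workaround is to run the deeply recursive part of a task on a thread created with an explicit stack size (the 4-arg `java.lang.Thread` constructor; note the JavaDoc says the stackSize hint may be ignored on some VMs). A minimal sketch:

```scala
// Sketch: run a computation on a thread with an explicit stack size,
// regardless of how the enclosing JVM was launched.
def withBigStack[T](stackSizeBytes: Long)(body: => T): T = {
  @volatile var result: Option[T] = None
  val t = new Thread(null, new Runnable {
    def run(): Unit = { result = Some(body) }
  }, "big-stack-worker", stackSizeBytes)
  t.start()
  t.join()
  result.get
}

// Non-tail-recursive depth counter: would overflow a default-sized stack
// at this depth, but fits comfortably in a 32 MB one.
def depth(n: Int): Int = if (n == 0) 0 else 1 + depth(n - 1)

val d = withBigStack(32L * 1024 * 1024) { depth(200000) }
```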


Spark 0.9.1 - saveAsSequenceFile and large RDD

2014-05-05 Thread Allen Lee
Hi,

Fairly new to Spark.  I'm using Spark's saveAsSequenceFile() to write large
Sequence Files to HDFS.  The Sequence Files need to be large to be
efficiently accessed in HDFS, preferably larger than Hadoop's block size,
64MB.  The task works for files smaller than 64 MiB (with a warning for
sequence files close to 64 MiB).  For files larger than 64 MiB, the task
fails with a libprotobuf error. Here is the full log:

14/05/05 18:18:00 INFO MesosSchedulerBackend: Registered as framework ID
201404231353-1315739402-5050-26649-0091
14/05/05 18:18:12 INFO SequenceFileRDDFunctions: Saving as sequence file of
type (LongWritable,BytesWritable)
14/05/05 18:18:14 INFO SparkContext: Starting job: saveAsSequenceFile at
X.scala:171
14/05/05 18:18:14 INFO DAGScheduler: Got job 0 (saveAsSequenceFile at
X.scala:171) with 1 output partitions (allowLocal=false)
14/05/05 18:18:14 INFO DAGScheduler: Final stage: Stage 0
(saveAsSequenceFile at X.scala:171)
14/05/05 18:18:14 INFO DAGScheduler: Parents of final stage: List()
14/05/05 18:18:14 INFO DAGScheduler: Missing parents: List()
14/05/05 18:18:14 INFO DAGScheduler: Submitting Stage 0
(ParallelCollectionRDD[0] at makeRDD at X.scala:170), which has no
missing parents
14/05/05 18:18:19 INFO DAGScheduler: Submitting 1 missing tasks from Stage
0 (ParallelCollectionRDD[0] at makeRDD at X.scala:170)
14/05/05 18:18:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/05/05 18:18:19 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 201404231353-1315739402-5050-26649-3: dn-04 (PROCESS_LOCAL)
14/05/05 18:18:23 INFO TaskSetManager: Serialized task 0.0:0 as 113006452
bytes in 3890 ms
[libprotobuf ERROR google/protobuf/io/coded_stream.cc:171] A protocol
message was rejected because it was too big (more than 67108864 bytes).  To
increase the limit (or to disable these warnings), see
CodedInputStream::SetTotalBytesLimit() in google/protobuf/io/coded_stream.h.
F0505 18:18:24.616025 27889 construct.cpp:48] Check failed: parsed
Unexpected failure while parsing protobuf
*** Check failure stack trace: ***
@ 0x7fc8d49ba96d  google::LogMessage::Fail()
@ 0x7fc8d49be987  google::LogMessage::SendToLog()
@ 0x7fc8d49bc809  google::LogMessage::Flush()
@ 0x7fc8d49bcb0d  google::LogMessageFatal::~LogMessageFatal()



The code is fairly simple

val kv = 

//set parallelism to 1 to keep the file from being partitioned
sc.makeRDD(kv,1)
   .saveAsSequenceFile(path)
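
One workaround I have been considering (an untested sketch: the idea is to
spread the data over many small tasks so each serialized task stays under the
64 MiB protobuf limit, then merge partitions on the cluster side before
writing):

```scala
// Untested sketch: 16 partitions keep each serialized task well under 64 MiB;
// coalesce with shuffle = true merges the data on the cluster side, so the
// output is still a single sequence file without one giant serialized task.
sc.makeRDD(kv, 16)
  .coalesce(1, shuffle = true)
  .saveAsSequenceFile(path)
```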


Does anyone have any pointers on how to get past this?

Thanks,

-- 
*Allen Lee*
Software Engineer
MediaCrossing Inc.


Re: Incredible slow iterative computation

2014-05-05 Thread Andrea Esposito
Update: checkpointing doesn't seem to happen. I checked with the
"isCheckpointed" method, but it always returns false. ???


2014-05-05 23:14 GMT+02:00 Andrea Esposito :

> Checkpoint doesn't help it seems. I do it at each iteration/superstep.
>
> Looking deeper, the RDDs are recomputed just a few times during the initial
> 'phase'; after that they aren't recomputed anymore. I attach screenshots:
> bootstrap phase, recompute section and after. This is still unexpected
> because I persist all the intermediate results.
>
> Anyway, the time of each iteration degrades steadily; for instance, at
> the first superstep it takes 3 sec and at superstep 70 it takes 8 sec.
>
> An iteration, looking at the screenshot, is from row 528 to 122.
>
> Any idea where to investigate?
>
>
> 2014-05-02 22:28 GMT+02:00 Andrew Ash :
>
> If you end up with a really long dependency tree between RDDs (like 100+)
>> people have reported success with using the .checkpoint() method.  This
>> computes the RDD and then saves it, flattening the dependency tree.  It
>> turns out that having a really long RDD dependency graph causes
>> serialization sizes of tasks to go up, plus any failures causes a long
>> sequence of operations to regenerate the missing partition.
>>
>> Maybe give that a shot and see if it helps?
>>
>>
>> On Fri, May 2, 2014 at 3:29 AM, Andrea Esposito wrote:
>>
>>> Sorry for the very late answer.
>>>
>>> I carefully followed what you pointed out and figured out that the
>>> structure used for each record was too big, with many small objects.
>>> After changing it, the memory usage decreased drastically.
>>>
>>> Despite that, I'm still struggling with the behaviour of decreasing
>>> performance along supersteps. The memory footprint is now much less than
>>> before, and GC time is not noticeable anymore.
>>> I suspected that some RDDs are being recomputed, and watching the stages
>>> carefully there is evidence of that, but I don't understand why it's happening.
>>>
>>> Recalling my usage pattern:
>>>
>>> newRdd = oldRdd.map(myFun).persist(myStorageLevel)
>>> newRdd.foreach(x => {}) // Force evaluation
>>> oldRdd.unpersist(true)

>>>
>>> Given my usage pattern, I also tried not unpersisting the
>>> intermediate RDDs (i.e. oldRdd), but nothing changed.
>>>
>>> Any hints? How could i debug this?
>>>
>>>
>>>
>>> 2014-04-14 12:55 GMT+02:00 Andrew Ash :
>>>
>>> A lot of your time is being spent in garbage collection (second image).
  Maybe your dataset doesn't easily fit into memory?  Can you reduce the
 number of new objects created in myFun?

 How big are your heap sizes?

 Another observation is that in the 4th image some of your RDDs are
 massive and some are tiny.


 On Mon, Apr 14, 2014 at 11:45 AM, Andrea Esposito wrote:

> Hi all,
>
> I'm developing an iterative computation over graphs, but I'm struggling
> with some embarrassingly low performance.
>
> The computation is heavily iterative and i'm following this rdd usage
> pattern:
>
> newRdd = oldRdd.map(myFun).persist(myStorageLevel)
> newRdd.foreach(x => {}) // Force evaluation
> oldRdd.unpersist(true)
>
>
> I'm using a machine equipped with 30 cores and 120 GB of RAM.
> As an example, I've run with a small graph of 4000 vertices and 80
> thousand edges; the first iterations take 10+ minutes, and later ones
> take a lot more.
> I attach the Spark UI screenshots of just the first 2 iterations.
>
> I tried with MEMORY_ONLY_SER and MEMORY_AND_DISK_SER, and I also changed
> "spark.shuffle.memoryFraction" to 0.3, but nothing changed (with so much
> RAM for 4000 vertices these settings are quite pointless, I guess).
>
> How should i continue to investigate?
>
> Any advices are very very welcome, thanks.
>
> Best,
> EA
>


>>>
>>
>


Re: Incredible slow iterative computation

2014-05-05 Thread Matei Zaharia
It may be slow because of serialization (have you tried Kryo there?) or just 
because at some point the data starts to be on disk. Try profiling the tasks 
while it’s running (e.g. just use jstack to see what they’re doing) and 
definitely try Kryo if you’re currently using Java Serialization. Kryo will 
reduce both the size on disk and the serialization time.
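
For reference, switching to Kryo in this version of Spark looks roughly like
this (a sketch; "myapp.MyRegistrator" is a placeholder for your own
KryoRegistrator that registers the record classes used in the iteration):

```scala
// Sketch (Spark 0.9.x-era configuration; set these before creating the
// SparkContext). The registrator class name is a placeholder.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "myapp.MyRegistrator")
```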

Matei

On May 5, 2014, at 2:54 PM, Andrea Esposito  wrote:

> Update: checkpointing doesn't seem to happen. I checked with the
> "isCheckpointed" method, but it always returns false. ???
> 
> 
> 2014-05-05 23:14 GMT+02:00 Andrea Esposito :
> Checkpoint doesn't help it seems. I do it at each iteration/superstep.
> 
> Looking deeper, the RDDs are recomputed just a few times during the initial
> 'phase'; after that they aren't recomputed anymore. I attach screenshots:
> bootstrap phase, recompute section and after. This is still unexpected
> because I persist all the intermediate results.
> 
> Anyway, the time of each iteration degrades steadily; for instance, at the
> first superstep it takes 3 sec and at superstep 70 it takes 8 sec.
> 
> An iteration, looking at the screenshot, is from row 528 to 122.
> 
> Any idea where to investigate?
> 
> 
> 2014-05-02 22:28 GMT+02:00 Andrew Ash :
> 
> If you end up with a really long dependency tree between RDDs (like 100+) 
> people have reported success with using the .checkpoint() method.  This 
> computes the RDD and then saves it, flattening the dependency tree.  It turns 
> out that having a really long RDD dependency graph causes serialization sizes 
> of tasks to go up, plus any failures causes a long sequence of operations to 
> regenerate the missing partition.
> 
> Maybe give that a shot and see if it helps?
> 
> 
> On Fri, May 2, 2014 at 3:29 AM, Andrea Esposito  wrote:
> Sorry for the very late answer.
> 
> I carefully followed what you pointed out and figured out that the structure
> used for each record was too big, with many small objects. After changing it,
> the memory usage decreased drastically.
> 
> Despite that, I'm still struggling with the behaviour of decreasing
> performance along supersteps. The memory footprint is now much less than
> before, and GC time is not noticeable anymore.
> I suspected that some RDDs are being recomputed, and watching the stages
> carefully there is evidence of that, but I don't understand why it's happening.
> 
> Recalling my usage pattern: 
> newRdd = oldRdd.map(myFun).persist(myStorageLevel)
> newRdd.foreach(x => {}) // Force evaluation
> oldRdd.unpersist(true) 
>  
> Given my usage pattern, I also tried not unpersisting the intermediate
> RDDs (i.e. oldRdd), but nothing changed.
> 
> Any hints? How could i debug this?
> 
> 
> 
> 2014-04-14 12:55 GMT+02:00 Andrew Ash :
> 
> A lot of your time is being spent in garbage collection (second image).  
> Maybe your dataset doesn't easily fit into memory?  Can you reduce the number 
> of new objects created in myFun?
> 
> How big are your heap sizes?
> 
> Another observation is that in the 4th image some of your RDDs are massive 
> and some are tiny.  
> 
> 
> On Mon, Apr 14, 2014 at 11:45 AM, Andrea Esposito  wrote:
> Hi all,
> 
> I'm developing an iterative computation over graphs, but I'm struggling with
> some embarrassingly low performance.
> 
> The computation is heavily iterative and i'm following this rdd usage pattern:
> 
> newRdd = oldRdd.map(myFun).persist(myStorageLevel)
> newRdd.foreach(x => {}) // Force evaluation
> oldRdd.unpersist(true)
> 
> I'm using a machine equipped with 30 cores and 120 GB of RAM.
> As an example, I've run with a small graph of 4000 vertices and 80 thousand
> edges; the first iterations take 10+ minutes, and later ones take a lot
> more.
> I attach the Spark UI screenshots of just the first 2 iterations.
> 
> I tried with MEMORY_ONLY_SER and MEMORY_AND_DISK_SER, and I also changed
> "spark.shuffle.memoryFraction" to 0.3, but nothing changed (with so much RAM
> for 4000 vertices these settings are quite pointless, I guess).
> 
> How should i continue to investigate?
> 
> Any advices are very very welcome, thanks.
> 
> Best,
> EA
> 
> 
> 
> 
> 



How can adding a random count() change the behavior of my program?

2014-05-05 Thread Nicholas Chammas
I’m running into something very strange today. I’m getting an error on the
following innocuous operations.

a = sc.textFile('s3n://...')
a = a.repartition(8)
a = a.map(...)
c = a.countByKey() # ERRORs out on this action. See below for traceback. [1]

If I add a count() right after the repartition(), this error magically goes
away.

a = sc.textFile('s3n://...')
a = a.repartition(8)
print a.count()
a = a.map(...)
c = a.countByKey() # A-OK! WTF?

To top it off, this “fix” is inconsistent. Sometimes, I still get this
error.

This is strange. How do I get to the bottom of this?

Nick

[1] Here’s the traceback:

Traceback (most recent call last):
  File "<stdin>", line 7, in <module>
  File "file.py", line 187, in function_blah
c = a.countByKey()
  File "/root/spark/python/pyspark/rdd.py", line 778, in countByKey
return self.map(lambda x: x[0]).countByValue()
  File "/root/spark/python/pyspark/rdd.py", line 624, in countByValue
return self.mapPartitions(countPartition).reduce(mergeMaps)
  File "/root/spark/python/pyspark/rdd.py", line 505, in reduce
vals = self.mapPartitions(func).collect()
  File "/root/spark/python/pyspark/rdd.py", line 469, in collect
bytesInJava = self._jrdd.collect().iterator()
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o46.collect.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-adding-a-random-count-change-the-behavior-of-my-program-tp5406.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Incredible slow iterative computation

2014-05-05 Thread Earthson
checkpoint seems to just add a checkpoint mark? You need an action after
marking it. I have tried it with success :)

newRdd = oldRdd.map(myFun).persist(myStorageLevel)
newRdd.checkpoint // <-- mark it for checkpointing here
newRdd.foreach(x => {}) // Force evaluation
newRdd.isCheckpointed // true here
oldRdd.unpersist(true)




If you create a new broadcast object for each iteration step, broadcasts will
eat up all of the memory. You may need to set "spark.cleaner.ttl" to a small
enough value.
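
A sketch of setting it (the value is in seconds; 3600 is just an example):

```scala
// Sketch: clean metadata (old broadcasts, shuffles) after roughly this many
// seconds. Set before the SparkContext is created.
System.setProperty("spark.cleaner.ttl", "3600")
```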



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incredible-slow-iterative-computation-tp4204p5407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Francis . Hu
The files do not exist, in fact, and there is no permission issue. 

 

francis@ubuntu-4:/test/spark-0.9.1$ ll work/app-20140505053550-/

total 24

drwxrwxr-x  6 francis francis 4096 May  5 05:35 ./

drwxrwxr-x 11 francis francis 4096 May  5 06:18 ../

drwxrwxr-x  2 francis francis 4096 May  5 05:35 2/

drwxrwxr-x  2 francis francis 4096 May  5 05:35 4/

drwxrwxr-x  2 francis francis 4096 May  5 05:35 7/

drwxrwxr-x  2 francis francis 4096 May  5 05:35 9/

 

Francis

 

From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: Tuesday, May 06, 2014 3:45
To: user@spark.apache.org
Subject: Re: java.io.FileNotFoundException: 
/test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or 
directory)

 

Do those file actually exist? Those stdout/stderr should have the output of the 
spark's executors running in the workers, and its weird that they dont exist. 
Could be permission issue - maybe the directories/files are not being generated 
because it cannot?

 

TD

 

On Mon, May 5, 2014 at 3:06 AM, Francis.Hu  wrote:

Hi,All

 

 

We run a Spark cluster with three workers. 

We created a Spark Streaming application,

then ran the project using the command below:

 

shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo

 

We looked at the web UI of the workers: jobs failed without any error or info, but 
a FileNotFoundException occurred in the workers' log files, as below.

Is this a known issue in Spark? 

 

 

-in workers' 
logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out

 

14/05/05 02:39:39 WARN AbstractHttpConnection: 
/logPage/?appId=app-20140505053550-&executorId=2&logType=stdout

java.io.FileNotFoundException: 
/test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or 
directory)

at java.io.FileInputStream.open(Native Method)

at java.io.FileInputStream.<init>(FileInputStream.java:138)

at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)

at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)

at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)

at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)

at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)

at 
org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)

at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)

at org.eclipse.jetty.server.Server.handle(Server.java:363)

at 
org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)

at 
org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)

at 
org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)

at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)

at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)

at 
org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)

at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)

at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)

at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)

at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)

at java.lang.Thread.run(Thread.java:722)

14/05/05 02:39:41 WARN AbstractHttpConnection: 
/logPage/?appId=app-20140505053550-&executorId=9&logType=stderr

java.io.FileNotFoundException: 
/test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or 
directory)

at java.io.FileInputStream.open(Native Method)

at java.io.FileInputStream.<init>(FileInputStream.java:138)

at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)

at 
org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)

at org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)

at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)

at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)

at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHand

Re: Re: java.io.FileNotFoundException: /test/spark-0.9.1/work/app-20140505053550-0000/2/stdout (No such file or directory)

2014-05-05 Thread Tathagata Das
Can you check the Spark worker logs on that machine, either from the web
UI or directly? They should be in /test/spark-XXX/logs/. See if they contain
any errors. If there is no permission issue, I am not sure why stdout and
stderr are not being generated.

TD


On Mon, May 5, 2014 at 7:13 PM, Francis.Hu wrote:

>  The files do not exist, in fact, and there is no permission issue.
>
>
>
> francis@ubuntu-4:/test/spark-0.9.1$ ll work/app-20140505053550-/
>
> total 24
>
> drwxrwxr-x  6 francis francis 4096 May  5 05:35 ./
>
> drwxrwxr-x 11 francis francis 4096 May  5 06:18 ../
>
> drwxrwxr-x  2 francis francis 4096 May  5 05:35 2/
>
> drwxrwxr-x  2 francis francis 4096 May  5 05:35 4/
>
> drwxrwxr-x  2 francis francis 4096 May  5 05:35 7/
>
> drwxrwxr-x  2 francis francis 4096 May  5 05:35 9/
>
>
>
> Francis
>
>
>
> *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
> *Sent:* Tuesday, May 06, 2014 3:45
> *To:* user@spark.apache.org
> *Subject:* Re: java.io.FileNotFoundException:
> /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or
> directory)
>
>
>
> Do those file actually exist? Those stdout/stderr should have the output
> of the spark's executors running in the workers, and its weird that they
> dont exist. Could be permission issue - maybe the directories/files are not
> being generated because it cannot?
>
>
>
> TD
>
>
>
> On Mon, May 5, 2014 at 3:06 AM, Francis.Hu 
> wrote:
>
> Hi,All
>
>
>
>
>
> We run a Spark cluster with three workers.
>
> We created a Spark Streaming application,
>
> then ran the project using the command below:
>
>
>
> shell> sbt run spark://192.168.219.129:7077 tcp://192.168.20.118:5556 foo
>
>
>
> We looked at the web UI of the workers: jobs failed without any error or
> info, but a FileNotFoundException occurred in the workers' log files, as
> below.
>
> Is this a known issue in Spark?
>
>
>
>
>
> -in workers'
> logs/spark-francis-org.apache.spark.deploy.worker.Worker-1-ubuntu-4.out
>
>
>
> 14/05/05 02:39:39 WARN AbstractHttpConnection:
> /logPage/?appId=app-20140505053550-&executorId=2&logType=stdout
>
> java.io.FileNotFoundException:
> /test/spark-0.9.1/work/app-20140505053550-/2/stdout (No such file or
> directory)
>
> at java.io.FileInputStream.open(Native Method)
>
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
>
> at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI$$anonfun$6.apply(WorkerWebUI.scala:52)
>
> at
> org.apache.spark.ui.JettyUtils$$anon$1.handle(JettyUtils.scala:61)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1040)
>
> at
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:976)
>
> at
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
>
> at
> org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52)
>
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
>
> at org.eclipse.jetty.server.Server.handle(Server.java:363)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:483)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:920)
>
> at
> org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:982)
>
> at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:635)
>
> at
> org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
>
> at
> org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
>
> at
> org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:628)
>
> at
> org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
>
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
>
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
>
> at java.lang.Thread.run(Thread.java:722)
>
> 14/05/05 02:39:41 WARN AbstractHttpConnection:
> /logPage/?appId=app-20140505053550-&executorId=9&logType=stderr
>
> java.io.FileNotFoundException:
> /test/spark-0.9.1/work/app-20140505053550-/9/stderr (No such file or
> directory)
>
> at java.io.FileInputStream.open(Native Method)
>
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
>
> at org.apache.spark.util.Utils$.offsetBytes(Utils.scala:687)
>
> at
> org.apache.spark.deploy.worker.ui.WorkerWebUI.logPage(WorkerWebUI.scala:119)
>
> at
> org.apac

How to use spark-submit

2014-05-05 Thread Stephen Boesch
I have a Spark Streaming application that also uses the external streaming
modules (e.g. kafka, mqtt, ...).  It is not clear how to properly invoke the
spark-submit script: what --driver-class-path and/or
-Dspark.executor.extraClassPath parameters are required?

 For reference, the following error is proving difficult to resolve:

java.lang.ClassNotFoundException:
org.apache.spark.streaming.examples.StreamingExamples
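
For context, my invocation looks roughly like this (a sketch; the master URL,
paths and jar names below are placeholders):

```shell
# Sketch of a spark-submit invocation (all paths and URLs are placeholders)
./bin/spark-submit \
  --class org.apache.spark.streaming.examples.StreamingExamples \
  --master spark://master:7077 \
  --driver-class-path /path/to/extra-deps.jar \
  --jars /path/to/spark-streaming-kafka-assembly.jar \
  /path/to/my-app-assembly.jar arg1 arg2
```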


details about event log

2014-05-05 Thread wxhsdp
Hi,

I'm looking at the event log, and I'm a little confused about some metrics.

Here's the info for one task:

"Launch Time":1399336904603
"Finish Time":1399336906465
"Executor Run Time":1781
"Shuffle Read Metrics":"Shuffle Finish Time":1399336906027, "Fetch Wait
Time":0
"Shuffle Write Metrics":{"Shuffle Bytes Written":12865587,"Shuffle Write
Time":11804679}

(Shuffle Finish Time - Launch Time) is the time to fetch the blocks written
by the previous stage, and (Finish Time - Shuffle Finish Time) is the time to
do the task's own work. Is that right?

And what does "Fetch Wait Time" mean? I'm also confused about "Shuffle Write
Time": why is it so big? What is its unit of measurement?

thank you :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/details-about-event-log-tp5411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: "sbt/sbt run" command returns a JVM problem

2014-05-05 Thread Carter
hi I still have over 1g left for my program.

Date: Sun, 4 May 2014 19:14:30 -0700
From: ml-node+s1001560n5340...@n3.nabble.com
To: gyz...@hotmail.com
Subject: Re: "sbt/sbt run" command returns a JVM problem



the total memory of your machine is 2 GB, right? Then how much memory is 
left free? Wouldn't Ubuntu take up quite a big portion of the 2 GB?
Just a guess!


On Sat, May 3, 2014 at 8:15 PM, Carter <[hidden email]> wrote:

Hi, thanks for all your help.

I tried your setting in the sbt file, but the problem is still there.



The Java setting in my sbt file is:

java \

  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \

  -jar ${JAR} \

  "$@"



I have tried to set these 3 parameters bigger and smaller, but nothing

works. Did I change the right thing?



Thank you very much.







--

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html


Sent from the Apache Spark User List mailing list archive at Nabble.com.

















--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5412.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to use spark-submit

2014-05-05 Thread Soumya Simanta


Yes, I'm struggling with a similar problem where my classes are not found on the 
worker nodes. I'm using 1.0.0-SNAPSHOT.  I would really appreciate it if someone 
could provide some documentation on the usage of spark-submit. 

Thanks 

> On May 5, 2014, at 10:24 PM, Stephen Boesch  wrote:
> 
> 
> I have a spark streaming application that uses the external streaming modules 
> (e.g. kafka, mqtt, ..) as well.  It is not clear how to properly invoke the 
> spark-submit script: what --driver-class-path and/or 
> -Dspark.executor.extraClassPath parameters are required?  
> 
>  For reference, the following error is proving difficult to resolve:
> 
> java.lang.ClassNotFoundException: 
> org.apache.spark.streaming.examples.StreamingExamples
> 


Can I share RDD between a pyspark and spark API

2014-05-05 Thread manas Kar
Hi experts.
 I have some pre-built Python parsers that I am planning to use, just
because I don't want to write them again in Scala. However, after the data is
parsed I would like to take the RDD and use it in a Scala program. (Yes, I
like Scala more than Python and am more comfortable in Scala. :)

In doing so I don't want to push the parsed data to disk and then re-obtain
it via the Scala class. Is there a way I can achieve what I want
efficiently?

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-I-share-RDD-between-a-pyspark-and-spark-API-tp5415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


about broadcast

2014-05-05 Thread randylu
  In my code there are two broadcast variables. Sometimes reading the small
one takes more time than the big one, which is strange!
  Log on slave node is as follows:
Block broadcast_2 stored as values to memory (estimated size *4.0 KB*, free
17.2 GB)
Reading broadcast variable 2 took *9.998537123* s
Block broadcast_1 stored as values to memory (estimated size *705.9 MB*,
free 16.5 GB)
Reading broadcast variable 1 took *2.596005629* s
  Reading the small one normally takes about 0.004 s, but occasionally more
than 9 s.
  




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about broadcast

2014-05-05 Thread randylu
Additionally, reading the big broadcast variable always took about 2 s.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416p5417.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Better option to use Querying in Spark

2014-05-05 Thread prabeesh k
Hi,

I have seen three different ways to query data from Spark

   1. Default SQL support(
   
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
   )
   2. Shark
   3. Blink DB

I would like to know which one is more efficient.

Regards.
prabeesh


Re: sbt run with spark.ContextCleaner ERROR

2014-05-05 Thread wxhsdp
Hi, TD

I tried on v1.0.0-rc3 and still got the error.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304p5421.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Better option to use Querying in Spark

2014-05-05 Thread Mayur Rustagi
All three have different use cases. If you are looking for more of a
warehouse, you are better off with Shark.
SparkSQL is a way to query regular data in SQL-like syntax, leveraging a
columnar store.

BlinkDB is an experiment, meant to integrate with Shark in the long term. It
is not meant for production use cases directly.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, May 6, 2014 at 11:22 AM, prabeesh k  wrote:

> Hi,
>
> I have seen three different ways to query data from Spark
>
>1. Default SQL support(
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
>)
>2. Shark
>3. Blink DB
>
> I would like to know which one is more efficient.
>
> Regards.
> prabeesh
>


Re: Better option to use Querying in Spark

2014-05-05 Thread prabeesh k
Thank you for your prompt reply.

Regards,
prabeesh


On Tue, May 6, 2014 at 11:44 AM, Mayur Rustagi wrote:

> All three have different use cases. If you are looking for more of a
> warehouse, you are better off with Shark.
> SparkSQL is a way to query regular data in SQL-like syntax, leveraging a
> columnar store.
>
> BlinkDB is an experiment, meant to integrate with Shark in the long term. It
> is not meant for production use cases directly.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Tue, May 6, 2014 at 11:22 AM, prabeesh k  wrote:
>
>>  Hi,
>>
>> I have seen three different ways to query data from Spark
>>
>>1. Default SQL support(
>>
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
>>)
>>2. Shark
>>3. Blink DB
>>
>>
>> I would like to know which one is more efficient.
>>
>> Regards.
>> prabeesh
>>
>
>


Re: Increase Stack Size Workers

2014-05-05 Thread Matei Zaharia
Add export SPARK_JAVA_OPTS="-Xss16m" to conf/spark-env.sh. Then it should apply 
to the executor.
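
For example, a sketch of conf/spark-env.sh on each worker (merge with anything
you already set there):

```shell
# conf/spark-env.sh -- read by the worker when it launches executors
export SPARK_JAVA_OPTS="-Xss16m"
```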

Matei


On May 5, 2014, at 2:20 PM, Andrea Esposito  wrote:

> Hi there,
> 
> I'm running an iterative algorithm and sometimes end up with a 
> StackOverflowError, whether or not I use checkpoints.
> 
> While I still don't understand why this is happening, I have figured out that 
> increasing the stack size is a workaround.
> 
> When developing in local mode ("local[n]") I can set the stack size through 
> the -Xss parameter. How can I do the same for each worker in standalone mode? 
> Setting it as "java -Xss16m Worker" seems useless because the actual 
> computation is done in the CoarseGrainedExecutor..
> 
> Best,
> EA



Re: "sbt/sbt run" command returns a JVM problem

2014-05-05 Thread Akhil Das
Hi Carter,

Run export JAVA_OPTS="-Xmx2g" before invoking sbt/sbt run. That should solve
your problem.

Thanks
Best Regards


On Tue, May 6, 2014 at 8:02 AM, Carter  wrote:

> Hi, I still have over 1 GB left for my program.
>
> --
> Date: Sun, 4 May 2014 19:14:30 -0700
> From: [hidden email] 
> To: [hidden email] 
> Subject: Re: "sbt/sbt run" command returns a JVM problem
>
>
> the total memory of your machine is 2 GB, right?
> Then how much memory is left free? Wouldn't Ubuntu take up quite a big
> portion of the 2 GB?
>
> Just a guess!
>
>
> On Sat, May 3, 2014 at 8:15 PM, Carter <[hidden 
> email]
> > wrote:
> Hi, thanks for all your help.
> I tried your setting in the sbt file, but the problem is still there.
>
> The Java setting in my sbt file is:
> java \
>   -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
>   -jar ${JAR} \
>   "$@"
>
> I have tried to set these 3 parameters bigger and smaller, but nothing
> works. Did I change the right thing?
>
> Thank you very much.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
> --
> View this message in context: RE: "sbt/sbt run" command returns a JVM
> problem
>
> Sent from the Apache Spark User List mailing list 
> archiveat Nabble.com.
>