spark metrics in graphite missing for some executors

2015-12-11 Thread rok
I'm using graphite/grafana to collect and visualize metrics from my spark
jobs.

It appears that not all executors report all the metrics -- for example,
even JVM heap data is missing from some. Is there an obvious reason why
this happens? Are metrics somehow held back? Often an executor's metrics
show up with a delay, but since they are aggregate metrics (e.g. the
number of completed tasks), it is clear that they have been collected from
the beginning (the level, once it appears, matches the other executors) --
they just don't show up initially.

Any experience with this? How can it be fixed? Right now it's rendering
many metrics useless since I want to have a complete view into the
application and I'm only seeing a few executors at a time.

Thanks,

rok




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-metrics-in-graphite-missing-for-some-executors-tp25688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Is stddev not a supported aggregation function in SparkSQL WindowSpec?

2016-02-18 Thread rok
There is a stddev function in Spark SQL since 1.6:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.stddev

If you are using Spark < 1.6, you can write your own fairly easily.
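
A minimal sketch of rolling your own (population) standard deviation over a
window, assuming Spark >= 1.4 where window functions are available (via a
HiveContext before 1.6); the DataFrame, column, and window names are
illustrative:

from pyspark.sql import Window
from pyspark.sql import functions as F

# assumes an existing DataFrame `df` with columns "group_col" and "value"
w = Window.partitionBy("group_col")

mean_val = F.avg("value").over(w)
mean_sq = F.avg(F.col("value") * F.col("value")).over(w)

# population standard deviation per window partition: sqrt(E[x^2] - E[x]^2)
df_std = df.withColumn("value_stddev", F.sqrt(mean_sq - mean_val * mean_val))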

On Wed, Feb 17, 2016 at 5:06 PM, mayx [via Apache Spark User List] <
ml-node+s1001560n26250...@n3.nabble.com> wrote:

> I'd like to use standard deviation over window partitions on the Spark
> dataframe, but it didn't work. Is it not supported? Looks like it supports
> many aggregation functions, such as mean, min, etc. How can I make a
> feature request for this?
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-stddev-not-a-supported-aggregation-function-in-SparkSQL-WindowSpec-tp26250p26263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

very slow parquet file write

2015-11-05 Thread rok
Apologies if this appears a second time! 

I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few minutes,
which is longer than I would expect). 

However, when I try to write a parquet file it takes way longer -- the first
set of tasks finishes in a few minutes, but the subsequent tasks take more
than twice as long or longer. In the end it takes over half an hour to write
the file. I've looked at the disk I/O and cpu usage on the compute nodes and
it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious garbage
collection issues and there are no problems with memory. 

Any ideas on how to debug/fix this? 
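
One way to narrow this down (a sketch, not a fix -- assuming a 1.5-era
SQLContext named sqlContext, the cached DataFrame df, and illustrative output
paths) is to toggle the parquet compression codec and compare timings, since
fully loaded CPUs with idle disks could point at encoding/compression cost:

# write with compression disabled
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
df.write.parquet("hdfs:///tmp/df_uncompressed.parquet")

# write with snappy for comparison
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.write.parquet("hdfs:///tmp/df_snappy.parquet")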

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-tp25295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-22 Thread rok
I am trying to run Spark applications with the driver running locally and
interacting with a firewalled remote cluster via a SOCKS proxy. 

I have to modify the hadoop configuration on the *local machine* to try to
make this work, adding 


<property>
  <name>hadoop.rpc.socket.factory.class.default</name>
  <value>org.apache.hadoop.net.SocksSocketFactory</value>
</property>
<property>
  <name>hadoop.socks.server</name>
  <value>localhost:9998</value>
</property>


and on the *remote cluster* side


<property>
  <name>hadoop.rpc.socket.factory.class.default</name>
  <value>org.apache.hadoop.net.StandardSocketFactory</value>
  <final>true</final>
</property>


With this setup, and running "ssh -D 9998 gateway.host" to start the proxy
connection, MapReduce jobs started on the local machine execute fine on the
remote cluster. However, trying to launch a Spark job fails with the nodes
of the cluster apparently unable to communicate with one another: 

java.io.IOException: Failed on local exception: java.net.SocketException:
Connection refused; Host Details : local host is: "node3/10.211.55.103";
destination host is: "node1":8030;

Looking at the packets being sent to node1 from node3, it's clear that no
requests are made on port 8030, hinting that the connection is somehow being
proxied. 

Is it possible that the Spark job is not honoring the socket.factory
settings on the *cluster* side for some reason? 

Note that SPARK-5004 addresses a similar problem, though it looks like the
two are actually not the same (in that case it sounds like a standalone
cluster is being used).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problems-running-Spark-on-a-firewalled-remote-YARN-cluster-via-SOCKS-proxy-tp23955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: help plz! how to use zipWithIndex to each subset of a RDD

2015-07-30 Thread rok
zipWithIndex gives you global indices, which is not what you want. After the
groupByKey, you'll want to use flatMap with a function that iterates through
each Iterable and emits a (String, Int, String) tuple for each element.
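
A minimal pyspark sketch of that approach, assuming an input RDD named rdd of
(key, value) pairs and 1-based indices as in your example:

grouped = rdd.groupByKey()   # RDD of (key, iterable of values)

indexed = grouped.flatMap(
    lambda kv: [(kv[0], i + 1, v) for i, v in enumerate(kv[1])]
)
# indexed is an RDD of (key, index, value) tuples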

On Thu, Jul 30, 2015 at 4:13 AM, askformore [via Apache Spark User List] <
ml-node+s1001560n24071...@n3.nabble.com> wrote:

> I have some data like this: RDD[(String, String)] = ((*key-1*, a), (
> *key-1*,b), (*key-2*,a), (*key-2*,c),(*key-3*,b),(*key-4*,d)) and I want
> to group the data by Key, and for each group, add index fields to the
> groupmember, at last I can transform the data to below : RDD[(String,
> *Int*, String)] = ((key-1,*1*, a), (key-1,*2,*b), (key-2,*1*,a), (key-2,
> *2*,b),(key-3,*1*,b),(key-4,*1*,d)) I tried to groupByKey firstly, then I
> got a RDD[(String, Iterable[String])], but I don't know how to use
> zipWithIndex function to each Iterable... thanks.
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071p24074.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: PySpark Serialization/Deserialization (Pickling) Overhead

2017-03-08 Thread rok
My guess is that the UI serialization times show the Java side only. To get
a feeling for the Python pickling/unpickling, use the show_profiles()
method of the SparkContext instance:
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.show_profiles

That will show you how much of the execution time is used up by
cPickle.load() and .dump() methods.
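
A minimal sketch, assuming profiling is switched on when the SparkContext is
created (it is off by default); the RDD here is just a toy example:

from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1000000)).map(lambda x: x * 2)
rdd.count()

# prints per-RDD cProfile output from the Python workers, including the time
# spent in the serializer's load/dump calls
sc.show_profiles()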

Hope that helps,

Rok

On Wed, Mar 8, 2017 at 3:18 AM, Yeoul Na [via Apache Spark User List] <
ml-node+s1001560n28468...@n3.nabble.com> wrote:

>
> Hi all,
>
> I am trying to analyze PySpark performance overhead. People just say
> PySpark
> is slower than Scala due to the Serialization/Deserialization overhead. I
> tried with the example in this post:
> https://0x0fff.com/spark-dataframes-are-faster-arent-they/. This and many
> articles say straight-forward Python implementation is the slowest due to
> the serialization/deserialization overhead.
>
> However, when I actually looked at the log in the Web UI, serialization
> and deserialization time of PySpark do not seem to be any bigger than that
> of Scala. The main contributor was "Executor Computing Time". Thus, we
> cannot sure whether this is due to serialization or because Python code is
> basically slower than Scala code.
>
> So my question is that does "Task Deserialization Time" in Spark WebUI
> actually include serialization/deserialization times in PySpark? If this is
> not the case, how can I actually measure the serialization/deserialization
> overhead?
>
> Thanks,
> Yeoul
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Serialization-Deserialization-Pickling-Overhead-tp28468p28469.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

"java.lang.IllegalStateException: There is no space for new record" in GraphFrames

2017-04-28 Thread rok
When running the connectedComponents algorithm in GraphFrames on a
sufficiently large dataset, I get the following error, which I have not
encountered before:

17/04/20 20:35:26 WARN TaskSetManager: Lost task 3.0 in stage 101.0 (TID
53644, 172.19.1.206, executor 40): java.lang.IllegalStateException: There is
no space for new record
at
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:225)
at
org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:130)
at
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:244)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Any thoughts on how to avoid this? 
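
Not a confirmed fix, but a sketch of the knobs worth trying first, assuming a
GraphFrames 0.4-era API on Spark 2.x, a SparkSession named spark, and
placeholder vertices/edges DataFrames: connectedComponents needs a checkpoint
directory, and more (hence smaller) shuffle partitions may relieve the
per-task aggregation buffers involved in this exception.

from graphframes import GraphFrame

spark.sparkContext.setCheckpointDir("hdfs:///tmp/graphframes-checkpoints")
spark.conf.set("spark.sql.shuffle.partitions", "2000")  # illustrative value

g = GraphFrame(vertices, edges)
cc = g.connectedComponents()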



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-There-is-no-space-for-new-record-in-GraphFrames-tp28635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-27 Thread rok
I've got a dataset saved with saveAsPickleFile using pyspark -- it saves
without problems. When I try to read it back in, it fails with:

Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times, most
recent failure: Lost task 401.3 in stage 0.0 (TID 449,
e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException: 
   
org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
   
org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
   
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
   
org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
   
org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
   
org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
   
org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
   
org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
   
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
   
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
   
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
   
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)


Not really sure where to start looking for the culprit -- any suggestions
most welcome. Thanks!
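
For the record, the workaround that ended up helping (see the follow-up later
in this thread) was writing more, smaller part files so that no single part
approaches the 2 GB range; a sketch with an illustrative partition count and
path:

rdd = rdd.repartition(1000)                        # more, smaller part files
rdd.saveAsPickleFile("hdfs:///tmp/my_rdd.pickle")

loaded = sc.pickleFile("hdfs:///tmp/my_rdd.pickle")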

Rok




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




pyspark: Java null pointer exception when accessing broadcast variables

2015-02-10 Thread rok
I'm trying to use a broadcasted dictionary inside a map function and am
consistently getting Java null pointer exceptions. This is inside an IPython
session connected to a standalone spark cluster. I seem to recall being able
to do this before but at the moment I am at a loss as to what to try next.
Is there a limit to the size of broadcast variables? This one is rather
large (a few Gb dict). Thanks!
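
For reference, a minimal sketch of the pattern in question with a small
dictionary (the multi-GB case is what triggers the exception reported here):

lookup = {"a": 1, "b": 2}
b_lookup = sc.broadcast(lookup)

rdd = sc.parallelize(["a", "b", "a", "c"])
mapped = rdd.map(lambda k: b_lookup.value.get(k, 0))
print(mapped.collect())   # [1, 2, 1, 0]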

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Java-null-pointer-exception-when-accessing-broadcast-variables-tp21580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




iteratively modifying an RDD

2015-02-11 Thread rok
I was having trouble with memory exceptions when broadcasting a large lookup
table, so I've resorted to processing it iteratively -- but how can I modify
an RDD iteratively? 

I'm trying something like:

rdd = sc.parallelize(...)
lookup_tables = {...}

for lookup_table in lookup_tables:
    rdd = rdd.map(lambda x: func(x, lookup_table))

If I leave it as is, then only the last "lookup_table" is applied instead of
stringing together all the maps. However, if I add a .cache() to the .map then
it seems to work fine.
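
One likely culprit is Python's late binding: the lambdas are only evaluated
when the chain of maps finally runs, and by then the loop variable refers to
the last lookup_table. Binding the current value at definition time avoids
relying on cache() for this; a sketch:

for lookup_table in lookup_tables:
    # the default argument freezes the current table for this lambda
    rdd = rdd.map(lambda x, lt=lookup_table: func(x, lt))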

A second problem is that the runtime roughly doubles with each iteration, so
this clearly doesn't seem to be the way to do it. What is the preferred way
of doing such repeated modifications to an RDD, and how can the accumulation
of overhead be minimized?

Thanks!

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-tp21606.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




cannot connect to Spark Application Master in YARN

2015-02-18 Thread rok
450)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
at 
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
at org.mortbay.jetty.Server.handle(Server.java:326)
at 
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
at
org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
at
org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)


On the other hand, when I try to access the Spark UI via the URL reported by
Spark when the application starts, I get redirected to the cluster node but
am unable to connect. Are the applications somehow binding to the wrong
ports? Is this a Spark setting I need to configure, or something within YARN?

Thanks!

Rok




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cannot-connect-to-Spark-Application-Master-in-YARN-tp21699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




java.util.NoSuchElementException: key not found:

2015-02-27 Thread rok
I'm seeing this "java.util.NoSuchElementException: key not found:" exception
pop up sometimes when I run operations on an RDD from multiple threads in a
Python application. It ends up shutting down the SparkContext, so I'm
assuming this is a bug -- from what I understand I should be able to run
operations on the same RDD from multiple threads, or is this not recommended?

I can't reproduce it all the time and I've tried eliminating caching
wherever possible to see if that would have an effect, but it doesn't seem
to. Each thread first splits the base RDD and then runs the
LogisticRegressionWithSGD on the subset.  
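
For reference, a rough reconstruction of the pattern described; the names,
split fractions, and lack of training parameters are illustrative, not the
actual job:

import threading
from pyspark.mllib.classification import LogisticRegressionWithSGD

def train_subset(subset):
    # subset: RDD of LabeledPoint
    model = LogisticRegressionWithSGD.train(subset)
    print(model.weights)

subsets = base_rdd.randomSplit([0.25, 0.25, 0.25, 0.25])
threads = [threading.Thread(target=train_subset, args=(s,)) for s in subsets]
for t in threads:
    t.start()
for t in threads:
    t.join()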

Is there a workaround to this exception? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




failure to display logs on YARN UI with log aggregation on

2015-03-09 Thread rok
I'm using log aggregation on YARN with Spark and I am not able to see the
logs through the YARN web UI after the application completes: 

Failed redirect for container_1425390894284_0066_01_01

Failed while trying to construct the redirect url to the log server. Log
Server url may not be configured
java.lang.Exception: Unknown container. Container either has not started or
has already completed or doesn't belong to this node at all. 

I tried setting 

log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log

in log4j.properties as suggested in the documentation, but it didn't seem to
change anything.

Note that I can see the logs fine using "yarn logs" from the command line,
so aggregation is working properly. For regular mapreduce jobs, the YARN UI
displays the logs correctly as well. Is there a spark configuration option
that needs to be set if aggregation is used? 
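
The "Log Server url may not be configured" message usually points at the
yarn.log.server.url property on the NodeManagers, which tells the NM web UI
where to redirect requests for the aggregated logs of finished containers.
Worth checking in yarn-site.xml; the host and port below are placeholders for
whatever history server serves your aggregated logs:

<property>
  <name>yarn.log.server.url</name>
  <value>http://historyserver.example.com:19888/jobhistory/logs</value>
</property>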



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/failure-to-display-logs-on-YARN-UI-with-log-aggregation-on-tp21974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




StandardScaler failing with OOM errors in PySpark

2015-04-21 Thread rok
I'm trying to use the StandardScaler in pyspark on a relatively small (a few
hundred Mb) dataset of sparse vectors with 800k features. The fit method of
StandardScaler crashes with Java heap space or Direct buffer memory errors.
There should be plenty of memory around -- 10 executors with 2 cores each
and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
lots of overhead (3g), thinking it might be the array creation in the
aggregators that's causing issues. 

The bizarre thing is that this isn't always reproducible -- sometimes it
actually works without problems. Should I be setting up executors
differently? 
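
For what it's worth, a minimal sketch of the call in question, assuming
pyspark.mllib SparseVectors in an RDD named sparse_vectors: one thing worth
ruling out is withMean=True, since centering produces dense output
(800k-element dense vectors here), whereas withMean=False keeps the data
sparse.

from pyspark.mllib.feature import StandardScaler

scaler = StandardScaler(withMean=False, withStd=True)
model = scaler.fit(sparse_vectors)        # this is the step that crashes here
scaled = model.transform(sparse_vectors)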

Thanks,

Rok




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




FetchFailedException and MetadataFetchFailedException

2015-05-15 Thread rok
I am trying to sort a collection of key,value pairs (between several hundred
million and a few billion) and have recently been getting lots of
"FetchFailedException" errors that seem to originate when one of the
executors fails to find a temporary shuffle file on disk. E.g.:

org.apache.spark.shuffle.FetchFailedException:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
(No such file or directory)

This file actually exists: 

> ls -l
> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

-rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

This error repeats on several executors and is followed by a number of 

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

This results on most tasks being lost and executors dying. 

There is plenty of space on all of the appropriate filesystems, so none of
the executors are running out of disk space. I am running this via YARN on
approximately 100 nodes with 2 cores per node. Any thoughts on what might be
causing these errors? Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




sharing RDDs between PySpark and Scala

2014-10-30 Thread rok
I'm processing some data using PySpark and I'd like to save the RDDs to disk
(they are (k,v) RDDs of strings and SparseVector types) and read them in
using Scala to run them through some other analysis. Is this possible? 
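
One language-neutral route (a sketch, assuming a Spark version with
DataFrame/parquet support; names and path are illustrative) is to flatten each
SparseVector into plain columns that the Scala side can reassemble with
Vectors.sparse:

from pyspark.sql import Row

def to_row(kv):
    key, vec = kv   # vec is a pyspark.mllib.linalg.SparseVector
    return Row(key=key, size=vec.size,
               indices=[int(i) for i in vec.indices],
               values=[float(v) for v in vec.values])

df = sqlContext.createDataFrame(rdd.map(to_row))
df.write.parquet("hdfs:///tmp/features.parquet")
# on the Scala side, read the parquet file and rebuild each vector with
# Vectors.sparse(size, indices, values)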

Thanks,

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sharing-RDDs-between-PySpark-and-Scala-tp17718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




using LogisticRegressionWithSGD.train in Python crashes with "Broken pipe"

2014-11-05 Thread rok
I have a dataset comprised of ~200k labeled points whose features are
SparseVectors with ~20M features. I take 5% of the data for a training set. 

> model = LogisticRegressionWithSGD.train(training_set)

fails with 

ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File
"/cluster/home/roskarr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 472, in send_command
self.socket.sendall(command.encode('utf-8'))
  File "/cluster/home/roskarr/miniconda/lib/python2.7/socket.py", line 224,
in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

I'm at a loss as to where to begin to debug this... any suggestions? Thanks,

Rok



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: using LogisticRegressionWithSGD.train in Python crashes with "Broken pipe"

2014-11-05 Thread rok
yes, the training set is fine, I've verified it. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182p18195.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




minimizing disk I/O

2014-11-13 Thread rok
I'm trying to understand the disk I/O patterns for Spark -- specifically, I'd
like to reduce the number of files that are being written during shuffle
operations. A couple questions: 

* is the amount of file I/O performed independent of the memory I allocate
for the shuffles? 

* if this is the case, what is the purpose of this memory and is there any
way to see how much of it is actually being used?
 
* how can I minimize the number of files being written? With 24 cores per
node, the filesystem can't handle the large amount of simultaneous I/O very
well so it limits the number of cores I can use... 
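
A sketch of the shuffle settings most relevant to these questions, using Spark
1.1-era names (consolidation was removed in later releases; the values are
illustrative): the sort-based shuffle writes one data file plus one index file
per map task, and spark.shuffle.memoryFraction sizes the in-memory buffers
that are filled before spilling to disk.

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.shuffle.manager", "sort")            # one data + one index file per map task
        .set("spark.shuffle.consolidateFiles", "true")   # fewer files when using the hash shuffle
        .set("spark.shuffle.memoryFraction", "0.3"))     # in-memory buffer before spilling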

Thanks for any insight you might have! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/minimizing-disk-I-O-tp18845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: using LogisticRegressionWithSGD.train in Python crashes with "Broken pipe"

2014-11-13 Thread rok
Hi, I'm using Spark 1.1.0. There is no error on the executors -- it appears
as if the job never gets properly dispatched -- the only message is the
"Broken Pipe" message in the driver. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-LogisticRegressionWithSGD-train-in-Python-crashes-with-Broken-pipe-tp18182p18846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




ensuring RDD indices remain immutable

2014-12-01 Thread rok
I have an RDD that serves as a feature look-up table downstream in my
analysis. I create it using zipWithIndex(), and because the elements of the
RDD could end up in a different order if it is regenerated at any point, I
cache it to try to ensure that the (feature --> index) mapping remains fixed.

However, I'm having trouble verifying that this is actually robust -- can
someone comment on whether such a mapping should be stable, or is there
another preferred method? zipWithUniqueId() isn't optimal since the max ID
generated this way is always greater than the number of features, so I'm
trying to avoid it.
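
A sketch of the sort-then-zip approach (assuming the features have a stable
sort key; names are illustrative), which makes the ordering deterministic even
if the RDD is recomputed:

feature_index = (features_rdd
                 .sortBy(lambda f: f)    # fixed, deterministic order
                 .zipWithIndex()         # (feature, index), contiguous 0..N-1
                 .cache())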






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: ensuring RDD indices remain immutable

2014-12-01 Thread rok
true though I was hoping to avoid having to sort... maybe there's no way
around it. Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094p20104.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




calculating the mean of SparseVector RDD

2015-01-07 Thread rok
I have an RDD of SparseVectors and I'd like to calculate the column-wise mean,
returning a dense vector. I've tried doing this with the following (using
pyspark, Spark v1.2.0):

def aggregate_partition_values(vec1, vec2):
    vec1[vec2.indices] += vec2.values
    return vec1

def aggregate_combined_vectors(vec1, vec2):
    if all(vec1 == vec2):
        # then the vector came from only one partition
        return vec1
    else:
        return vec1 + vec2

means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
                       aggregate_combined_vectors)
means = means / nvals

This turns out to be really slow -- and the time doesn't seem to depend on how
many vectors there are, so there seems to be some overhead somewhere that I'm
not understanding. Is there a better way of doing this?
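
For comparison, a sketch using MLlib's column statistics, which accepts an RDD
of (sparse) vectors directly and should avoid the per-element Python overhead
of aggregate:

from pyspark.mllib.stat import Statistics

summary = Statistics.colStats(vals)   # single pass over the RDD of vectors
means = summary.mean()                # dense array of per-column means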



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Rok Roskar
ction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:38)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

On Tue, May 19, 2015 at 3:38 AM, Imran Rashid  wrote:

> Hi,
>
> can you take a look at the logs and see what the first error you are
> getting is?  Its possible that the file doesn't exist when that error is
> produced, but it shows up later -- I've seen similar things happen, but
> only after there have already been some errors.  But, if you see that in
> the very first error, then I"m not sure what the cause is.  Would be
> helpful for you to send the logs.
>
> Imran
>
> On Fri, May 15, 2015 at 10:07 AM, rok  wrote:
>
>> I am trying to sort a collection of key,value pairs (between several
>> hundred
>> million to a few billion) and have recently been getting lots of
>> "FetchFailedException" errors that seem to originate when one of the
>> executors doesn't seem to find a temporary shuffle file on disk. E.g.:
>>
>> org.apache.spark.shuffle.FetchFailedException:
>>
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>> (No such file or directory)
>>
>> This file actually exists:
>>
>> > ls -l
>> >
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>>
>> -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52
>>
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
>>
>> This error repeats on several executors and is followed by a number of
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0
>>
>> This results on most tasks being lost and executors dying.
>>
>> There is plenty of space on all of the appropriate filesystems, so none of
>> the executors are running out of disk space. Any idea what might be
>> causing
>> this? I am running this via YARN on approximately 100 nodes with 2 cores
>> per
>> node. Any thoughts on what might be causing these errors? Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: FetchFailedException and MetadataFetchFailedException

2015-05-28 Thread Rok Roskar
yes I've had errors with too many open files before, but this doesn't seem
to be the case here.

Hmm, you're right that these errors are different from what I initially
stated -- I think what I assumed was that the failure to write caused the
worker to crash, which in turn resulted in a failed fetch. I'll try to
see if I can make sense of it from the logs.

On Fri, May 22, 2015 at 9:29 PM, Imran Rashid  wrote:

> hmm, sorry I think that disproves my theory.  Nothing else is immediately
> coming to mind.  Its possible there is more info in the logs from the
> driver, couldn't hurt to send those (though I don't have high hopes of
> finding anything that way).  Offchance this could be from too many open
> files or something?  Normally there is a different error msg, but I figure
> its worth asking anyway.
>
> The error you reported here was slightly different from your original
> post.  This error is from writing the shuffle map output, while the
> original error you reported was a fetch failed, which is from reading the
> shuffle data on the "reduce" side in the next stage.  Does the map stage
> actually finish, even though the tasks are throwing these errors while
> writing the map output?  Or do you sometimes get failures on the shuffle
> write side, and sometimes on the shuffle read side?  (Not that I think you
> are doing anything wrong, but it may help narrow down the root cause and
> possibly file a bug.)
>
> thanks
>
>
> On Fri, May 22, 2015 at 4:40 AM, Rok Roskar  wrote:
>
>> on the worker/container that fails, the "file not found" is the first
>> error -- the output below is from the yarn log. There were some python
>> worker crashes for another job/stage earlier (see the warning at 18:36) but
>> I expect those to be unrelated to this file not found error.
>>
>>
>> ==
>> LogType:stderr
>> Log Upload Time:15-May-2015 18:50:05
>> LogLength:5706
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4
>> j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting
>> to kill Python Worker
>> 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0
>> (TID 995)
>> java.io.FileNotFoundException:
>> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3
>> -44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or
>> directory)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at
>> org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
>> at
>> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
>>

very slow parquet file write

2015-11-05 Thread Rok Roskar
I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into
a parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few
minutes, which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- the
first set of tasks finishes in a few minutes, but the subsequent tasks take
more than twice as long or longer. In the end it takes over half an hour to
write the file. I've looked at the disk I/O and cpu usage on the compute
nodes and it looks like the processors are fully loaded while the disk I/O
is essentially zero for long periods of time. I don't see any obvious
garbage collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!


Re: very slow parquet file write

2015-11-06 Thread Rok Roskar
yes I was expecting that too because of all the metadata generation and
compression. But I have not seen performance this bad for other parquet
files I've written and was wondering if there could be something obvious
(and wrong) about how I've specified the schema etc. It's a very
simple schema consisting of a StructType with a few float StructFields and
a string. I'm using all the Spark defaults for IO compression.

I'll see what I can do about running a profiler -- can you point me to a
resource/example?

Thanks,

Rok

ps: my post is still listed as not accepted by the mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-td25295.html
-- none of your responses are there either. I am definitely subscribed to
the list though (I get daily digests). Any clue how to fix it?




On Nov 6, 2015, at 9:26 AM, Cheng Lian  wrote:

I'd expect writing Parquet files slower than writing JSON files since
Parquet involves more complicated encoders, but maybe not that slow. Would
you mind to try to profile one Spark executor using tools like YJP to see
what's the hotspot?

Cheng

On 11/6/15 7:34 AM, rok wrote:

Apologies if this appears a second time!

I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
the size of file this is way over-provisioned (I've tried it with fewer
partitions and fewer nodes, no obvious effect). I was expecting the dump to
disk to be very fast -- the DataFrame is cached in memory and contains just
14 columns (13 are floats and one is a string). When I write it out in json
format, this is indeed reasonably fast (though it still takes a few minutes,
which is longer than I would expect).

However, when I try to write a parquet file it takes way longer -- the first
set of tasks finishes in a few minutes, but the subsequent tasks take more
than twice as long or longer. In the end it takes over half an hour to write
the file. I've looked at the disk I/O and cpu usage on the compute nodes and
it looks like the processors are fully loaded while the disk I/O is
essentially zero for long periods of time. I don't see any obvious garbage
collection issues and there are no problems with memory.

Any ideas on how to debug/fix this?

Thanks!



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/very-slow-parquet-file-write-tp25295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: very slow parquet file write

2015-11-13 Thread Rok Roskar
I'm not sure what you mean? I didn't do anything specifically to partition
the columns
On Nov 14, 2015 00:38, "Davies Liu"  wrote:

> Do you have partitioned columns?
>
> On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar  wrote:
> > I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions
> into a
> > parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
> > the size of file this is way over-provisioned (I've tried it with fewer
> > partitions and fewer nodes, no obvious effect). I was expecting the dump
> to
> > disk to be very fast -- the DataFrame is cached in memory and contains
> just
> > 14 columns (13 are floats and one is a string). When I write it out in
> json
> > format, this is indeed reasonably fast (though it still takes a few
> minutes,
> > which is longer than I would expect).
> >
> > However, when I try to write a parquet file it takes way longer -- the
> first
> > set of tasks finishes in a few minutes, but the subsequent tasks take
> more
> > than twice as long or longer. In the end it takes over half an hour to
> write
> > the file. I've looked at the disk I/O and cpu usage on the compute nodes
> and
> > it looks like the processors are fully loaded while the disk I/O is
> > essentially zero for long periods of time. I don't see any obvious
> garbage
> > collection issues and there are no problems with memory.
> >
> > Any ideas on how to debug/fix this?
> >
> > Thanks!
> >
> >
>


Re: problems running Spark on a firewalled remote YARN cluster via SOCKS proxy

2015-07-24 Thread Rok Roskar
Hi Akhil,

the namenode is definitely configured correctly, otherwise the job would
not start at all. It registers with YARN and starts up, but once the nodes
try to communicate with each other it fails. Note that a Hadoop MR job using
the identical configuration executes without any problems. The driver also
connects just fine -- here is the log:

15/07/24 08:10:58 INFO yarn.ApplicationMaster: Registered signal
handlers for [TERM, HUP, INT]
15/07/24 08:10:59 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
15/07/24 08:10:59 INFO yarn.ApplicationMaster: ApplicationAttemptId:
appattempt_1437724871597_0001_01
15/07/24 08:11:00 INFO spark.SecurityManager: Changing view acls to: root,rok
15/07/24 08:11:00 INFO spark.SecurityManager: Changing modify acls to: root,rok
15/07/24 08:11:00 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(root, rok); users with modify permissions: Set(root,
rok)
15/07/24 08:11:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/24 08:11:01 INFO Remoting: Starting remoting
15/07/24 08:11:01 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkYarnAM@10.211.55.104:51896]
15/07/24 08:11:01 INFO util.Utils: Successfully started service
'sparkYarnAM' on port 51896.
15/07/24 08:11:01 INFO yarn.ApplicationMaster: Waiting for Spark
driver to be reachable.
15/07/24 08:11:01 INFO yarn.ApplicationMaster: Driver now available:
:58734
15/07/24 08:11:01 INFO yarn.ApplicationMaster$AMEndpoint: Add WebUI
Filter. 
AddWebUIFilter(org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,Map(PROXY_HOSTS
-> node1, PROXY_URI_BASES ->
http://node1:8089/proxy/application_1437724871597_0001),/proxy/application_1437724871597_0001)
15/07/24 08:11:01 INFO client.RMProxy: Connecting to ResourceManager
at node1/10.211.55.101:8030
15/07/24 08:11:01 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/07/24 08:11:02 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 0 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:03 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 1 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:04 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 2 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:05 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 3 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:06 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 4 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:07 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 5 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:08 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 6 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:09 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 7 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:10 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 8 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:11 INFO ipc.Client: Retrying connect to server:
node1/10.211.55.101:8030. Already tried 9 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/07/24 08:11:11 ERROR yarn.ApplicationMaster: Uncaught exception:
java.io.IOException: Failed on local exception:
java.net.SocketException: Connection refused; Host Details : local
host is: "node4/10.211.55.104"; destination host is: "node1":8030;




On Thu, Jul 23, 2015 at 7:00 PM, Akhil Das 
wrote:

> It looks like its picking up the wrong namenode uri from the
> HADOOP_CONF_DIR, make sure it is proper. Also for submitting a spark job to
> a remote cluster, you might want to look at the spark.driver host and
> spark.driver.port
>
> Thanks
> Best Regards
>
> On Wed, Jul 22, 2015 at 8:56 PM, rok  wrote:
>
>> I am trying to run Spark applications with the driver running locally and
>> interacting with a firewalled remote cluster via a SOCKS proxy.
>>
>> I have to 

Re: NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-28 Thread Rok Roskar
hi, thanks for the quick answer -- I suppose this is possible, though I
don't understand how it could come about. The largest individual RDD
elements are ~ 1 Mb in size (most are smaller) and the RDD is composed of
800k of them. The file is saved in 134 parts, but is being read in using
some 1916+ partitions (I don't know why actually -- how does this number
come about?). How can I check if any objects/batches are exceeding 2Gb?

Thanks,

Rok


On Tue, Jan 27, 2015 at 7:55 PM, Davies Liu  wrote:

> Maybe it's caused by integer overflow, is it possible that one object
> or batch bigger than 2G (after pickling)?
>
> On Tue, Jan 27, 2015 at 7:59 AM, rok  wrote:
> > I've got an dataset saved with saveAsPickleFile using pyspark -- it saves
> > without problems. When I try to read it back in, it fails with:
> >
> > Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times,
> most
> > recent failure: Lost task 401.3 in stage 0.0 (TID 449,
> > e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException:
> >
> > org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
> > org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
> >
> > org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
> >
> >
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
> >
> >
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
> >
> >
> org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
> >
> >
> org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
> >
> >
> org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
> >
> >
> org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
> >
>  org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
> >
>  org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
> > org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >
> >
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> > scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >
> >
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
> >
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
> >
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >
> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >
>  org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
> >
> >
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> >
> >
> > Not really sure where to start looking for the culprit -- any suggestions
> > most welcome. Thanks!
> >
> > Rok
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: NegativeArraySizeException in pyspark when loading an RDD pickleFile

2015-01-29 Thread Rok Roskar
Thanks for the clarification on the partitioning.

I did what you suggested and tried reading in individual part-* files --
some of them are ~1.7Gb in size and that's where it's failing. When I
increase the number of partitions before writing to disk, it seems to work.
Would be nice if this was somehow automatically corrected!

Thanks,

Rok

On Wed, Jan 28, 2015 at 7:01 PM, Davies Liu  wrote:

> HadoopRDD will try to split the file as 64M partitions in size, so you
> got 1916+ partitions.
> (assume 100k per row, they are 80G in size).
>
> I think it has very small chance that one object or one batch will be
> bigger than 2G.
> Maybe there are a bug when it split the pickled file, could you create
> a RDD for each
> file, then see which file is cause the issue (maybe some of them)?
>
> On Wed, Jan 28, 2015 at 1:30 AM, Rok Roskar  wrote:
> > hi, thanks for the quick answer -- I suppose this is possible, though I
> > don't understand how it could come about. The largest individual RDD
> > elements are ~ 1 Mb in size (most are smaller) and the RDD is composed of
> > 800k of them. The file is saved in 134 parts, but is being read in using
> > some 1916+ partitions (I don't know why actually -- how does this number
> > come about?). How can I check if any objects/batches are exceeding 2Gb?
> >
> > Thanks,
> >
> > Rok
> >
> >
> > On Tue, Jan 27, 2015 at 7:55 PM, Davies Liu 
> wrote:
> >>
> >> Maybe it's caused by integer overflow, is it possible that one object
> >> or batch bigger than 2G (after pickling)?
> >>
> >> On Tue, Jan 27, 2015 at 7:59 AM, rok  wrote:
> >> > I've got an dataset saved with saveAsPickleFile using pyspark -- it
> >> > saves
> >> > without problems. When I try to read it back in, it fails with:
> >> >
> >> > Job aborted due to stage failure: Task 401 in stage 0.0 failed 4
> times,
> >> > most
> >> > recent failure: Lost task 401.3 in stage 0.0 (TID 449,
> >> > e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException:
> >> >
> >> > org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
> >> >
> >> > org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
> >> >
> >> > org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
> >> >
> >> >
> >> >
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
> >> >
> >> >
> >> >
> org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
> >> >
> >> >
> >> >
> org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
> >> >
> >> >
> >> >
> org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
> >> >
> >> >
> >> >
> org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
> >> >
> >> >
> >> >
> org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
> >> >
> >> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
> >> >
> >> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
> >> >
> >> > org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >> >
> >> >
> >> >
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >> > scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >> >
> >> >
> >> >
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
> >> >
> >> >
> >> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
> >> >
> >> >
> >> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >> >
> >> >
> >> >
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
> >> >
> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
> >> >
> >> >
> >> >
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
> >> >
> >> >
> >> > Not really sure where to start looking for the culprit -- any
> >> > suggestions
> >> > most welcome. Thanks!
> >> >
> >> > Rok
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> >
> http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
> >> >
> >
> >
>


Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-10 Thread Rok Roskar
.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
... 4 more


What I find odd is that when I make the broadcast object, the logs don't show 
any significant amount of memory being allocated in any of the block managers 
-- but the dictionary is large, it's 8 Gb pickled on disk. 


On Feb 10, 2015, at 10:01 PM, Davies Liu  wrote:

> Could you paste the NPE stack trace here? It will better to create a
> JIRA for it, thanks!
> 
> On Tue, Feb 10, 2015 at 10:42 AM, rok  wrote:
>> I'm trying to use a broadcasted dictionary inside a map function and am
>> consistently getting Java null pointer exceptions. This is inside an IPython
>> session connected to a standalone spark cluster. I seem to recall being able
>> to do this before but at the moment I am at a loss as to what to try next.
>> Is there a limit to the size of broadcast variables? This one is rather
>> large (a few Gb dict). Thanks!
>> 
>> Rok
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Java-null-pointer-exception-when-accessing-broadcast-variables-tp21580.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-10 Thread Rok Roskar
I didn't notice other errors -- I also thought such a large broadcast is a
bad idea but I tried something similar with a much smaller dictionary and
encountered the same problem. I'm not familiar enough with spark internals
to know whether the trace indicates an issue with the broadcast variables
or perhaps something different?

The driver and executors have 50gb of ram so memory should be fine.
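
For reference, the pattern is essentially this (a minimal sketch with made-up
names and sizes, not the actual job; sc is the running SparkContext):

lookup = dict((i, float(i)) for i in range(1000))   # the real dict is far larger
b_lookup = sc.broadcast(lookup)

rdd = sc.parallelize(range(100000))
out = rdd.map(lambda k: b_lookup.value.get(k % 1000, 0.0)).collect()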

Thanks,

Rok
On Feb 11, 2015 12:19 AM, "Davies Liu"  wrote:

> It's brave to broadcast 8G pickled data, it will take more than 15G in
> memory for each Python worker,
> how much memory do you have in executor and driver?
> Do you see any other exceptions in driver and executors? Something
> related to serialization in JVM.
>
> On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar  wrote:
> > I get this in the driver log:
>
> I think this should happen on executor, or you called first() or
> take() on the RDD?
>
> > java.lang.NullPointerException
> > at
> org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
> > at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)
> >
> > and on one of the executor's stderr:
> >
> > 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
> (crashed)
> > org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
> >   File
> "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py",
> line 57, in main
> > split_index = read_int(infile)
> >   File
> "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py",
> line 511, in read_int
> > raise EOFError
> > EOFError
> >
> > at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
> > at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
> > at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
> > at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> > at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
> > at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)
> > Caused by: java.lang.NullPointerException
> > at
> org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at
> org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
> > ... 4 more
> > 15/02/10 23:10:35 ERROR PythonR

Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Yes I actually do use mapPartitions already
On Feb 11, 2015 7:55 PM, "Charles Feduke"  wrote:

> If you use mapPartitions to iterate the lookup_tables does that improve
> the performance?
>
> This link is to Spark docs 1.1 because both latest and 1.2 for Python give
> me a 404:
> http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html#mapPartitions
>
> On Wed Feb 11 2015 at 1:48:42 PM rok  wrote:
>
>> I was having trouble with memory exceptions when broadcasting a large
>> lookup
>> table, so I've resorted to processing it iteratively -- but how can I
>> modify
>> an RDD iteratively?
>>
>> I'm trying something like :
>>
>> rdd = sc.parallelize(...)
>> lookup_tables = {...}
>>
>> for lookup_table in lookup_tables :
>> rdd = rdd.map(lambda x: func(x, lookup_table))
>>
>> If I leave it as is, then only the last "lookup_table" is applied instead
>> of
>> stringing together all the maps. However, if add a .cache() to the .map
>> then
>> it seems to work fine.
>>
>> A second problem is that the runtime for each iteration roughly doubles at
>> each iteration so this clearly doesn't seem to be the way to do it. What
>> is
>> the preferred way of doing such repeated modifications to an RDD and how
>> can
>> the accumulation of overhead be minimized?
>>
>> Thanks!
>>
>> Rok
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/iteratively-modifying-an-RDD-tp21606.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-11 Thread Rok Roskar
I think the problem was related to the broadcasts being too large -- I've
now split it up into many smaller operations but it's still not quite there
-- see
http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-td21606.html
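
The splitting I mean is roughly along these lines (a sketch only; split_dict is
a hypothetical helper that partitions the big dictionary into smaller ones, and
func is the per-element function from the other thread):

def make_mapper(bcast):
    # close over this one broadcast only
    return lambda x: func(x, bcast.value)

for chunk in split_dict(big_lookup, n_chunks=20):
    bcast = sc.broadcast(chunk)
    rdd = rdd.map(make_mapper(bcast)).cache()
    rdd.count()     # force evaluation before broadcasting the next chunk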

Thanks,

Rok

On Wed, Feb 11, 2015, 19:59 Davies Liu  wrote:

> Could you share a short script to reproduce this problem?
>
> On Tue, Feb 10, 2015 at 8:55 PM, Rok Roskar  wrote:
> > I didn't notice other errors -- I also thought such a large broadcast is
> a
> > bad idea but I tried something similar with a much smaller dictionary and
> > encountered the same problem. I'm not familiar enough with spark
> internals
> > to know whether the trace indicates an issue with the broadcast
> variables or
> > perhaps something different?
> >
> > The driver and executors have 50gb of ram so memory should be fine.
> >
> > Thanks,
> >
> > Rok
> >
> > On Feb 11, 2015 12:19 AM, "Davies Liu"  wrote:
> >>
> >> It's brave to broadcast 8G pickled data, it will take more than 15G in
> >> memory for each Python worker,
> >> how much memory do you have in executor and driver?
> >> Do you see any other exceptions in driver and executors? Something
> >> related to serialization in JVM.
> >>
> >> On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar 
> wrote:
> >> > I get this in the driver log:
> >>
> >> I think this should happen on executor, or you called first() or
> >> take() on the RDD?
> >>
> >> > java.lang.NullPointerException
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
> >> > at scala.collection.Iterator$class.foreach(Iterator.scala:
> 727)
> >> > at
> >> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> > at
> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >> > at scala.collection.AbstractIterable.foreach(
> Iterable.scala:54)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1.apply(PythonRDD.scala:204)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1.apply(PythonRDD.scala:204)
> >> > at
> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread.run(
> PythonRDD.scala:203)
> >> >
> >> > and on one of the executor's stderr:
> >> >
> >> > 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
> >> > (crashed)
> >> > org.apache.spark.api.python.PythonException: Traceback (most recent
> call
> >> > last):
> >> >   File
> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/
> pyspark/worker.py",
> >> > line 57, in main
> >> > split_index = read_int(infile)
> >> >   File
> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/
> pyspark/serializers.py",
> >> > line 511, in read_int
> >> > raise EOFError
> >> > EOFError
> >> >
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$$anon$1.read(
> PythonRDD.scala:137)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$$anon$1.<init>(
> PythonRDD.scala:174)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
> >> > at
> >> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> >> > at
> >> > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
> >> > at
> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$
> anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
> >> 

Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
Aha great! Thanks for the clarification!
On Feb 11, 2015 8:11 PM, "Davies Liu"  wrote:

> On Wed, Feb 11, 2015 at 10:47 AM, rok  wrote:
> > I was having trouble with memory exceptions when broadcasting a large
> lookup
> > table, so I've resorted to processing it iteratively -- but how can I
> modify
> > an RDD iteratively?
> >
> > I'm trying something like :
> >
> > rdd = sc.parallelize(...)
> > lookup_tables = {...}
> >
> > for lookup_table in lookup_tables :
> > rdd = rdd.map(lambda x: func(x, lookup_table))
> >
> > If I leave it as is, then only the last "lookup_table" is applied
> instead of
> > stringing together all the maps. However, if add a .cache() to the .map
> then
> > it seems to work fine.
>
> This is the something related to Python closure implementation, you should
> do it like this:
>
> def create_func(lookup_table):
>  return lambda x: func(x, lookup_table)
>
> for lookup_table in lookup_tables:
> rdd = rdd.map(create_func(lookup_table))
>
> The Python closure just remember the variable, not copy the value of it.
> In the loop, `lookup_table` is the same variable. When we serialize the
> final
> rdd, all the closures are referring to the same `lookup_table`, which
> points
> to the last value.
>
> When we create the closure in a function, Python create a variable for
> each closure, so it works.
>
> > A second problem is that the runtime for each iteration roughly doubles
> at
> > each iteration so this clearly doesn't seem to be the way to do it. What
> is
> > the preferred way of doing such repeated modifications to an RDD and how
> can
> > the accumulation of overhead be minimized?
> >
> > Thanks!
> >
> > Rok
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-tp21606.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
the runtime for each consecutive iteration is still roughly twice as long as 
for the previous one -- is there a way to reduce whatever overhead is 
accumulating? 

On Feb 11, 2015, at 8:11 PM, Davies Liu  wrote:

> On Wed, Feb 11, 2015 at 10:47 AM, rok  wrote:
>> I was having trouble with memory exceptions when broadcasting a large lookup
>> table, so I've resorted to processing it iteratively -- but how can I modify
>> an RDD iteratively?
>> 
>> I'm trying something like :
>> 
>> rdd = sc.parallelize(...)
>> lookup_tables = {...}
>> 
>> for lookup_table in lookup_tables :
>>rdd = rdd.map(lambda x: func(x, lookup_table))
>> 
>> If I leave it as is, then only the last "lookup_table" is applied instead of
>> stringing together all the maps. However, if add a .cache() to the .map then
>> it seems to work fine.
> 
> This is the something related to Python closure implementation, you should
> do it like this:
> 
> def create_func(lookup_table):
> return lambda x: func(x, lookup_table)
> 
> for lookup_table in lookup_tables:
>rdd = rdd.map(create_func(lookup_table))
> 
> The Python closure just remember the variable, not copy the value of it.
> In the loop, `lookup_table` is the same variable. When we serialize the final
> rdd, all the closures are referring to the same `lookup_table`, which points
> to the last value.
> 
> When we create the closure in a function, Python create a variable for
> each closure, so it works.
> 
>> A second problem is that the runtime for each iteration roughly doubles at
>> each iteration so this clearly doesn't seem to be the way to do it. What is
>> the preferred way of doing such repeated modifications to an RDD and how can
>> the accumulation of overhead be minimized?
>> 
>> Thanks!
>> 
>> Rok
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-tp21606.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: iteratively modifying an RDD

2015-02-11 Thread Rok Roskar
yes, sorry I wasn't clear -- I still have to trigger the calculation of the RDD 
at the end of each iteration. Otherwise all of the lookup tables are shipped to 
the cluster at the same time resulting in memory errors. Therefore this becomes 
several map jobs instead of one and each consecutive map is slower than the one 
before. I'll try the checkpoint, thanks for the suggestion. 
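
E.g. something along these lines (a sketch, not tested; the checkpoint directory
and the interval are just placeholders, and create_func is as in your earlier
message):

sc.setCheckpointDir('hdfs:///tmp/spark-checkpoints')

for i, lookup_table in enumerate(lookup_tables):
    rdd = rdd.map(create_func(lookup_table))
    if i % 5 == 4:
        rdd.checkpoint()   # truncate the lineage every few iterations
    rdd.count()            # trigger the computation for this step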


On Feb 12, 2015, at 12:13 AM, Davies Liu  wrote:

> On Wed, Feb 11, 2015 at 2:43 PM, Rok Roskar  wrote:
>> the runtime for each consecutive iteration is still roughly twice as long as 
>> for the previous one -- is there a way to reduce whatever overhead is 
>> accumulating?
> 
> Sorry, I didn't fully understand you question, which two are you comparing?
> 
> PySpark will try to combine the multiple map() together, then you will get
> a task which need all the lookup_tables (the same size as before).
> 
> You could add a checkpoint after some of the iterations.
> 
>> On Feb 11, 2015, at 8:11 PM, Davies Liu  wrote:
>> 
>>> On Wed, Feb 11, 2015 at 10:47 AM, rok  wrote:
>>>> I was having trouble with memory exceptions when broadcasting a large 
>>>> lookup
>>>> table, so I've resorted to processing it iteratively -- but how can I 
>>>> modify
>>>> an RDD iteratively?
>>>> 
>>>> I'm trying something like :
>>>> 
>>>> rdd = sc.parallelize(...)
>>>> lookup_tables = {...}
>>>> 
>>>> for lookup_table in lookup_tables :
>>>>   rdd = rdd.map(lambda x: func(x, lookup_table))
>>>> 
>>>> If I leave it as is, then only the last "lookup_table" is applied instead 
>>>> of
>>>> stringing together all the maps. However, if add a .cache() to the .map 
>>>> then
>>>> it seems to work fine.
>>> 
>>> This is the something related to Python closure implementation, you should
>>> do it like this:
>>> 
>>> def create_func(lookup_table):
>>>return lambda x: func(x, lookup_table)
>>> 
>>> for lookup_table in lookup_tables:
>>>   rdd = rdd.map(create_func(lookup_table))
>>> 
>>> The Python closure just remember the variable, not copy the value of it.
>>> In the loop, `lookup_table` is the same variable. When we serialize the 
>>> final
>>> rdd, all the closures are referring to the same `lookup_table`, which points
>>> to the last value.
>>> 
>>> When we create the closure in a function, Python create a variable for
>>> each closure, so it works.
>>> 
>>>> A second problem is that the runtime for each iteration roughly doubles at
>>>> each iteration so this clearly doesn't seem to be the way to do it. What is
>>>> the preferred way of doing such repeated modifications to an RDD and how 
>>>> can
>>>> the accumulation of overhead be minimized?
>>>> 
>>>> Thanks!
>>>> 
>>>> Rok
>>>> 
>>>> 
>>>> 
>>>> --
>>>> View this message in context: 
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-tp21606.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>> 
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>> 
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-12 Thread Rok Roskar
 d[x]).collect()

Out[99]:
[0.39210713836346933,
 0.8636333432012482,
 0.28744831569153617,
 0.663815926356163,
 0.38274814840717364,
 0.6606453820150496,
 0.8610156719813942,
 0.6971353266345091,
 0.9896836700210551,
 0.05789392881996358]

Is there a size limit for objects serialized with Kryo? Or an option that
controls it? The Java serializer works fine.

On Wed, Feb 11, 2015 at 8:04 PM, Rok Roskar  wrote:

> I think the problem was related to the broadcasts being too large -- I've
> now split it up into many smaller operations but it's still not quite there
> -- see
> http://apache-spark-user-list.1001560.n3.nabble.com/iteratively-modifying-an-RDD-td21606.html
>
> Thanks,
>
> Rok
>
> On Wed, Feb 11, 2015, 19:59 Davies Liu  wrote:
>
>> Could you share a short script to reproduce this problem?
>>
>> On Tue, Feb 10, 2015 at 8:55 PM, Rok Roskar  wrote:
>> > I didn't notice other errors -- I also thought such a large broadcast
>> is a
>> > bad idea but I tried something similar with a much smaller dictionary
>> and
>> > encountered the same problem. I'm not familiar enough with spark
>> internals
>> > to know whether the trace indicates an issue with the broadcast
>> variables or
>> > perhaps something different?
>> >
>> > The driver and executors have 50gb of ram so memory should be fine.
>> >
>> > Thanks,
>> >
>> > Rok
>> >
>> > On Feb 11, 2015 12:19 AM, "Davies Liu"  wrote:
>> >>
>> >> It's brave to broadcast 8G pickled data, it will take more than 15G in
>> >> memory for each Python worker,
>> >> how much memory do you have in executor and driver?
>> >> Do you see any other exceptions in driver and executors? Something
>> >> related to serialization in JVM.
>> >>
>> >> On Tue, Feb 10, 2015 at 2:16 PM, Rok Roskar 
>> wrote:
>> >> > I get this in the driver log:
>> >>
>> >> I think this should happen on executor, or you called first() or
>> >> take() on the RDD?
>> >>
>> >> > java.lang.NullPointerException
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
>> >> > at scala.collection.Iterator$class.foreach(Iterator.scala:
>> 727)
>> >> > at
>> >> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> > at
>> >> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >> > at scala.collection.AbstractIterable.foreach(
>> Iterable.scala:54)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply$mcV$sp(PythonRDD.scala:229)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$
>> run$1.apply(PythonRDD.scala:204)
>> >> > at
>> >> > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
>> >> > at
>> >> > org.apache.spark.api.python.PythonRDD$WriterThread.run(Pytho
>> nRDD.scala:203)
>> >> >
>> >> > and on one of the executor's stderr:
>> >> >
>> >> > 15/02/10 23:10:35 ERROR PythonRDD: Python worker exited unexpectedly
>> >> > (crashed)
>> >> > org.apache.spark.api.python.PythonException: Traceback (most recent
>> call
>> >> > last):
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/worker.py",
>> >> > line 57, in main
>> >> > split_index = read_int(infile)
>> >> >   File
>> >> > "/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pysp
>> ark/serializers.py",
>> >> > line 511, in read_int
>> >> > raise EOFError
>> >> > EOFError
>> >> >
>> >> > at
>> >&

Re: java.util.NoSuchElementException: key not found:

2015-03-02 Thread Rok Roskar
aha ok, thanks.

If I create different RDDs from a parent RDD and force evaluation
thread-by-thread, then it should presumably be fine, correct? Or do I need
to checkpoint the child RDDs as a precaution in case they need to be removed
from memory and recomputed?
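
To be concrete, the pattern is something like this (a rough sketch; parse_point
and the numbers are made up, and parse_point is assumed to return LabeledPoints):

from threading import Thread
from pyspark.mllib.classification import LogisticRegressionWithSGD

parent = sc.textFile('data.txt').map(parse_point).cache()
parent.count()      # force evaluation once, in the main thread

def run_subset(i, results):
    # each thread derives and trains on its own child RDD
    subset = parent.sample(False, 0.25, seed=i)
    results[i] = LogisticRegressionWithSGD.train(subset)

results = [None] * 4
threads = [Thread(target=run_subset, args=(i, results)) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()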

On Sat, Feb 28, 2015 at 4:28 AM, Shixiong Zhu  wrote:

> RDD is not thread-safe. You should not use it in multiple threads.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-02-27 23:14 GMT+08:00 rok :
>
>> I'm seeing this java.util.NoSuchElementException: key not found: exception
>> pop up sometimes when I run operations on an RDD from multiple threads in
>> a
>> python application. It ends up shutting down the SparkContext so I'm
>> assuming this is a bug -- from what I understand, I should be able to run
>> operations on the same RDD from multiple threads or is this not
>> recommended?
>>
>> I can't reproduce it all the time and I've tried eliminating caching
>> wherever possible to see if that would have an effect, but it doesn't seem
>> to. Each thread first splits the base RDD and then runs the
>> LogisticRegressionWithSGD on the subset.
>>
>> Is there a workaround to this exception?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: StandardScaler failing with OOM errors in PySpark

2015-04-22 Thread Rok Roskar
the feature dimension is 800k.

yes, I believe the driver memory is likely the problem since it doesn't crash 
until the very last part of the tree aggregation. 

I'm running it via pyspark through YARN -- I have to run in client mode so I 
can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory 
and overhead parameters but it doesn't seem to have an effect. 
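
For what it's worth, since in client mode the driver JVM is already running by
the time a SparkConf set from code is read, my understanding is that the driver
heap can only be raised on the command line or in spark-defaults.conf, e.g.
(the numbers here are just placeholders):

pyspark --master yarn-client \
    --driver-memory 8g \
    --executor-memory 9g \
    --executor-cores 2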

Thanks,

Rok

On Apr 23, 2015, at 7:47 AM, Xiangrui Meng  wrote:

> What is the feature dimension? Did you set the driver memory? -Xiangrui
> 
> On Tue, Apr 21, 2015 at 6:59 AM, rok  wrote:
>> I'm trying to use the StandardScaler in pyspark on a relatively small (a few
>> hundred Mb) dataset of sparse vectors with 800k features. The fit method of
>> StandardScaler crashes with Java heap space or Direct buffer memory errors.
>> There should be plenty of memory around -- 10 executors with 2 cores each
>> and 8 Gb per core. I'm giving the executors 9g of memory and have also tried
>> lots of overhead (3g), thinking it might be the array creation in the
>> aggregators that's causing issues.
>> 
>> The bizarre thing is that this isn't always reproducible -- sometimes it
>> actually works without problems. Should I be setting up executors
>> differently?
>> 
>> Thanks,
>> 
>> Rok
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StandardScaler failing with OOM errors in PySpark

2015-04-23 Thread Rok Roskar
ok yes, I think I have narrowed it down to being a problem with driver
memory settings. It looks like the application master/driver is not being
launched with the settings specified:

For the driver process on the main node I see "-XX:MaxPermSize=128m
-Xms512m -Xmx512m" as options used to start the JVM, even though I
specified

'spark.yarn.am.memory', '5g'
'spark.yarn.am.memoryOverhead', '2000'

The info shows that these options were read:

15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120
MB memory including 2000 MB overhead

Is there some reason why these options are being ignored and instead
starting the driver with just 512Mb of heap?

On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar  wrote:

> the feature dimension is 800k.
>
> yes, I believe the driver memory is likely the problem since it doesn't
> crash until the very last part of the tree aggregation.
>
> I'm running it via pyspark through YARN -- I have to run in client mode so
> I can't set spark.driver.memory -- I've tried setting the
> spark.yarn.am.memory and overhead parameters but it doesn't seem to have an
> effect.
>
> Thanks,
>
> Rok
>
> On Apr 23, 2015, at 7:47 AM, Xiangrui Meng  wrote:
>
> > What is the feature dimension? Did you set the driver memory? -Xiangrui
> >
> > On Tue, Apr 21, 2015 at 6:59 AM, rok  wrote:
> >> I'm trying to use the StandardScaler in pyspark on a relatively small
> (a few
> >> hundred Mb) dataset of sparse vectors with 800k features. The fit
> method of
> >> StandardScaler crashes with Java heap space or Direct buffer memory
> errors.
> >> There should be plenty of memory around -- 10 executors with 2 cores
> each
> >> and 8 Gb per core. I'm giving the executors 9g of memory and have also
> tried
> >> lots of overhead (3g), thinking it might be the array creation in the
> >> aggregators that's causing issues.
> >>
> >> The bizarre thing is that this isn't always reproducible -- sometimes it
> >> actually works without problems. Should I be setting up executors
> >> differently?
> >>
> >> Thanks,
> >>
> >> Rok
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
>
>


Re: StandardScaler failing with OOM errors in PySpark

2015-04-28 Thread Rok Roskar
That's exactly what I'm saying -- I specify the memory options using spark
options, but this is not reflected in how the JVM is created. No matter
which memory settings I specify, the JVM for the driver is always started with
512Mb of memory. Is this a feature or a bug?

rok

On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng  wrote:

> You might need to specify driver memory in spark-submit instead of
> passing JVM options. spark-submit is designed to handle different
> deployments correctly. -Xiangrui
>
> On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar  wrote:
> > ok yes, I think I have narrowed it down to being a problem with driver
> > memory settings. It looks like the application master/driver is not being
> > launched with the settings specified:
> >
> > For the driver process on the main node I see "-XX:MaxPermSize=128m
> -Xms512m
> > -Xmx512m" as options used to start the JVM, even though I specified
> >
> > 'spark.yarn.am.memory', '5g'
> > 'spark.yarn.am.memoryOverhead', '2000'
> >
> > The info shows that these options were read:
> >
> > 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
> 7120 MB
> > memory including 2000 MB overhead
> >
> > Is there some reason why these options are being ignored and instead
> > starting the driver with just 512Mb of heap?
> >
> > On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar  wrote:
> >>
> >> the feature dimension is 800k.
> >>
> >> yes, I believe the driver memory is likely the problem since it doesn't
> >> crash until the very last part of the tree aggregation.
> >>
> >> I'm running it via pyspark through YARN -- I have to run in client mode
> so
> >> I can't set spark.driver.memory -- I've tried setting the
> >> spark.yarn.am.memory and overhead parameters but it doesn't seem to
> have an
> >> effect.
> >>
> >> Thanks,
> >>
> >> Rok
> >>
> >> On Apr 23, 2015, at 7:47 AM, Xiangrui Meng  wrote:
> >>
> >> > What is the feature dimension? Did you set the driver memory?
> -Xiangrui
> >> >
> >> > On Tue, Apr 21, 2015 at 6:59 AM, rok  wrote:
> >> >> I'm trying to use the StandardScaler in pyspark on a relatively small
> >> >> (a few
> >> >> hundred Mb) dataset of sparse vectors with 800k features. The fit
> >> >> method of
> >> >> StandardScaler crashes with Java heap space or Direct buffer memory
> >> >> errors.
> >> >> There should be plenty of memory around -- 10 executors with 2 cores
> >> >> each
> >> >> and 8 Gb per core. I'm giving the executors 9g of memory and have
> also
> >> >> tried
> >> >> lots of overhead (3g), thinking it might be the array creation in the
> >> >> aggregators that's causing issues.
> >> >>
> >> >> The bizarre thing is that this isn't always reproducible -- sometimes
> >> >> it
> >> >> actually works without problems. Should I be setting up executors
> >> >> differently?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Rok
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> View this message in context:
> >> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
> >> >> Sent from the Apache Spark User List mailing list archive at
> >> >> Nabble.com.
> >> >>
> >> >> -
> >> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> >> For additional commands, e-mail: user-h...@spark.apache.org
> >> >>
> >>
> >
>


PySpark, numpy arrays and binary data

2014-08-06 Thread Rok Roskar
Hello,

I'm interested in getting started with Spark to scale our scientific analysis 
package (http://pynbody.github.io) to larger data sets. The package is written 
in Python and makes heavy use of numpy/scipy and related frameworks. I've got a 
couple of questions that I have not been able to find easy answers to despite 
some research efforts... I hope someone here can clarify things for me a bit!

* is there a preferred way to read binary data off a local disk directly into 
an RDD? Our I/O routines are built to read data in chunks and each chunk could 
be read by a different process/RDD, but it's not clear to me how to accomplish 
this with the existing API. Since the idea is to process data sets that don't 
fit into a single node's memory, reading first and then distributing via 
sc.parallelize is obviously not an option. 

* related to the first question -- when an RDD is created by parallelizing a 
numpy array, the array gets serialized and distributed. I see in the source 
that it actually gets written into a file first (!?) -- but surely the Py4J 
bottleneck for python array types (mentioned in the source comment) doesn't 
really apply to numpy arrays? Is it really necessary to dump the data onto disk 
first? Conversely, the collect() seems really slow and I suspect that this is 
due to the combination of disk I/O and python list creation. Are there any ways 
of getting around this if numpy arrays are being used? 


I'd be curious about any other best-practices tips anyone might have for 
running pyspark with numpy data...! 

Thanks!


Rok



Re: PySpark, numpy arrays and binary data

2014-08-07 Thread Rok Roskar
thanks for the quick answer!

> numpy array only can support basic types, so we can not use it during 
> collect()
> by default.
> 

sure, but if you knew that a numpy array went in on one end, you could safely 
use it on the other end, no? Perhaps it would require an extension of the RDD 
class and overriding the collect() method. 

> Could you give a short example about how numpy array is used in your project?
> 

sure -- basically our main data structure is a container class (acts like a 
dictionary) that holds various arrays that represent particle data. Each 
particle has various properties, position, velocity, mass etc. you get at these 
individual properties by calling something like 

s['pos']

where 's' is the container object and 'pos' is the name of the array. A really 
common use case then is to select particles based on their properties and do 
some plotting, or take a slice of the particles, e.g. you might do 

r = np.sqrt((s['pos']**2).sum(axis=1))
ind = np.where(r < 5)
plot(s[ind]['x'], s[ind]['y'])

Internally, the various arrays are kept in a dictionary -- I'm hoping to write 
a class that keeps them in an RDD instead. To the user, this would have to be 
transparent, i.e. if the user wants to get at the data for specific particles, 
she would just have to do 

s['pos'][1,5,10] 

for example, and the data would be fetched for her from the RDD just like it 
would be if she were simply using the usual single-machine version. This is why 
the writing to/from files when retrieving data from the RDD really is a no-go 
-- can you recommend how this can be circumvented? 
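
Very roughly, the direction I have in mind is something like this (a sketch
only -- the names and the (index, row) layout are just for illustration, not
the actual package code):

import numpy as np

class RDDArrayContainer(object):
    # dict-like container where each named array lives in an RDD of
    # (row_index, row) pairs instead of a local numpy array
    def __init__(self, sc, arrays):
        self._rdds = dict((name, sc.parallelize(list(enumerate(arr))))
                          for name, arr in arrays.items())

    def __getitem__(self, name):
        return self._rdds[name]

    def rows(self, name, indices):
        # pull a handful of rows back to the driver as a numpy array
        wanted = set(indices)
        kv = self._rdds[name].filter(lambda p: p[0] in wanted).collect()
        return np.array([row for _, row in sorted(kv)])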


>> 
>> * is there a preferred way to read binary data off a local disk directly
>> into an RDD? Our I/O routines are built to read data in chunks and each
>> chunk could be read by a different process/RDD, but it's not clear to me how
>> to accomplish this with the existing API. Since the idea is to process data
>> sets that don't fit into a single node's memory, reading first and then
>> distributing via sc.parallelize is obviously not an option.
> 
> If you already know how to partition the data, then you could use
> sc.parallelize()
> to distribute the description of your data, then read the data in parallel by
> given descriptions.
> 
> For examples, you can partition your data into (path, start, length), then
> 
> partitions = [(path1, start1, length), (path1, start2, length), ...]
> 
> def read_chunk(path, start, length):
>     f = open(path)
>     f.seek(start)
>     data = f.read(length)
>     # process the data here and yield results
> 
> rdd = sc.parallelize(partitions, len(partitions)).flatMap(lambda p: read_chunk(*p))
> 


right... this is totally obvious in retrospect!  Thanks!


Rok




>> * related to the first question -- when an RDD is created by parallelizing a
>> numpy array, the array gets serialized and distributed. I see in the source
>> that it actually gets written into a file first (!?) -- but surely the Py4J
>> bottleneck for python array types (mentioned in the source comment) doesn't
>> really apply to numpy arrays? Is it really necessary to dump the data onto
>> disk first? Conversely, the collect() seems really slow and I suspect that
>> this is due to the combination of disk I/O and python list creation. Are
>> there any ways of getting around this if numpy arrays are being used?
>> 
>> 
>> I'd be curious about any other best-practices tips anyone might have for
>> running pyspark with numpy data...!
>> 
>> Thanks!
>> 
>> 
>> Rok
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



out of memory errors -- per core memory limits?

2014-08-21 Thread Rok Roskar
I am having some issues with processes running out of memory and I'm wondering 
if I'm setting things up incorrectly. 

I am running a job on two nodes with 24 cores and 256Gb of memory each. I start 
the pyspark shell with SPARK_EXECUTOR_MEMORY=210gb. When I run the job with 
anything more than 8 cores, the processes start dying off with out of memory 
errors. But when I watch the memory consumption using top on the two executor 
nodes, the individual processes never seem to exceed the per-core memory and 
the nodes themselves are far from running out of memory. So I'm wondering if 
Spark is setting the per-core memory limit somewhere? 
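
My (possibly wrong) understanding is that pyspark starts one Python worker per
core, so Python-side memory use scales with the core count on top of the JVM
heap that SPARK_EXECUTOR_MEMORY controls. For reference, a sketch of the knobs
I'm aware of (values are placeholders; spark.python.worker.memory, if I read
the docs right, only sets the point at which Python workers start spilling to
disk, and may only be in newer releases):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "210g")
        .set("spark.cores.max", "16")
        .set("spark.python.worker.memory", "4g"))
sc = SparkContext(conf=conf)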

Thanks,

Rok




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



repartitioning an RDD yielding imbalance

2014-08-28 Thread Rok Roskar
I've got an RDD where each element is a long string (a whole document). I'm 
using pyspark so some of the handy partition-handling functions aren't 
available, and I count the number of elements in each partition with: 

def count_partitions(id, iterator):
    c = sum(1 for _ in iterator)
    yield (id, c)

> rdd.mapPartitionsWithSplit(count_partitions).collectAsMap()

This returns the following: 

{0: 866, 1: 1158, 2: 828, 3: 876}

But if I do: 

> rdd.repartition(8).mapPartitionsWithSplit(count_partitions).collectAsMap()

I get

{0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 3594, 7: 134}

Why this strange redistribution of elements? I'm obviously misunderstanding how 
spark does the partitioning -- is it a problem with having a list of strings as 
an RDD? 
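
One possible workaround (a sketch, not tested) would be to key each document by
a random bucket and partition on that key explicitly:

import random

n_parts = 8
balanced = (rdd.map(lambda doc: (random.randint(0, n_parts - 1), doc))
               .partitionBy(n_parts)
               .values())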

Help very much appreciated! 

Thanks,

Rok


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-09 Thread Rok Roskar
thanks for the suggestion -- however, it looks like this is even slower. With
the small data set I'm using, my aggregate function takes ~ 9 seconds and
the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
the Kyro serializer -- I get the error:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

is there an easy/obvious fix?
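
For reference, my best guess at a fix is to bump the Kryo buffer limits -- in
the 1.2-era configuration these are, if I remember right,
spark.kryoserializer.buffer.mb and spark.kryoserializer.buffer.max.mb (the
sizes below are placeholders):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max.mb", "512"))
sc = SparkContext(conf=conf)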


On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng  wrote:

> There is some serialization overhead. You can try
>
> https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
> . -Xiangrui
>
> On Wed, Jan 7, 2015 at 9:42 AM, rok  wrote:
> > I have an RDD of SparseVectors and I'd like to calculate the means
> returning
> > a dense vector. I've tried doing this with the following (using pyspark,
> > spark v1.2.0):
> >
> > def aggregate_partition_values(vec1, vec2) :
> > vec1[vec2.indices] += vec2.values
> > return vec1
> >
> > def aggregate_combined_vectors(vec1, vec2) :
> > if all(vec1 == vec2) :
> > # then the vector came from only one partition
> > return vec1
> > else:
> > return vec1 + vec2
> >
> > means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
> > aggregate_combined_vectors)
> > means = means / nvals
> >
> > This turns out to be really slow -- and doesn't seem to depend on how
> many
> > vectors there are so there seems to be some overhead somewhere that I'm
> not
> > understanding. Is there a better way of doing this?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: calculating the mean of SparseVector RDD

2015-01-12 Thread Rok Roskar
This was without using Kryo -- if I use Kryo, I get errors about buffer
overflows (see above):

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
required: 8

Just calling colStats doesn't actually compute those statistics, does it?
It looks like the computation is only carried out once you call the .mean()
method.
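
For reference, the colStats route I'm timing is essentially this (a minimal
sketch; vals is the RDD of SparseVectors):

from pyspark.mllib.stat import Statistics

summary = Statistics.colStats(vals)
means = summary.mean()   # per-column means as a dense array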



On Sat, Jan 10, 2015 at 7:04 AM, Xiangrui Meng  wrote:

> colStats() computes the mean values along with several other summary
> statistics, which makes it slower. How is the performance if you don't
> use kryo? -Xiangrui
>
> On Fri, Jan 9, 2015 at 3:46 AM, Rok Roskar  wrote:
> > thanks for the suggestion -- however, looks like this is even slower.
> With
> > the small data set I'm using, my aggregate function takes ~ 9 seconds and
> > the colStats.mean() takes ~ 1 minute. However, I can't get it to run with
> > the Kyro serializer -- I get the error:
> >
> > com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 5,
> > required: 8
> >
> > is there an easy/obvious fix?
> >
> >
> > On Wed, Jan 7, 2015 at 7:30 PM, Xiangrui Meng  wrote:
> >>
> >> There is some serialization overhead. You can try
> >>
> >>
> https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
> >> . -Xiangrui
> >>
> >> On Wed, Jan 7, 2015 at 9:42 AM, rok  wrote:
> >> > I have an RDD of SparseVectors and I'd like to calculate the means
> >> > returning
> >> > a dense vector. I've tried doing this with the following (using
> pyspark,
> >> > spark v1.2.0):
> >> >
> >> > def aggregate_partition_values(vec1, vec2) :
> >> > vec1[vec2.indices] += vec2.values
> >> > return vec1
> >> >
> >> > def aggregate_combined_vectors(vec1, vec2) :
> >> > if all(vec1 == vec2) :
> >> > # then the vector came from only one partition
> >> > return vec1
> >> > else:
> >> > return vec1 + vec2
> >> >
> >> > means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
> >> > aggregate_combined_vectors)
> >> > means = means / nvals
> >> >
> >> > This turns out to be really slow -- and doesn't seem to depend on how
> >> > many
> >> > vectors there are so there seems to be some overhead somewhere that
> I'm
> >> > not
> >> > understanding. Is there a better way of doing this?
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> >
> http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
> >> >
> >
> >
>