Re: Limit pyspark.daemon threads

2016-03-26 Thread Sven Krasser
n’t stack up by > implementing flock in the jobs and changing how teardowns of the spark > cluster work as far as failed workers. > > Thanks again, > —Ken > > On Mar 26, 2016, at 4:08 PM, Sven Krasser wrote: > > My understanding is that the spark.executor.cores setting

Re: Limit pyspark.daemon threads

2016-03-26 Thread Sven Krasser
the high load nukes the nodes. I don’t have the > spark.executor.cores set, but will setting that to say, 12 limit the > pyspark threads, or will it just limit the jvm threads? > > Thanks! > Ken > > On Mar 25, 2016, at 9:10 PM, Sven Krasser wrote: > > Hey Ken, > > I also

Re: Testing spark with AWS spot instances

2016-03-25 Thread Sven Krasser
When a spot instance terminates, you lose all data (RDD partitions) stored in the executors that ran on that instance. Spark can recreate the partitions from input data, but if that requires going through multiple preceding shuffles, a good chunk of the job will need to be redone. -Sven On Thu, Mar

Re: Limit pyspark.daemon threads

2016-03-25 Thread Sven Krasser
Hey Ken, I also frequently see more pyspark daemons than configured concurrency, often it's a low multiple. (There was an issue pre-1.3.0 that caused this to be quite a bit higher, so make sure you at least have a recent version; see SPARK-5395.) Each pyspark daemon tries to stay below the config
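For reference, a minimal sketch of the knobs discussed in this thread, assuming Spark 1.x property names: spark.executor.cores caps concurrent tasks (and therefore pyspark workers) per executor, and spark.python.worker.memory is the per-worker aggregation limit the daemons try to stay under. The values below are placeholders, not recommendations.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("daemon-limit-example")
        # Cap concurrent tasks per executor; each task gets its own pyspark worker.
        .set("spark.executor.cores", "12")
        # Per-worker memory used during aggregation before spilling to disk.
        .set("spark.python.worker.memory", "512m"))
sc = SparkContext(conf=conf)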

Re: pyspark issue

2015-07-27 Thread Sven Krasser
It expects an iterable, and if you iterate over a string, you get the individual characters. Use a list instead: pyfiles=['/path/to/file'] On Mon, Jul 27, 2015 at 2:40 PM, Naveen Madhire wrote: > Hi, > > I am running pyspark in windows and I am seeing an error while adding > pyfiles to the spark
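A minimal sketch of the fix suggested above; the path and app name are placeholders.

from pyspark import SparkContext

# Wrong: a bare string is iterated character by character, so Spark
# looks for files named '/', 'p', 'a', ...
# sc = SparkContext("local", "app", pyFiles="/path/to/helpers.py")

# Right: wrap the path(s) in a list.
sc = SparkContext("local", "app", pyFiles=["/path/to/helpers.py"])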

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
On Thu, Jun 25, 2015 at 5:01 PM, Kannappan Sirchabesan wrote: > On Jun 26, 2015, at 12:46 AM, Sven Krasser wrote: > > In that case the reduceByKey operation will likely not give you any > benefit (since you are not aggregating data into smaller values but instead > building the

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
and the operation is a associative operation, so minimal shuffle > if done via reduceByKey. > > On Jun 26, 2015, at 12:25 AM, Sven Krasser wrote: > > Hey Kannappan, > > First of all, what is the reason for avoiding groupByKey since this is > exactly what it is for? If you mu

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
Hey Kannappan, First of all, what is the reason for avoiding groupByKey since this is exactly what it is for? If you must use reduceByKey with a one-liner, then take a look at this: lambda a,b: (a if type(a) == list else [a]) + (b if type(b) == list else [b]) In contrast to groupByKey, this wo
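A runnable sketch of that one-liner with made-up sample data. One caveat: keys with only a single value are never passed to the reduce function, so they come back as bare values rather than one-element lists.

from pyspark import SparkContext

sc = SparkContext("local", "reduce-to-list")
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4)])

# Wrap non-list operands so the operation stays associative while building lists.
result = pairs.reduceByKey(
    lambda a, b: (a if type(a) == list else [a]) + (b if type(b) == list else [b])
).collect()
# e.g. [('a', [1, 2]), ('b', [3, 4])]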

Re: indexing an RDD [Python]

2015-04-29 Thread Sven Krasser
ibly provide a quick example? > > > > Also, I’m not quite sure how this work, but the resulting RDD should be a > clone, as I may need to modify the values and preserve the original ones. > > > > Thank you, > > > > > > *From:* Sven Krasser [mailto:kras...@gma

Re: Slower performance when bigger memory?

2015-04-29 Thread Sven Krasser
On Mon, Apr 27, 2015 at 7:36 AM, Shuai Zheng wrote: > Thanks. So may I know what is your configuration for more/smaller > executors on r3.8xlarge, how big of the memory that you eventually decide > to give one executor without impact performance (for example: 64g? ). > We're currently using 16 e

Re: indexing an RDD [Python]

2015-04-24 Thread Sven Krasser
The solution depends largely on your use case. I assume the index is in the key. In that case, you can make a second RDD out of the list of indices and then use cogroup() on both. If the list of indices is small, just using filter() will work well. If you need to read back a few select values to
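A hedged sketch of both approaches, assuming the index lives in the key; the data and index list are made up.

from pyspark import SparkContext

sc = SparkContext("local", "index-lookup")
rdd = sc.parallelize([(i, "value-%d" % i) for i in range(1000)])  # (index, value)
wanted = [3, 42, 999]

# Small index list: a plain filter works well.
small = rdd.filter(lambda kv: kv[0] in wanted).collect()

# Larger index list: build a second RDD from the indices and cogroup on the key.
idx_rdd = sc.parallelize([(i, None) for i in wanted])
matched = (rdd.cogroup(idx_rdd)
           .filter(lambda kv: list(kv[1][1]))        # keep keys present in the index RDD
           .map(lambda kv: (kv[0], list(kv[1][0])))  # original values for those keys
           .collect())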

Re: How to debug Spark on Yarn?

2015-04-24 Thread Sven Krasser
On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin wrote: > > Spark 1.3 should have links to the executor logs in the UI while the > application is running. Not yet in the history server, though. You're absolutely correct -- didn't notice it until now. This is a great addition! -- www.skrasser.

Re: How to debug Spark on Yarn?

2015-04-24 Thread Sven Krasser
For #1, click on a worker node on the YARN dashboard. From there, Tools->Local logs->Userlogs has the logs for each application, and you can view them by executor even while an application is running. (This is for Hadoop 2.4, things may have changed in 2.6.) -Sven On Thu, Apr 23, 2015 at 6:27 AM,

Re: Slower performance when bigger memory?

2015-04-24 Thread Sven Krasser
FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller executors. Another observation was that one large executor results in less overall read throughput from S3 (using Amazon's EMRFS implementation) in case that matters to your application. -Sven On Thu, Apr 23, 2015 at 1

Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Sven Krasser
Hey Joe, With the ephemeral HDFS, you get the instance store of your worker nodes. For m3.xlarge that will be two 40 GB SSDs local to each instance, which are very fast. For the persistent HDFS, you get whatever EBS volumes the launch script configured. EBS volumes are always network drives, so t

Re: Programmatic Spark 1.2.0 on EMR | S3 filesystem is not working when using

2015-01-30 Thread Sven Krasser
From your stacktrace it appears that the S3 writer tries to write the data to a temp file on the local file system first. Taking a guess, that local directory doesn't exist or you don't have permissions for it. -Sven On Fri, Jan 30, 2015 at 6:44 AM, Aniket Bhatnagar < aniket.bhatna...@gmail.com>

Re: Define size partitions

2015-01-30 Thread Sven Krasser
You can also use your InputFormat/RecordReader in Spark, e.g. using newAPIHadoopFile. See here: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext . -Sven On Fri, Jan 30, 2015 at 6:50 AM, Guillermo Ortiz wrote: > Hi, > > I want to process some files, there're
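A hedged PySpark sketch of that suggestion. The custom InputFormat class name and path are placeholders, the key/value classes must match what your format emits, and the jar containing the format has to be on the executor classpath (e.g. via --jars).

from pyspark import SparkContext

sc = SparkContext("local", "custom-input-format")
rdd = sc.newAPIHadoopFile(
    "hdfs:///data/input",                                 # placeholder path
    inputFormatClass="com.example.FixedSizeInputFormat",  # hypothetical custom format
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text")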

Snappy Crash

2015-01-28 Thread Sven Krasser
I'm running into a new issue with Snappy causing a crash (using Spark 1.2.0). Did anyone see this before? -Sven 2015-01-28 16:09:35,448 WARN [shuffle-server-1] storage.MemoryStore (Logging.scala:logWarning(71)) - Failed to reserve initial memory threshold of 1024.0 KB for computing block rdd_45_1

Re: Large number of pyspark.daemon processes

2015-01-27 Thread Sven Krasser
ment-tabpanel#comment-14294570 ). Any ideas to what coalesce() is doing that triggers the creation of additional workers? On Sat, Jan 24, 2015 at 12:27 AM, Sven Krasser wrote: > Hey Davies, > > Sure thing, it's filed here now: > https://issues.apache.org/jira/browse/SPARK-5395 >

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-01-27 Thread Sven Krasser
Since it's an executor running OOM it doesn't look like a container being killed by YARN to me. As a starting point, can you repartition your job into smaller tasks? -Sven On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani wrote: > Hi Anthony, > > What is the setting of the total amount of memory in

Re: Index wise most frequently occuring element

2015-01-27 Thread Sven Krasser
Use combineByKey. For top 10 as an example (bottom 10 work similarly): add the element to a list. If the list is larger than 10, delete the smallest elements until size is back to 10. -Sven On Tue, Jan 27, 2015 at 3:35 AM, kundan kumar wrote: > I have a an array of the form > > val array: Array[
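A hedged sketch of that combineByKey recipe for top 10 per key; the sample data is made up, and heapq.nlargest does the trimming.

import heapq
from pyspark import SparkContext

sc = SparkContext("local", "top10-per-key")
pairs = sc.parallelize([("k1", v) for v in range(100)] + [("k2", v) for v in range(5)])

def create(v):
    return [v]

def merge_value(acc, v):
    acc.append(v)
    return heapq.nlargest(10, acc)     # drop the smallest once the list exceeds 10

def merge_combiners(a, b):
    return heapq.nlargest(10, a + b)

top10 = pairs.combineByKey(create, merge_value, merge_combiners).collect()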

Re: Large number of pyspark.daemon processes

2015-01-24 Thread Sven Krasser
difference. Thank you! -Sven On Fri, Jan 23, 2015 at 11:52 PM, Davies Liu wrote: > It should be a bug, the Python worker did not exit normally, could you > file a JIRA for this? > > Also, could you show how to reproduce this behavior? > > On Fri, Jan 23, 2015 at 11:45 PM, Sven

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
change >> that allows PySpark to share a pool of processes instead of starting a new >> one for each task. >> >> -Sandy >> >> On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser wrote: >> >>> Hey all, >>> >>> I am running into a probl

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
, > > What version of Spark are you running? Recent versions have a change that > allows PySpark to share a pool of processes instead of starting a new one > for each task. > > -Sandy > > On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser wrote: > >> Hey all, >> &

Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Sven Krasser
Hey Darin, Are you running this over EMR or as a standalone cluster? I've had occasional success in similar cases by digging through all executor logs and trying to find exceptions that are not caused by the application shutdown (but the logs remain my main pain point with Spark). That aside, ano

Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 9

Re: Why always spilling to disk and how to improve it?

2015-01-13 Thread Sven Krasser
The distinct call causes a shuffle, which always results in data being written to disk. -Sven On Tue, Jan 13, 2015 at 12:21 PM, Shuai Zheng wrote: > Hi All, > > > > I am trying with some small data set. It is only 200m, and what I am doing > is just do a distinct count on it. > > But there are a

Re: quickly counting the number of rows in a partition?

2015-01-12 Thread Sven Krasser
Yes, using mapPartitionsWithIndex, e.g. in PySpark: >>> sc.parallelize(xrange(0,1000), 4).mapPartitionsWithIndex(lambda idx,iter: ((idx, len(list(iter))),)).collect() [(0, 250), (1, 250), (2, 250), (3, 250)] (This is not the most efficient way to get the length of an iterator, but you get the ide
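A slightly leaner variant of the snippet above that avoids materializing each partition as a list; same idea, just counting the iterator directly.

from pyspark import SparkContext

sc = SparkContext("local", "partition-counts")
counts = (sc.parallelize(range(1000), 4)
          .mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))])
          .collect())
# e.g. [(0, 250), (1, 250), (2, 250), (3, 250)]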

Re: Trouble with large Yarn job

2015-01-12 Thread Sven Krasser
Anders, This could be related to this open ticket: https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also fixed that for us as a stopgap. Best, -Sven On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg wrote: > Yes sure Sandy, I've checked the logs and it's not a OOM issue. I

Re: OOM exception during row deserialization

2015-01-12 Thread Sven Krasser
Hey Pala, I also find it very hard to get to the bottom of memory issues such as this one based on what's in the logs (so if you come up with some findings, then please share here). In the interim, here are a few things you can try: - Provision more memory per executor. While in theory (and de

Re: Manually trigger RDD map function without action

2015-01-12 Thread Sven Krasser
Hey Kevin, I assume you want to trigger the map() for a side effect (since you don't care about the result). To Cody's point, you can use foreach() *instead* of map(). So instead of e.g. x.map(a => foo(a)).foreach(a => a), you'd run x.foreach(a => foo(a)). Best, -Sven On Mon, Jan 12, 2015 at 5:1
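The same point as a minimal PySpark sketch; foo() stands in for whatever side effect is needed per element.

from pyspark import SparkContext

sc = SparkContext("local", "foreach-side-effect")
rdd = sc.parallelize(range(10))

def foo(a):
    print(a)          # placeholder side effect; runs on the executors

rdd.foreach(foo)      # triggers execution without building a result RDD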

Re: Shuffle Problems in 1.2.0

2015-01-12 Thread Sven Krasser
I've filed a ticket for this issue here: https://issues.apache.org/jira/browse/SPARK-5209. (This reproduces the problem on a smaller cluster size.) -Sven On Wed, Jan 7, 2015 at 11:13 AM, Sven Krasser wrote: > Could you try it on AWS using EMR? That'd give you an exact r

Re: can I buffer flatMap input at each worker node?

2015-01-12 Thread Sven Krasser
Not sure I understand correctly, but it sounds like you're looking for mapPartitions(). -Sven On Mon, Jan 12, 2015 at 10:17 AM, maherrt wrote: > Dear All > > what i want to do is : > as the data is partitioned on many worker nodes I want to be able to > process > this partition of data as a whol
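A minimal sketch of the mapPartitions() suggestion; the per-partition processing is a placeholder.

from pyspark import SparkContext

sc = SparkContext("local", "buffered-flatmap")
rdd = sc.parallelize(range(1000), 8)

def process_partition(it):
    buffered = list(it)      # the whole partition is available to the function at once
    # ... operate on the buffered records as a group ...
    return [sum(buffered)]   # placeholder aggregation

print(rdd.mapPartitions(process_partition).collect())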

Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Sven Krasser
> but in standalone mode, > it worked fine. > > Could you try to narrow down the possible change of configurations? > > Davies > >> On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser wrote: >> Hey Davies, >> >> Here are some more details on a configuration

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
).filter(lambda (x, pc): > pc==3).collect() > > (also, no cache needed anymore) > > Davies > > > > On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser wrote: > > The issue has been sensitive to the number of executors and input data > size. > > I'm using 2 exe

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
ran your scripts in 5 nodes ( 2 CPUs, 8G mem) cluster, can not > reproduce your failure. Should I test it with big memory node? > > On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser wrote: > > Thanks for the input! I've managed to come up with a repro of the error > with > > t

Re: Shuffle Problems in 1.2.0

2015-01-05 Thread Sven Krasser
D: unexpected value: List([B@130dc7ad)” error suggests that > maybe there’s an issue with PySpark’s serialization / tracking of types, > but it’s hard to say from this error trace alone. > > On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) > wrote: > > Hey Josh

Re: Spark or Tachyon: capture data lineage

2015-01-02 Thread Sven Krasser
Agreed with Jerry. Aside from Tachyon, seeing this for general debugging would be very helpful. Haoyuan, is that feature you are referring to related to https://issues.apache.org/jira/browse/SPARK-975? In the interim, I've found the "toDebugString()" method useful (but it renders execution as a t

Re: Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
ogram that you can share which will allow me > to reproduce this issue? If you have a workload that runs into this, you > should be able to keep iteratively simplifying the job and reducing the > data set size until you hit a fairly minimal reproduction (assuming the > issue is determini

Re: S3 files , Spark job hungsup

2014-12-30 Thread Sven Krasser
This here may also be of help: http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html. Make sure to spread your objects across multiple partitions to not be rate limited by S3. -Sven On Mon, Dec 22, 2014 at 10:20 AM, durga katakam wrote: > Yes . I am reading thousan
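An illustrative sketch of the key-naming advice in that AWS doc: prepend a short hash so object keys spread across S3's internal partitions. The key layout is made up, and whether this helps depends on your access pattern.

import hashlib

def spread_key(original_name):
    # A short hash prefix distributes keys across S3 partitions.
    prefix = hashlib.md5(original_name.encode("utf-8")).hexdigest()[:4]
    return "%s/%s" % (prefix, original_name)

print(spread_key("logs/2014-12-22/part-00000.gz"))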

Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
Hey all, Since upgrading to 1.2.0 a pyspark job that worked fine in 1.1.1 fails during shuffle. I've tried reverting from the sort-based shuffle back to the hash one, and that fails as well. Does anyone see similar problems or has an idea on where to look next? For the sort-based shuffle I get a