CVE-2019-10099: Apache Spark unencrypted data on local disk

2019-08-06 Thread Imran Rashid
Severity: Important. Vendor: The Apache Software Foundation. Versions affected: all Spark 1.x, Spark 2.0.x, Spark 2.1.x, and Spark 2.2.x versions; Spark 2.3.0 to 2.3.2. Description: Prior to Spark 2.3.3, in certain situations Spark would write user data to local disk unencrypted, even if spark.io.encryp

Re: [SHUFFLE]FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2019-03-12 Thread Imran Rashid
We haven't seen many of these, but we have seen it a couple of times -- there is ongoing work under SPARK-26089 to address the issue we know about, namely that we don't detect corruption in large shuffle blocks. Do you believe the cases you have match that -- does it appear to be corruption in lar

Re: CVE-2018-11760: Apache Spark local privilege escalation vulnerability

2019-01-31 Thread Imran Rashid
I received some questions about exactly which change fixed the issue, and the PMC decided to post the details in JIRA to make it easier for the community to track. The relevant details are all on https://issues.apache.org/jira/browse/SPARK-26802 On Mon, Jan 28, 2019 at 1:08 PM Imran Rashid

CVE-2018-11760: Apache Spark local privilege escalation vulnerability

2019-01-28 Thread Imran Rashid
Severity: Important. Vendor: The Apache Software Foundation. Versions affected: all Spark 1.x, Spark 2.0.x, and Spark 2.1.x versions; Spark 2.2.0 to 2.2.2; Spark 2.3.0 to 2.3.1. Description: When using PySpark, it's possible for a different local user to connect to the Spark application and imperson

Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-23 Thread Imran Rashid
Serga, can you explain a bit more why you want this ability? If the node is really bad, wouldn't you want to decommission the NM entirely? If you've got heterogeneous resources, then node labels seem like they would be more appropriate -- and I don't feel great about adding workarounds for the node-la

Re: Heap Memory in Spark 2.3.0

2018-07-17 Thread Imran Rashid
Perhaps this is https://issues.apache.org/jira/browse/SPARK-24578? That was reported as a performance issue, not OOMs, but it's in the exact same part of the code, and the change was to reduce the memory pressure significantly. On Mon, Jul 16, 2018 at 1:43 PM, Bryan Jeffrey wrote: Hello. I

Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
Sorry, by "repl" I mean "spark-shell"; I guess I'm used to the two being used interchangeably. From that thread dump, the one thread that isn't stuck is trying to get classes specifically related to the shell / repl: java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketR

Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
just looking at the thread dump from your original email, the 3 executor threads are all trying to load classes. (One thread is actually loading some class, and the others are blocked waiting to load a class, most likely trying to load the same thing.) That is really weird, definitely not somethi

Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-13 Thread Imran Rashid
} } partitionsArray Thanks, Best Regards On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid wrote: yikes. Was this a one-time thing? Or does it happen consistently? can you turn on debug logging

Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-12 Thread Imran Rashid
yikes. Was this a one-time thing? Or does it happen consistently? Can you turn on debug logging for org.apache.spark.scheduler (I don't know if it will help, but maybe ...) On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das wrote: Hi, My Spark job (running in local[*] with spark 1.4.1) reads data from a thrift

Re: takeSample() results in two stages

2015-06-12 Thread Imran Rashid
It launches two jobs because it doesn't know ahead of time how big your RDD is, so it doesn't know what the sampling rate should be. After counting all the records, it can determine what the sampling rate should be -- then it does another pass through the data, sampling by the rate it's just determ
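
A plain-Scala sketch (no Spark) of that two-pass idea: count first, then sample at a rate derived from the count, retrying if a pass comes up short. The helper name twoPassSample and the 1.2 oversampling factor are assumptions for illustration; Spark's takeSample uses its own formula for the fraction.

```scala
import scala.util.Random

// Sketch of two-pass sampling without replacement (not Spark's actual code).
def twoPassSample[T](data: Seq[T], num: Int, seed: Long): Seq[T] = {
  val rng = new Random(seed)
  val total = data.size // pass 1: the "count" job
  if (num >= total) rng.shuffle(data)
  else {
    // Hypothetical oversampling factor so one pass usually yields enough items.
    val fraction = math.min(1.0, num.toDouble / total * 1.2)
    // pass 2: the "sample" job, keeping each element with probability `fraction`
    def samplePass(): Seq[T] = data.filter(_ => rng.nextDouble() < fraction)
    var sample = samplePass()
    while (sample.size < num) sample = samplePass() // retry if we came up short
    rng.shuffle(sample).take(num)
  }
}

val picked = twoPassSample(1 to 10000, num = 50, seed = 42L)
```

The count has to finish before the fraction is known, which is why two separate passes (and so two jobs) show up.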

Re: flatMap output on disk / flatMap memory overhead

2015-06-09 Thread Imran Rashid
I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory. If at all possible, I'd first suggest upgrading to a new version of spark -- even in 1.2, there we

Re: Question about Serialization in Storage Level

2015-05-27 Thread Imran Rashid
Hi Zhipeng, yes, your understanding is correct. The "SER" portion just refers to how it's stored in memory. On disk, the data always has to be serialized. On Fri, May 22, 2015 at 10:40 PM, Jiang, Zhipeng wrote: Hi Todd, Howard, Thanks for your reply, I might not present my question

Re: Spark and logging

2015-05-27 Thread Imran Rashid
Only an answer to one of your questions: "What about log statements in the partition processing functions? Will their log statements get logged into a file residing on a given 'slave' machine, or will Spark capture this log output and divert it into the log file of the driver's machine?"

Re: Help reading Spark UI tea leaves..

2015-05-26 Thread Imran Rashid
0) -> x}.partitionBy(new org.apache.spark.HashPartitioner(10)) (0 until 5).foreach { idx => val otherData = sc.parallelize(1 to (idx * 100)).map{ x => (x % 10) -> x}.partitionBy(new org.apache.spark.HashPartitioner(10)) println(idx + " ---> " + o

Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Imran Rashid
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)

Re: EOFException using KryoSerializer

2015-05-19 Thread Imran Rashid
Hi Jim, this is definitely strange. It sure sounds like a bug, but it is also a very commonly used code path, so at the very least you must be hitting a corner case. Could you share a little more info with us? What version of spark are you using? How big is the object you are trying to broa

Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-19 Thread Imran Rashid
The error you are posting is from attempt 6 for that stage ("stage 1.6"). I've found those files can get corrupted when a stage gets retried and there are multiple attempts. Hopefully that will get fixed soon (SPARK-7308), but in the meantime, I'd look further back in your logs to figure out why a

Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread Imran Rashid
data changes. However, documentation seems to suggest that one cannot re-broadcast. Is my understanding accurate? Thanks, NB On Mon, May 18, 2015 at 6:24 PM, Imran Rashid wrote: Rather than "updating" the broadcast variable,

Re: LogisticRegressionWithLBFGS with large feature set

2015-05-18 Thread Imran Rashid
I'm not super familiar with this part of the code, but from taking a quick look: a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles per feature (mean, max, min, etc. etc.) b) The limit is on the result size from *all* tasks, not from one task. You start with 3072 tasks c) t
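
A back-of-the-envelope version of that arithmetic, assuming every task returns a summarizer holding 7 doubles (8 bytes each) per feature and ignoring serialization overhead. The feature count of 100,000 is a hypothetical value, not from the thread, and the limit in question is presumably spark.driver.maxResultSize (default 1g).

```scala
// Estimated total bytes of task results the driver must receive, assuming
// `doublesPerFeature` doubles per feature per task and no serialization overhead.
def estimatedResultBytes(numTasks: Long, numFeatures: Long, doublesPerFeature: Int = 7): Long =
  numTasks * numFeatures * doublesPerFeature * java.lang.Double.BYTES

// 3072 tasks is from the message; 100,000 features is a hypothetical value.
val bytes = estimatedResultBytes(numTasks = 3072, numFeatures = 100000)
val gib = bytes.toDouble / (1L << 30) // roughly 16 GiB under these assumptions
```

Because the total scales with the number of tasks, the same per-task result that looks small can add up to far more than the limit across thousands of tasks.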

Re: FetchFailedException and MetadataFetchFailedException

2015-05-18 Thread Imran Rashid
Hi, can you take a look at the logs and see what the first error you are getting is? It's possible that the file doesn't exist when that error is produced, but it shows up later -- I've seen similar things happen, but only after there have already been some errors. But, if you see that in the ver

Re: spark log field clarification

2015-05-18 Thread Imran Rashid
depends what you mean by "output data". Do you mean: * the data that is sent back to the driver? that is "result size" * the shuffle output? that is in "Shuffle Write Metrics" * the data written to a hadoop output format? that is in "Output Metrics" On Thu, May 14, 2015 at 2:22 PM, yanwei wro

Re: Broadcast variables can be rebroadcast?

2015-05-18 Thread Imran Rashid
Rather than "updating" the broadcast variable, can't you simply create a new one? When the old one can be gc'ed in your program, it will also get gc'ed from spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add in another layer of indi

Re: Spark on Yarn : Map outputs lifetime ?

2015-05-18 Thread Imran Rashid
Neither of those two. Instead, the shuffle data is cleaned up when the stages it comes from get GC'ed by the JVM, that is, when you are no longer holding any references to anything which points to the old stages, and there is an appropriate GC event. The data is not cleaned up right after the sta

Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

2015-05-18 Thread Imran Rashid
Looks like this exception is after many more failures have occurred. It is already on attempt 6 for stage 7 -- I'd try to find out why attempt 0 failed. This particular exception is probably a result of corruption that can happen when stages are retried, that I'm working on addressing in https://

Re: applications are still in progress?

2015-05-18 Thread Imran Rashid
Most likely, you never call sc.stop(). Note that in 1.4, this will happen for you automatically in a shutdown hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090 On Wed, May 13, 2015 at 8:04 AM, Yifan LI wrote: > Hi, > > I have some applications finished(but actually failed

Re: Error communicating with MapOutputTracker

2015-05-18 Thread Imran Rashid
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber wrote: Now, we noticed that we get java heap OOM exceptions on the output tracker when we have too many tasks. I wonder: 1. where does the map output tracker live? The driver? The master (when those are not the same)? 2. how can we increase

Re: parallelism on binary file

2015-05-18 Thread Imran Rashid
You can use sc.hadoopFile (or any of the variants) to do what you want. They even let you reuse your existing HadoopInputFormats. You should be able to mimic your old use with MR just fine. sc.textFile is just a convenience method which sits on top. imran On Fri, May 8, 2015 at 12:03 PM, tog w

Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-06 Thread Imran Rashid
tityObjectIntMap” when it is resizing (or a similar operation), implying there are too many object references, though it’s hard to see how I could get to 2b references from a few million objects... T On 6 May 2015 at 00:58, Imran Rashid wrote:

Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-05 Thread Imran Rashid
Are you setting a really large max buffer size for kryo? Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ? If not, we should open up another issue to get a better warning in these cases. On Tue, May 5, 2015 at 2:47 AM, shahab wrote: > Thanks Tristan for sharing this. Actuall

Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-05 Thread Imran Rashid
Gerard is totally correct -- to expand a little more, I think what you want to do is a solrInputDocumentJavaRDD.foreachPartition, instead of solrInputDocumentJavaRDD.foreach: solrInputDocumentJavaRDD.foreachPartition( new VoidFunction>() { @Override public void call(Iterator docItr) {

Re: How to skip corrupted avro files

2015-05-05 Thread Imran Rashid
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593 and the discussion around the PRs. This is probably more complicated than what you are looking for, but you could copy the code for HadoopReliableRDD in the PR into your own code and use it, without having to wait for the

Re: Spark job concurrency problem

2015-05-05 Thread Imran Rashid
can you give your entire spark submit command? Are you missing "--executor-cores "? Also, if you intend to use all 6 nodes, you also need "--num-executors 6" On Mon, May 4, 2015 at 2:07 AM, Xi Shen wrote: > Hi, > > I have two small RDD, each has about 600 records. In my code, I did > > val rdd

Re: spark kryo serialization question

2015-05-04 Thread Imran Rashid
yes, you should register all three. The truth is, you only *need* to register classes that will get serialized -- either via RDD caching or in a shuffle. So if a type is only used as an intermediate inside a stage, you don't need to register it. But the overhead of registering extra classes is p

Re: Kryo serialization of classes in additional jars

2015-05-04 Thread Imran Rashid
Oh, this seems like a real pain. You should file a jira, I didn't see an open issue -- if nothing else just to document the issue. As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded

Re: Extra stage that executes before triggering computation with an action

2015-05-04 Thread Imran Rashid
sortByKey() runs one job to sample the data, to determine what range of keys to put in each partition. There is a jira to change it to defer launching the job until the subsequent action, but it will still execute another stage: https://issues.apache.org/jira/browse/SPARK-1021 On Wed, Apr 29, 20
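
To see why the sampling job comes first: range partitioning needs boundary keys before any record can be routed, and sampling is the cheap way to get them. A plain-Scala sketch of that idea follows; it mirrors the concept behind Spark's RangePartitioner rather than its actual code, and the helpers rangeBounds and partitionFor are hypothetical.

```scala
import scala.util.Random

// Sample the keys, sort the sample, and pick numPartitions - 1 evenly spaced cut points.
def rangeBounds(keys: Seq[Int], numPartitions: Int, sampleSize: Int, seed: Long): Vector[Int] = {
  val sample = new Random(seed).shuffle(keys).take(sampleSize).sorted.toVector // the "sampling job"
  (1 until numPartitions).map(i => sample(i * sample.size / numPartitions)).toVector
}

// Route a key to the first partition whose upper bound is >= the key;
// keys above every bound go to the last partition.
def partitionFor(key: Int, bounds: Vector[Int]): Int = {
  val idx = bounds.indexWhere(key <= _)
  if (idx == -1) bounds.size else idx
}

val keys = 1 to 1000
val bounds = rangeBounds(keys, numPartitions = 4, sampleSize = 100, seed = 7L)
val sizes = keys.groupBy(partitionFor(_, bounds)).map { case (p, ks) => p -> ks.size }
```

Since partition ids increase with the key, sorting each partition locally then yields a globally sorted result.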

Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Imran Rashid
oh wow, that is a really interesting observation, Marco & Jerry. I wonder if this is worth exposing in combineByKey()? I think Jerry's proposed workaround is all you can do for now -- use reflection to side-step the fact that the methods you need are private. On Mon, Apr 27, 2015 at 8:07 AM, Sais

Re: Spark partitioning question

2015-05-04 Thread Imran Rashid
Hi Marius, I am also a little confused -- are you saying that myPartitions is basically something like: class MyPartitioner extends Partitioner { def numPartitions = 1 def getPartition(key: Any) = 0 } ?? If so, I don't understand how you'd ever end up with data in two partitions. Indeed, than ev

Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Imran Rashid
Another issue is that HadoopRDD (which sc.textFile uses) might split input files, and even if it doesn't split, it doesn't guarantee that part file numbers go to the corresponding partition number in the RDD. E.g., part-0 could go to partition 27. On Apr 24, 2015 7:41 AM, "Michal Michalski" wrote

Re: Can't get SparkListener to work

2015-04-17 Thread Imran Rashid
When you start the spark-shell, it's already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead. On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji < secondorderpolynom...@gmail.com> wrote: I'm trying to create a simple SparkListener to get notified of e

Re: How to persist RDD return from partitionBy() to disk?

2015-04-17 Thread Imran Rashid
https://issues.apache.org/jira/browse/SPARK-1061 note the proposed fix isn't to have spark automatically know about the partitioner when it reloads the data, but at least to make it *possible* for it to be done at the application level. On Fri, Apr 17, 2015 at 11:35 AM, Wang, Ningjun (LNG-NPV) <

Re: history-server doesn't read logs which are on FS

2015-04-17 Thread Imran Rashid
are you calling sc.stop() at the end of your applications? The history server only displays completed applications, but if you don't call sc.stop(), it doesn't know that those applications have been stopped. Note that in spark 1.3, the history server can also display running applications (includi

Re: Random pairs / RDD order

2015-04-17 Thread Imran Rashid
If you can store the entire sample for one partition in memory, I think you just want: val sample1 = rdd.sample(true, 0.01, 42).mapPartitions(iter => scala.util.Random.shuffle(iter.toVector).iterator) val sample2 = rdd.sample(true, 0.01, 43).mapPartitions(iter => scala.util.Random.shuffle(iter.toVector).iterator) ... On Fri, Apr 17, 2015 at 3:05 AM, Aurélien

Re: Task result in Spark Worker Node

2015-04-17 Thread Imran Rashid
rator(rdd3.partitions(1), context))); 1 } I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build pa

Re: When are TaskCompletionListeners called?

2015-04-17 Thread Imran Rashid
It's the latter -- after spark gets to the end of the iterator (or if it hits an exception), so your example is good; that is exactly what it is intended for. On Fri, Apr 17, 2015 at 12:23 PM, Akshat Aranya wrote: Hi, I'm trying to figure out when TaskCompletionListeners are called -- are

Re: Exception while using kryo with broadcast

2015-04-15 Thread Imran Rashid
list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html Can you please suggest any workaround? I am broadcasting a HashMap returned from RDD.collectAsMap(). On 15 April 2015 at 19:33, Imran Ra

Re: Exception while using kryo with broadcast

2015-04-15 Thread Imran Rashid
This is a really strange exception ... I'm especially surprised that it doesn't work w/ java serialization. Do you think you could try to boil it down to a minimal example? On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele wrote: Yes, without Kryo it did work out. When I remove kryo registrati

Re: Registering classes with KryoSerializer

2015-04-14 Thread Imran Rashid
ot resolve symbol ClassTag$$anon$1 Hence I am not any closer to making this work. If you have any further suggestions, they would be most welcome. arun On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid wrote: Hi Arun, It can

Re: Catching executor exception from executor in driver

2015-04-14 Thread Imran Rashid
(+dev) Hi Justin, short answer: no, there is no way to do that. I'm just guessing here, but I imagine this was done to eliminate serialization problems (eg., what if we got an error trying to serialize the user exception to send from the executors back to the driver?). Though, actually that isn'

Re: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-14 Thread Imran Rashid
Hi Shuai, I don't think this is a bug with kryo; it's just a subtlety with how kryo works. I *think* that it would also work if you changed your PropertiesUtil class to either (a) remove the no-arg constructor or (b) instead of extending Properties, make it a contained member variable. I wish

Re: Registering classes with KryoSerializer

2015-04-14 Thread Imran Rashid
Hi Arun, It can be hard to use kryo with required registration because of issues like this -- there isn't a good way to register all the classes that you need transitively. In this case, it looks like one of your classes has a reference to a ClassTag, which in turn has a reference to some anonymo

Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-14 Thread Imran Rashid
Shuffle write could be a good indication of skew, but it looks like the task in question hasn't generated any shuffle write yet, because it's still working on the shuffle-read side. So I wouldn't read too much into the fact that the shuffle write is 0 for a task that is still running. The shuffle

Re: Array[T].distinct doesn't work inside RDD

2015-04-14 Thread Imran Rashid
Interesting, my gut instinct is the same as Sean's. I'd suggest debugging this in plain old scala first, without involving spark. Even just in the scala shell, create one of your Array[T], try calling .toSet and calling .distinct. If those aren't the same, then it's got nothing to do with spark.
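
One common cause worth ruling out in that plain-Scala check (an assumption about this case, not a diagnosis): both .distinct and .toSet rely on the element type's equals/hashCode, so a class that doesn't override them only deduplicates identical references. PlainPoint and CasePoint are hypothetical classes.

```scala
// Hypothetical class with no equals/hashCode override: reference equality only.
class PlainPoint(val x: Int, val y: Int)
// Case classes get structural equals/hashCode generated by the compiler.
case class CasePoint(x: Int, y: Int)

val plain = Array(new PlainPoint(1, 2), new PlainPoint(1, 2))
val cased = Array(CasePoint(1, 2), CasePoint(1, 2))

val plainDistinct = plain.distinct.length // 2: two different references
val plainSetSize  = plain.toSet.size      // 2: same answer as distinct
val caseDistinct  = cased.distinct.length // 1: structurally equal
val caseSetSize   = cased.toSet.size      // 1: same answer as distinct
```

If distinct behaves like this outside of spark too, the fix is in the element type, not in the job.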

Re: Regarding benefits of using more than one cpu for a task in spark

2015-04-14 Thread Imran Rashid
Hi twinkle, To be completely honest, I'm not sure, I had never heard "spark.task.cpus" before. But I could imagine two different use cases: a) instead of just relying on spark's creation of tasks for parallelism, a user wants to run multiple threads *within* a task. This is sort of going agains

Re: counters in spark

2015-04-14 Thread Imran Rashid
Hi Robert, A lot of task metrics are already available for individual tasks. You can get these programmatically by registering a SparkListener, and you can also view them in the UI. E.g., for each task, you can see runtime, serialization time, amount of shuffle data read, etc. I'm working on als

Re: Understanding Spark Memory distribution

2015-04-13 Thread Imran Rashid
Broadcast variables count towards "spark.storage.memoryFraction", so they use the same "pool" of memory as cached RDDs. That being said, I'm really not sure why you are running into problems; it seems like you have plenty of memory available. Most likely it's got nothing to do with broadcast varia

Re: Registering classes with KryoSerializer

2015-04-13 Thread Imran Rashid
Those funny class names come from scala's specialization -- it's compiling a different version of OpenHashMap for each primitive you stick in the type parameter. Here's a super simple example: ➜ ~ more Foo.scala class Foo[@specialized X] ➜ ~ scalac Foo.scala ➜ ~ ls Foo*.cl

Re: How to get rdd count() without double evaluation of the RDD?

2015-04-13 Thread Imran Rashid
Yes, it sounds like a good use of an accumulator to me: val counts = sc.accumulator(0L); rdd.map { x => counts += 1; x }.saveAsObjectFile(file2) On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) < ningjun.w...@lexisnexis.com> wrote: Sean, Yes I know that I can use persist() to
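
The same single-pass idea can be tried without Spark: count elements as a side effect of the one pass that consumes them. In this sketch a local var stands in for the accumulator and toVector stands in for saveAsObjectFile; both substitutions are assumptions for illustration.

```scala
// `counted` plays the role of the accumulator.
var counted = 0L
val records = Iterator("a", "b", "c", "d")

// Lazy, like an RDD transformation: nothing is counted until something consumes it.
val passThrough = records.map { x => counted += 1; x }
val countBeforeAction = counted // 0: read too early, before the "action"

// The "action" (stand-in for saveAsObjectFile) consumes every element exactly once.
val sink = passThrough.toVector
val countAfterAction = counted // 4
```

As with the iterator here, the accumulator only holds the full count after the action has run. Also note that Spark only guarantees exactly-once accumulator updates for updates made inside actions; updates made inside a transformation like map may be applied more than once if a task or stage is re-executed.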

Re: Task result in Spark Worker Node

2015-04-13 Thread Imran Rashid
On the worker side, it all happens in Executor. The task result is computed here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 then it's serialized along with some other goodies, and finally sent ba

Re: Serialization Problem in Spark Program

2015-03-25 Thread Imran Rashid
you also need to register *array*s of MyObject. so change: conf.registerKryoClasses(Array(classOf[MyObject])) to conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]])) On Wed, Mar 25, 2015 at 2:44 AM, donhoff_h <165612...@qq.com> wrote: > Hi, experts > > I wrote a very

Re: spark disk-to-disk

2015-03-24 Thread Imran Rashid
I think writing to hdfs and reading it back again is totally reasonable. In fact, in my experience, writing to hdfs and reading back in actually gives you a good opportunity to handle some other issues as well: a) instead of just writing as an object file, I've found it's helpful to write in a form

Re: ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Imran Rashid
I think you should see some other errors before that, from NettyBlockTransferService, with a msg like "Exception while beginning fetchBlocks". There might be a bit more information there. There are an assortment of possible causes, but first let's just make sure you have all the details from the o

Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded

2015-03-20 Thread Imran Rashid
I think you are running into a combo of https://issues.apache.org/jira/browse/SPARK-5928 and https://issues.apache.org/jira/browse/SPARK-5945 The standard solution is to just increase the number of partitions you are creating. textFile(), reduceByKey(), and sortByKey() all take an optional second
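
A back-of-the-envelope check using the two numbers in the error message, assuming the oversized block shrinks roughly in proportion as the partition count grows (i.e. no single hot key dominates it). The helper minSplitFactor is hypothetical.

```scala
val frameLimit   = Int.MaxValue.toLong // 2147483647 bytes, from the error message
val observedSize = 12716268407L        // adjusted frame length, from the error message

// Ceiling division: the smallest factor by which the block must be split to fit.
def minSplitFactor(bytes: Long, limit: Long): Long = (bytes + limit - 1) / limit

val factor = minSplitFactor(observedSize, frameLimit) // 6
```

So under that assumption the partition count would need to grow at least 6x, and in practice with some headroom, e.g. via the numPartitions argument of reduceByKey or sortByKey, or minPartitions for textFile.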

Re: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread Imran Rashid
Hi Yong, yes I think your analysis is correct. I'd imagine almost all serializers out there will just convert a string to its utf-8 representation. You might be interested in adding compression on top of a serializer, which would probably bring the string size down in almost all cases, but then

Re: Error communicating with MapOutputTracker

2015-03-20 Thread Imran Rashid
Hi Thomas, sorry for such a late reply. I don't have any super-useful advice, but this seems like something that is important to follow up on. to answer your immediate question, No, there should not be any hard limit to the number of tasks that MapOutputTracker can handle. Though of course as t

Re: Need Advice about reading lots of text files

2015-03-17 Thread Imran Rashid
Interesting, on another thread, I was just arguing that the user should *not* open the files themselves and read them, b/c then they lose all the other goodies we have in HadoopRDD, eg. the metric tracking. I think this encourages Pat's argument that we might actually need better support for this

Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Imran Rashid
I'm not super familiar w/ S3, but I think the issue is that you want to use a different output committer with "object" stores that don't have a simple move operation. There have been a few other threads on S3 & output committers. I think the most relevant for you is probably this open JIRA:

Re: Process time series RDD after sortByKey

2015-03-16 Thread Imran Rashid
p my own MyGroupingRDD class? I am not very clear how to do that; is there any place I can find an example? I have never created my own RDD class before (not an RDD instance). But this is a very valuable approach to me, so I am eager to learn. Regards, Shuai

Re: Process time series RDD after sortByKey

2015-03-16 Thread Imran Rashid
Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng wrote: Sorry I responded late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 parti

Re: How to preserve/preset partition information when load time series data?

2015-03-16 Thread Imran Rashid
if I try to fake/enforce the partition in my own way. Regards, Shuai On Wed, Mar 11, 2015 at 8:09 PM, Imran Rashid wrote: It should be *possible* to do what you want ... but if I understand you right, there isn't really any very easy way t

Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-12 Thread Imran Rashid
lyCompressedMapStatus]) If I don't register it, I get a runtime error saying that it needs to be registered (the error is only when I turn on kryo). However the code is running smoothly with kryo turned off. On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid

Re: can spark take advantage of ordered data?

2015-03-11 Thread Imran Rashid
Hi Jonathan, you might be interested in https://issues.apache.org/jira/browse/SPARK-3655 (not yet available) and https://github.com/tresata/spark-sorted (not part of spark, but it is available right now). Hopefully that's what you are looking for. To the best of my knowledge that covers what is a

Re: Running Spark from Scala source files other than main file

2015-03-11 Thread Imran Rashid
did you forget to specify the main class w/ "--class Main"? though if that was it, you should at least see *some* error message, so I'm confused myself ... On Wed, Mar 11, 2015 at 6:53 AM, Aung Kyaw Htet wrote: > Hi Everyone, > > I am developing a scala app, in which the main object does not ca

Re: saveAsTextFile extremely slow near finish

2015-03-11 Thread Imran Rashid
is your data skewed? Could it be that there are a few keys with a huge number of records? You might consider outputting (recordA, count) (recordB, count) instead of recordA recordA recordA ... you could do this with: input = sc.textFile pairsCounts = input.map{x => (x,1)}.reduceByKey{_ + _}
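
A local, non-Spark sketch of the same transformation as the map/reduceByKey version: each distinct record becomes one (record, count) line, so a hot record is written once instead of once per occurrence. countRecords is a hypothetical helper name.

```scala
// Fold the records into a map of record -> number of occurrences.
def countRecords(lines: Seq[String]): Map[String, Int] =
  lines.foldLeft(Map.empty[String, Int]) { (acc, line) =>
    acc.updated(line, acc.getOrElse(line, 0) + 1)
  }

val counts = countRecords(Seq("recordA", "recordA", "recordA", "recordB"))
// One tab-separated output line per distinct record, sorted for stable output.
val outputLines = counts.toSeq.sortBy(_._1).map { case (r, n) => s"$r\t$n" }
```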

Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-11 Thread Imran Rashid
ngs don't break. I want to benefit from the MapOutputTracker fix in 1.2.0. On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid wrote: the scala syntax for arrays is Array[T], not T[], so you want to use something: kryo.register(classOf[Array[o

Re: Process time series RDD after sortByKey

2015-03-11 Thread Imran Rashid
This is a very interesting use case. First of all, it's worth pointing out that if you really need to process the data sequentially, fundamentally you are limiting the parallelism you can get. E.g., if you need to process the entire data set sequentially, then you can't get any parallelism. If you

Re: Top, takeOrdered, sortByKey

2015-03-11 Thread Imran Rashid
I am not entirely sure I understand your question -- are you saying: * scoring a sample of 50k events is fast * taking the top N scores of 77M events is slow, no matter what N is ? if so, this shouldn't come as a huge surprise. You can't find the top scoring elements (no matter how small N is)
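
A plain-Scala sketch of why top-N has to look at every element but still stays cheap: each partition keeps only its N best scores in a bounded min-heap, and only those per-partition winners are merged. This mirrors the idea behind RDD.top/takeOrdered rather than Spark's exact implementation; partitions are modeled as plain Seqs of Int scores.

```scala
import scala.collection.mutable

// Keep the n largest scores seen so far; the heap's head is the smallest of them.
def topNOfPartition(scores: Iterator[Int], n: Int): Seq[Int] = {
  require(n > 0, "n must be positive")
  val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse) // min-heap
  scores.foreach { s =>
    if (heap.size < n) heap.enqueue(s)
    else if (s > heap.head) { heap.dequeue(); heap.enqueue(s) }
  }
  heap.toSeq
}

// Merge the small per-partition results and take the global top n.
def topN(partitions: Seq[Seq[Int]], n: Int): Seq[Int] =
  partitions.flatMap(p => topNOfPartition(p.iterator, n)).sorted(Ordering[Int].reverse).take(n)

val parts = Seq(Seq(1, 9, 3), Seq(7, 2), Seq(8, 4, 6))
val best = topN(parts, 3) // Seq(9, 8, 7)
```

Every element still passes through the heap once, so the cost is dominated by reading all the data, not by N.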

Re: How to preserve/preset partition information when load time series data?

2015-03-11 Thread Imran Rashid
It should be *possible* to do what you want ... but if I understand you right, there isn't really any very easy way to do it. I think you would need to write your own subclass of RDD, which has its own logic on how the input files get divided among partitions. You can probably subclass Hadoop

Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Imran Rashid
You can set the number of partitions dynamically -- it's just a parameter to a method, so you can compute it however you want; it doesn't need to be some static constant: val dataSizeEstimate = yourMagicFunctionToEstimateDataSize() val numberOfPartitions = yourConversionFromDataSizeToNumPartitions(

Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Imran Rashid
This doesn't involve spark at all, I think this is entirely an issue with how scala deals w/ primitives and boxing. Often it can hide the details for you, but IMO it just leads to far more confusing errors when things don't work out. The issue here is that your map has value type Any, which leads
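
A small plain-Scala illustration of the boxing involved: a Double stored in a Map with value type Any is held as a java.lang.Double at runtime, and a type pattern on Double still matches it because Scala unboxes in type tests. The map and key are hypothetical.

```scala
// Value type Any forces the Double to be boxed.
val m: Map[String, Any] = Map("price" -> 1.5)

val runtimeClass = m("price").getClass // classOf[java.lang.Double]

// A `Double` type pattern matches the boxed java.lang.Double.
val viaMatch: Double = m("price") match {
  case d: Double => d
  case other     => sys.error(s"unexpected value: $other")
}
```

Declaring the map as Map[String, Double] avoids the Any in the first place, which is usually the simpler fix.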

Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-03 Thread Imran Rashid
The scala syntax for arrays is Array[T], not T[], so you want to use something like: kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]]) kryo.register(classOf[Array[Short]]) Nonetheless, spark should take care of this itself. I'll look into it later today. On Mon, Mar 2, 2015
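
For context on the Array[T] syntax: on the JVM, Scala's Array[Short] is the class named "[S", and the array class for an element type known only by name (e.g. a nested class like RoaringArray$Element) can be built via reflection. arrayClassOf is a hypothetical helper, and java.lang.String stands in for the real element class so the sketch runs without extra jars.

```scala
// Array[Short] compiles to the JVM's short[], whose binary name is "[S".
val shortArray: Class[_] = classOf[Array[Short]]
val shortArrayName = shortArray.getName

// Build the array class for an element class looked up by its binary name.
def arrayClassOf(elementClassName: String): Class[_] =
  java.lang.reflect.Array.newInstance(Class.forName(elementClassName), 0).getClass

// Stand-in for a name such as "org.roaringbitmap.RoaringArray$Element".
val stringArray = arrayClassOf("java.lang.String")
```

With a Kryo instance in hand, the resulting class could then be passed to kryo.register(...).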

Re: Global sequential access of elements in RDD

2015-02-27 Thread Imran Rashid
Why would you want to use spark to sequentially process your entire data set? The entire purpose is to let you do distributed processing -- which means letting partitions get processed simultaneously by different cores / nodes. that being said, occasionally in a bigger pipeline with a lot of dist

Re: How to tell if one RDD depends on another

2015-02-26 Thread Imran Rashid
no, it does not give you transitive dependencies. You'd have to walk the tree of dependencies yourself, but that should just be a few lines. On Thu, Feb 26, 2015 at 3:32 PM, Corey Nolet wrote: > I see the "rdd.dependencies()" function, does that include ALL the > dependencies of an RDD? Is it s
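
A sketch of that walk, written against a plain parents function so it runs without Spark. With Spark, the function would presumably be (r: RDD[_]) => r.dependencies.map(_.rdd); the lineage map here is hypothetical.

```scala
// Collect every node reachable through `parents`, excluding the start node itself.
def transitiveDeps[A](start: A)(parents: A => Seq[A]): Set[A] = {
  @annotation.tailrec
  def loop(frontier: List[A], seen: Set[A]): Set[A] = frontier match {
    case Nil => seen
    case head :: tail =>
      val fresh = parents(head).filterNot(seen.contains) // skip already-visited nodes
      loop(fresh.toList ++ tail, seen ++ fresh)
  }
  loop(List(start), Set.empty)
}

// Hypothetical lineage: d depends on b and c, both of which depend on a.
val lineage = Map("d" -> Seq("b", "c"), "b" -> Seq("a"), "c" -> Seq("a"), "a" -> Seq.empty)
val depsOfD = transitiveDeps("d")(lineage) // Set("a", "b", "c")
```

The seen set keeps shared ancestors (like a here) from being visited twice, which matters for lineages with many joins.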

Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Imran Rashid
Hi Yong, mostly correct except for: "Since we are doing reduceByKey, shuffling will happen. Data will be shuffled into 1000 partitions, as we have 1000 unique keys." No, you will not get 1000 partitions. Spark has to decide how many partitions to use before it even knows how many
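
A plain-Scala sketch of hash partitioning, assuming it mirrors HashPartitioner (null keys go to partition 0, otherwise a non-negative modulo of hashCode): the partition count is fixed up front, so 1000 distinct keys land in however many partitions were chosen. The count of 8 and the helper names are hypothetical.

```scala
// Modulo that never returns a negative result, even for negative hash codes.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def partitionOf(key: Any, numPartitions: Int): Int =
  if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

val numPartitions = 8
val keys = (1 to 1000).map(i => s"key-$i")
val used = keys.map(partitionOf(_, numPartitions)).toSet // at most 8 partition ids
```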

Re: Iterating on RDDs

2015-02-26 Thread Imran Rashid
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER) /* or whatever persistence makes more sense for you */ ... var done = false; while (!done) { val res = grouped.flatMap(F); res.collect.foreach(func); done = criteria } On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan wrote: H

Re: Cartesian issue with user defined objects

2015-02-26 Thread Imran Rashid
Any chance your input RDD is being read from HDFS, and you are running into this issue (from the docs on SparkContext#hadoopFile): Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggr
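
The same Scaladoc note goes on to advise copying the Writables with a map before caching, sorting, or aggregating them. A sketch of that copy for a SequenceFile of Text/IntWritable pairs; the object name and the key/value types are assumptions, not taken from the thread.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object WritableCopySketch {
  // Hadoop's RecordReader reuses one Text / IntWritable instance for every
  // record, so convert each pair to immutable Scala values right away.
  // The result is then safe to cache, cartesian, or aggregate.
  def readCopied(sc: SparkContext, path: String): RDD[(String, Int)] =
    sc.sequenceFile(path, classOf[Text], classOf[IntWritable])
      .map { case (k, v) => (k.toString, v.get) }
}
```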

Re: GroupByKey causing problem

2015-02-26 Thread Imran Rashid
Hi Tushar, The most scalable option is probably for you to consider doing some approximation. E.g., sample the data first to come up with the bucket boundaries. Then you can assign data points to buckets without needing to do a full groupByKey. You could even have more passes which correct any error
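
A sketch of the sampling approach under stated assumptions: the data is an RDD[Double], boundaries are approximate quantiles read off the sorted sample, and the per-bucket work is just a count done with reduceByKey instead of groupByKey. All names are hypothetical.

```scala
import org.apache.spark.rdd.RDD

object SampledBucketsSketch {
  // numBuckets - 1 boundaries taken at evenly spaced ranks of the sorted sample.
  def boundariesFromSample(sample: Array[Double], numBuckets: Int): Array[Double] = {
    val sorted = sample.sorted
    (1 until numBuckets).map(i => sorted((i.toLong * sorted.length / numBuckets).toInt)).toArray
  }

  // Bucket index = number of boundaries <= x (linear scan; fine for few buckets).
  def bucketOf(boundaries: Array[Double], x: Double): Int =
    boundaries.count(_ <= x)

  // Sample, compute boundaries on the driver, then count per bucket.
  // reduceByKey combines map-side, so no bucket's full contents are shuffled.
  def bucketCounts(data: RDD[Double], numBuckets: Int, fraction: Double, seed: Long): Map[Int, Long] = {
    val sample = data.sample(false, fraction, seed).collect()
    require(sample.nonEmpty, "sample is empty; increase fraction")
    val bounds = boundariesFromSample(sample, numBuckets)
    data.map(x => (bucketOf(bounds, x), 1L)).reduceByKey(_ + _).collect().toMap
  }
}
```

The boundaries are only as good as the sample; a later pass could inspect the bucket counts and adjust the boundaries, in the spirit of the correction passes mentioned above.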

Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Imran Rashid
Hi Tristan, at first I thought you were just hitting another instance of https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's entirely related to Kryo. Would it be possible for you to try serializing your object using Kryo, without involving Spark at all? If you are unfamil
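
A minimal round trip with plain Kryo and no Spark, assuming Kryo 3.x/4.x (the `Kryo.DefaultInstantiatorStrategy` class moved to another package in Kryo 5). `Record` is a hypothetical stand-in for the object that fails; swap in the real one and see whether the exception reproduces.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.objenesis.strategy.StdInstantiatorStrategy

object KryoRoundTripSketch {
  // Hypothetical record type; substitute the object that fails in the join.
  case class Record(id: Long, tags: Array[String])

  def newKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    kryo.setClassLoader(getClass.getClassLoader) // so Record can be resolved on read
    // Scala case classes have no no-arg constructor; fall back to Objenesis.
    kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
    kryo
  }

  // Serialize and deserialize with Kryo alone.
  def roundTrip(obj: AnyRef): AnyRef = {
    val kryo = newKryo()
    val output = new Output(4096, -1) // start at 4 KB, grow without a limit
    kryo.writeClassAndObject(output, obj)
    output.close()
    kryo.readClassAndObject(new Input(output.toBytes))
  }
}
```

If the round trip fails with the same exception, the problem lies in how Kryo handles that object rather than in Spark's join.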

Re: Brodcast Variable updated from one transformation and used from another

2015-02-25 Thread Imran Rashid
Hi Yiannis, Broadcast variables are meant for *immutable* data. They are not meant for data structures that you intend to update. (It might *happen* to work when running in local mode, though I doubt it, and it would probably be a bug if it did. It will certainly not work when running on a cluster
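
The usual alternative, as a sketch with hypothetical names: treat each broadcast as a read-only snapshot, build an updated immutable value on the driver, broadcast that as a new variable, and unpersist the old one once no job needs it.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object RebroadcastSketch {
  // Broadcasts one immutable version of the table, uses it, then releases it.
  def lookupWith(sc: SparkContext, keys: RDD[String], table: Map[String, Int]): Array[Int] = {
    val bc: Broadcast[Map[String, Int]] = sc.broadcast(table)
    val result = keys.map(k => bc.value.getOrElse(k, 0)).collect()
    bc.unpersist() // drop executor copies; this version is no longer needed
    result
  }
}
```

To "update" the table, call it again with `table + ("b" -> 2)` (a new map built on the driver); executors then see a fresh broadcast instead of a mutated one.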

Re: How to get yarn logs to display in the spark or yarn history-server?

2015-02-24 Thread Imran Rashid
the spark history server and the yarn history server are totally independent. Spark knows nothing about yarn logs, and vice versa, so unfortunately there isn't any way to get all the info in one place. On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams wrote: > Looks like in my tired stat

Re: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Imran Rashid
I think you're getting tripped up by lazy evaluation and the way stage boundaries work (admittedly it's pretty confusing in this case). It is true that up until recently, if you unioned two RDDs with the same partitioner, the result did not have the same partitioner. But that was just fixed here: htt
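
A sketch of the case that fix covers, assuming a Spark version that includes it: both inputs are partitioned with the same HashPartitioner, so the union keeps that partitioner and the following reduceByKey can run without another shuffle. Names are hypothetical.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

object PartitionedUnionSketch {
  def unionThenReduce(a: RDD[(String, Int)], b: RDD[(String, Int)], numPartitions: Int): RDD[(String, Int)] = {
    val p = new HashPartitioner(numPartitions)
    // Each input is shuffled once here, into the same partitioner.
    val pa = a.partitionBy(p)
    val pb = b.partitionBy(p)
    // The union keeps p, so reduceByKey with p combines within partitions
    // instead of adding a new shuffle stage.
    pa.union(pb).reduceByKey(p, _ + _)
  }
}
```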

Re: sorting output of join operation

2015-02-23 Thread Imran Rashid
sortByKey() is probably the easiest way: import org.apache.spark.SparkContext._ joinedRdd.map{case(word, (file1Counts, file2Counts)) => (file1Counts, (word, file1Counts, file2Counts))}.sortByKey() On Mon, Feb 23, 2015 at 10:41 AM, Anupama Joshi wrote: > Hi , > To simplify my problem - > I
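
A runnable version of that sortByKey approach, plus the equivalent RDD.sortBy, which skips building an intermediate key. On Spark 1.3 and later the `SparkContext._` import is no longer needed for these pair-RDD methods. Object and method names are hypothetical.

```scala
import org.apache.spark.rdd.RDD

object SortJoinedSketch {
  // joined: (word, (file1Count, file2Count)), e.g. the output of a join.
  // Re-key by the file-1 count, sort, then drop the temporary key.
  def byFile1Count(joined: RDD[(String, (Int, Int))]): RDD[(String, Int, Int)] =
    joined
      .map { case (word, (c1, c2)) => (c1, (word, c1, c2)) }
      .sortByKey()
      .values

  // Same ordering idea with sortBy, here descending by the file-1 count.
  def byFile1CountDescending(joined: RDD[(String, (Int, Int))]): RDD[(String, (Int, Int))] =
    joined.sortBy(_._2._1, ascending = false)
}
```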

Re: what does "Submitting ... missing tasks from Stage" mean?

2015-02-20 Thread Imran Rashid
Yeah, this is just the totally normal message when Spark executes something. The first time something is run, all of its tasks are "missing". I would not worry about cases when only some of the tasks are "missing"; if you're new to Spark, it's probably an advanced concept that you don't care about. (and wou

Re: Failure on a Pipe operation

2015-02-19 Thread Imran Rashid
The error msg is telling you the exact problem: it can't find "ProgramSIM", the thing you are trying to run. Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev): java.io.IOException: Cannot run program "ProgramSIM": error=2, No such file or directory On Thu, Feb 19, 2015 at 5:52 PM, a

Re: Filter data from one RDD based on data from another RDD

2015-02-19 Thread Imran Rashid
the more scalable alternative is to do a join (or a variant like cogroup, leftOuterJoin, subtractByKey etc. found in PairRDDFunctions) the downside is this requires a shuffle of both your RDDs On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary wrote: > Hi, > > I have two RDD's with csv data as b
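
A sketch of both directions with hypothetical names: keep the rows whose key appears in the other RDD (join), or drop them (subtractByKey). The keys are de-duplicated before the join so that a repeated key does not multiply the matching rows.

```scala
import org.apache.spark.rdd.RDD

object FilterByOtherRddSketch {
  // Keep rows of `data` whose key is in `keys`; both RDDs are shuffled.
  def keepMatching(data: RDD[(String, String)], keys: RDD[String]): RDD[(String, String)] =
    data.join(keys.distinct().map(k => (k, ()))).mapValues(_._1)

  // Drop rows of `data` whose key is in `keys`.
  def dropMatching(data: RDD[(String, String)], keys: RDD[String]): RDD[(String, String)] =
    data.subtractByKey(keys.map(k => (k, ())))
}
```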

Re: Incorrect number of records after left outer join (I think)

2015-02-19 Thread Imran Rashid
If you have duplicate values for a key, join creates all pairs. E.g., if you have 2 values for key X in RDD A and 2 values for key X in RDD B, then A.join(B) will have 4 records for key X. On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath wrote: > Consider the following left outer join > > potentialDailyMod
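
To check whether a surprising count comes from duplicate keys, the expected size of an inner join can be computed from per-key counts (m values on one side times n on the other). A sketch with hypothetical names; note that leftOuterJoin additionally keeps each unmatched left record once, paired with None.

```scala
import org.apache.spark.rdd.RDD

object JoinCardinalitySketch {
  // Sum over shared keys of (left count * right count) = size of a.join(b).
  def expectedJoinSize(a: RDD[(String, Int)], b: RDD[(String, Int)]): Long = {
    val left = a.mapValues(_ => 1L).reduceByKey(_ + _)
    val right = b.mapValues(_ => 1L).reduceByKey(_ + _)
    left.join(right).map { case (_, (m, n)) => m * n }.fold(0L)(_ + _)
  }
}
```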

Re: Some tasks taking too much time to complete in a stage

2015-02-19 Thread Imran Rashid
Almost all your data is going to one task. You can see that the shuffle read for task 0 is 153.3 KB, and for most other tasks it's just 26 B (which is probably just some header saying there are no actual records). You need to ensure your data is more evenly distributed before this step. On Thu, Fe
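
One way to confirm skew outside the UI is to count records per partition. A sketch with hypothetical names that returns (partition index, record count) pairs, largest first:

```scala
import org.apache.spark.rdd.RDD

object PartitionSizeSketch {
  // One (index, count) pair per partition; a single huge count next to many
  // near-zero counts matches the one-busy-task pattern described above.
  def recordsPerPartition[T](rdd: RDD[T]): Array[(Int, Long)] =
    rdd.mapPartitionsWithIndex((i, it) => Iterator((i, it.size.toLong)))
      .collect()
      .sortBy(-_._2)
}
```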

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
«size of expanded file» is actually the size of all concatenated input files (probably about 800 GB)? In that case should I multiply it by the number of files? Or perhaps I'm barking up completely the wrong tree. > Joe > On 19 February 2015 at 14:44,

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Hi Joe, The issue is not that you have input partitions that are bigger than 2GB -- it's just that they are getting cached. You can see in the stack trace that the problem is when you try to read data out of the DiskStore: org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) Also, just b
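
A sketch of keeping cached blocks small, under two assumptions: the inputs are gzip-compressed (Hadoop cannot split gzip, so each file arrives as a single partition), and about 1 GB per cached block is a comfortable target. Repartitioning before persisting means no cached block inherits a whole file. Names are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CacheUnder2GbSketch {
  // Ceiling division: partitions needed so each holds about targetBytes.
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int =
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt

  // Repartition *before* persisting, so the cached blocks are the small ones.
  def readAndCache(sc: SparkContext, path: String, numPartitions: Int): RDD[String] =
    sc.textFile(path).repartition(numPartitions).persist(StorageLevel.MEMORY_AND_DISK)
}
```

For the roughly 800 GB mentioned above, `partitionsFor(800L << 30, 1L << 30)` gives 800 partitions. Deserialized objects in memory can be several times larger than the bytes on disk, so leaving extra headroom is prudent.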

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Imran Rashid
Hi Tom, there are a couple of things you can do here to make this more efficient. first, I think you can replace your self-join with a groupByKey. on your example data set, this would give you (1, Iterable(2,3)) (4, Iterable(3)) this reduces the amount of data that needs to be shuffled, and tha
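
A sketch assuming the example input was the pairs (1,2), (1,3), (4,3), which is consistent with the grouped output quoted above: groupByKey gathers each key's values with one shuffle of one RDD. If later steps really need every pairing within a key (what the self-join produced), those can be generated inside each group. Names are hypothetical.

```scala
import org.apache.spark.rdd.RDD

object GroupInsteadOfSelfJoinSketch {
  // One shuffle of one RDD: all values for a key end up together.
  def grouped(pairs: RDD[(Int, Int)]): RDD[(Int, Iterable[Int])] =
    pairs.groupByKey()

  // Rebuilds the self-join's (x, y) combinations per key locally, after the
  // single shuffle, instead of shuffling the m * m joined output.
  def pairsWithinKey(pairs: RDD[(Int, Int)]): RDD[(Int, (Int, Int))] =
    pairs.groupByKey().flatMapValues(vs => for (x <- vs; y <- vs) yield (x, y))
}
```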
