bq. whether or not rdd1 is a cached rdd

RDD has a getStorageLevel method which returns the RDD's current storage level.
SparkContext has this method:

/**
 * Return information about what RDDs are cached, if they are in mem or on disk, how much space
 * they take, etc.
 */
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {

Cheers

On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet <cjno...@gmail.com> wrote:

> Zhan,
>
> This is exactly what I'm trying to do except, as I mentioned in my first
> message, I am being given rdd1 and rdd2 only and I don't necessarily know
> at that point whether or not rdd1 is a cached RDD. Further, I don't know at
> that point whether or not rdd2 depends on rdd1.
>
> On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang <zzh...@hortonworks.com>
> wrote:
>
>> In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to
>> finish, probably due to writing to HDFS. A workaround for this particular
>> case may be as follows.
>>
>> val rdd1 = ......cache()
>>
>> val rdd2 = rdd1.map().....()
>> rdd1.count
>> future { rdd1.saveAsHadoopFile(...) }
>> future { rdd2.saveAsHadoopFile(...) }
>>
>> This way, rdd1 will be calculated once, and the two saveAsHadoopFile
>> calls will happen concurrently.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Feb 26, 2015, at 3:28 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>> > What confused me is the statement of *"The final result is that rdd1
>> is calculated twice." *Is it the expected behavior?
>>
>> To be perfectly honest, performing an action on a cached RDD in two
>> different threads and having them (at the partition level) block until the
>> parents are cached would be the behavior myself and all my coworkers
>> expected.
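[Editor's note: Zhan's compute-once-then-fan-out pattern can be sketched outside Spark with plain Scala Futures. Here a lazy val stands in for a cached RDD, and the counter only demonstrates that the shared lineage is evaluated a single time; none of these names are Spark API, and the two Futures play the role of the two concurrent saveAsHadoopFile actions.]

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Counts evaluations of the "lineage"; in real Spark, cache() + count
// plays this role by materializing the partitions once up front.
val computeCount = new AtomicInteger(0)

// lazy val ~ a cached RDD: the body runs at most once, later readers
// see the memoized value.
lazy val rdd1Data: Seq[Int] = {
  computeCount.incrementAndGet()
  (1 to 10).map(_ * 2)
}

// Force materialization before forking (analogous to rdd1.count).
val n = rdd1Data.size

// The two "save" actions run concurrently; both read the cached data
// and neither recomputes the lineage.
val save1 = Future { rdd1Data.sum }            // stands in for rdd1.saveAsHadoopFile(...)
val save2 = Future { rdd1Data.map(_ + 1).sum } // stands in for rdd2.saveAsHadoopFile(...)

val s1 = Await.result(save1, 10.seconds)
val s2 = Await.result(save2, 10.seconds)
println(s"lineage evaluated ${computeCount.get} time(s); sums: $s1, $s2")
```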
>>
>> On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> I should probably mention that my example case is much oversimplified.
>>> Let's say I've got a tree, a fairly complex one, where I begin a series
>>> of jobs at the root which calculate a bunch of really complex joins, and
>>> as I move down the tree, I'm creating reports from the data that's
>>> already been joined (I've implemented logic to determine when cached
>>> items can be cleaned up, e.g. the last report has been done in a
>>> subtree).
>>>
>>> My issue is that the 'actions' on the RDDs are currently being
>>> implemented in a single thread. Even if I'm waiting on a cache to
>>> complete fully before I run the "children" jobs, I'm still in a better
>>> place than I was, because I'm able to run those jobs concurrently; right
>>> now this is not the case.
>>>
>>> > What you want is for a request for partition X to wait if partition X
>>> is already being calculated in a persisted RDD.
>>>
>>> I totally agree, and if I could get it so that it's waiting at the
>>> granularity of the partition, I'd be in a much, much better place. I
>>> feel like I'm going down a rabbit hole and working against the Spark
>>> API.
>>>
>>> On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> To distill this a bit further, I don't think you actually want rdd2 to
>>>> wait on rdd1 in this case. What you want is for a request for
>>>> partition X to wait if partition X is already being calculated in a
>>>> persisted RDD. Otherwise the first partition of rdd2 waits on the
>>>> final partition of rdd1 even when the rest is ready.
>>>>
>>>> That is probably a good idea in almost all cases. That much, I
>>>> don't know how hard it is to implement. But I speculate that it's
>>>> easier to deal with it at that level than as a function of the
>>>> dependency graph.
>>>>
>>>> On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet <cjno...@gmail.com>
>>>> wrote:
>>>> > I'm trying to do the scheduling myself now, to determine that rdd2
>>>> > depends on rdd1 and rdd1 is a persistent RDD (storage level != None)
>>>> > so that I can do the no-op on rdd1 before I run rdd2. I would much
>>>> > rather the DAG figure this out so I don't need to think about all
>>>> > this.
>>>
>>
>
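[Editor's note: Sean's partition-level suggestion — a request for partition X blocks only while X is being computed, and each partition is computed at most once — can be sketched outside Spark with per-key memoization. ConcurrentHashMap.computeIfAbsent already has that compute-once, block-other-requesters-for-the-same-key semantics. All names here are hypothetical, not Spark internals.]

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// A request for partition X waits only if X is currently being computed;
// requests for other partitions proceed independently, and each partition's
// computation runs at most once.
class PartitionCache[V](compute: Int => V) {
  private val calls = new AtomicInteger(0)
  private val cache = new ConcurrentHashMap[Int, V]()

  def get(partition: Int): V =
    cache.computeIfAbsent(partition, new JFunction[Int, V] {
      def apply(p: Int): V = { calls.incrementAndGet(); compute(p) }
    })

  def computeCalls: Int = calls.get
}

// Two "jobs" requesting the same partition: the second call returns the
// cached value, so the expensive computation runs once per partition.
val pc = new PartitionCache[Seq[Int]](p => (0 until 3).map(i => p * 10 + i))
val a = pc.get(0)
val b = pc.get(0) // served from cache, no recomputation
val c = pc.get(1) // different partition, computed independently
println(s"computeCalls=${pc.computeCalls}")
```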