I missed that part. Thanks for the explanation. It is a challenging problem
implementation-wise. To do it programmatically:

1. Pre-analyze all DAGs to form a complete DAG, with the root as the source
and the leaves as all actions.
2. Any RDD (node) that has more than one downstream node needs to be marked
as cached.
3. After each action is done, remove it from the graph and clean up the nodes
that do not have downstream nodes.

Implementation:

1. Construct the graph at every RDD transformation (internal node and edge)
and action (leaf).
2. In each runJob, identify the action, remove it from the graph, and clean
up the cache. A rough sketch of this bookkeeping follows.
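
A minimal sketch, assuming a generic graph of nodes; DagTracker and its
method names are made up for illustration and are not part of Spark:

import scala.collection.mutable

// Hypothetical helper: tracks the parents of each node plus a count of
// downstream consumers.
class DagTracker[Node] {
  private val parents   = mutable.Map.empty[Node, Set[Node]]
  private val downCount = mutable.Map.empty[Node, Int].withDefaultValue(0)

  // Step 1: record a node (transformation or action) and its parents.
  def addNode(node: Node, nodeParents: Set[Node]): Unit = {
    parents(node) = nodeParents
    nodeParents.foreach(p => downCount(p) += 1)
  }

  // Step 2: any node with more than one downstream node should be cached.
  def shouldCache(node: Node): Boolean = downCount(node) > 1

  // Step 3: an action finished. Walk up from it and return every node
  // whose downstream count dropped to zero; those can be unpersisted.
  def onActionDone(leaf: Node): Set[Node] = {
    var freed = Set.empty[Node]
    def release(n: Node): Unit = {
      downCount(n) -= 1
      if (downCount(n) == 0) {
        freed += n
        parents.getOrElse(n, Set.empty).foreach(release)
      }
    }
    parents.getOrElse(leaf, Set.empty).foreach(release)
    freed
  }
}

In the example graph below, RDD1 ends up with a downstream count of 2 (the
first output action plus RDD2), so it is the node marked for caching.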


Taking yours as the example, the graph is constructed as below:


RDD1 ---> output
    |
    |_____ RDD2 ---> output


Thanks.

Zhan Zhang


On Feb 26, 2015, at 4:20 PM, Corey Nolet <cjno...@gmail.com> wrote:


Ted, that one I know. It was the dependency part I was curious about.

On Feb 26, 2015 7:12 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
bq. whether or not rdd1 is a cached rdd

RDD has a getStorageLevel method which returns the RDD's current storage
level.

SparkContext has this method:

  /**
   * Return information about what RDDs are cached, if they are in mem or on
   * disk, how much space they take, etc.
   */
  @DeveloperApi
  def getRDDStorageInfo: Array[RDDInfo] = {
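
For example, a quick check could look like this (a sketch: sc is your
SparkContext and the path is made up):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///some/path").cache()        // hypothetical path
val isCached = rdd.getStorageLevel != StorageLevel.NONE   // true after cache()

// Cluster-wide view of what has actually been cached so far:
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: mem=${info.memSize} bytes, disk=${info.diskSize} bytes")
}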

Cheers

On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet <cjno...@gmail.com> wrote:
Zhan,

This is exactly what I'm trying to do except, as I mentioned in my first
message, I am being given rdd1 and rdd2 only and I don't necessarily know at 
that point whether or not rdd1 is a cached rdd. Further, I don't know at that 
point whether or not rdd2 depends on rdd1.
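
A sketch of how both could be checked by walking the lineage through the
public dependencies API (the helper names below are made up, and the naive
walk will revisit shared ancestors in a diamond-shaped lineage):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Collect every persisted ancestor in rdd's lineage.
def cachedAncestors(rdd: RDD[_]): Seq[RDD[_]] = {
  val parents = rdd.dependencies.map(_.rdd)
  parents.filter(_.getStorageLevel != StorageLevel.NONE) ++
    parents.flatMap(cachedAncestors)
}

// Does child depend, directly or transitively, on parent?
def dependsOn(child: RDD[_], parent: RDD[_]): Boolean = {
  val ps = child.dependencies.map(_.rdd)
  ps.exists(_.id == parent.id) || ps.exists(dependsOn(_, parent))
}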

On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to finish,
probably due to writing to HDFS. A workaround for this particular case may be
as follows.

val rdd1 = ......cache()

val rdd2 = rdd1.map().....()
rdd1.count
future { rdd1.saveAsHadoopFile(...) }
future { rdd2.saveAsHadoopFile(...) }

In this way, rdd1 will be calculated only once, and the two saveAsHadoopFile
calls will run concurrently.
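
A self-contained sketch of the same idea (the paths are hypothetical,
saveAsTextFile stands in for saveAsHadoopFile for brevity, and the global
ExecutionContext is just for illustration):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val rdd1 = sc.textFile("hdfs:///input").map(_.toUpperCase).cache()
val rdd2 = rdd1.map(line => (line.length, line))

rdd1.count()  // materialize rdd1 into the cache before forking

// Both actions now read rdd1 from the cache and run concurrently.
val f1 = Future { rdd1.saveAsTextFile("hdfs:///out1") }
val f2 = Future { rdd2.saveAsTextFile("hdfs:///out2") }

Await.result(f1, Duration.Inf)
Await.result(f2, Duration.Inf)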

Thanks.

Zhan Zhang



On Feb 26, 2015, at 3:28 PM, Corey Nolet <cjno...@gmail.com> wrote:

> What confused me is the statement "The final result is that rdd1 is
> calculated twice." Is it the expected behavior?

To be perfectly honest, performing an action on a cached RDD in two different
threads, and having them block (at the partition level) until the parents are
cached, is the behavior that I and all my coworkers expected.

On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet <cjno...@gmail.com> wrote:
I should probably mention that my example case is much oversimplified. Let's
say I've got a tree, a fairly complex one, where I begin a series of jobs at
the root which calculate a bunch of really complex joins, and as I move down
the tree I'm creating reports from the data that's already been joined (I've
implemented logic to determine when cached items can be cleaned up, e.g. when
the last report in a subtree has been done).

My issue is that the 'actions' on the RDDs are currently being run in a
single thread. Even if I'm waiting on a cache to complete fully before I run
the "children" jobs, I'm still in a better place than I was because I'm able
to run those jobs concurrently; right now this is not the case.

> What you want is for a request for partition X to wait if partition X is 
> already being calculated in a persisted RDD.

I totally agree, and if I could get it to wait at the granularity of the
partition, I'd be in a much better place. I feel like I'm going down a rabbit
hole and working against the Spark API.


On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen <so...@cloudera.com> wrote:
To distill this a bit further, I don't think you actually want rdd2 to
wait on rdd1 in this case. What you want is for a request for
partition X to wait if partition X is already being calculated in a
persisted RDD. Otherwise the first partition of rdd2 waits on the
final partition of rdd1 even when the rest is ready.

That is probably a good idea in almost all cases. I don't know how hard
it would be to implement, but I speculate that it's easier to deal with
it at that level than as a function of the dependency graph.

On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet <cjno...@gmail.com> wrote:
> I'm trying to do the scheduling myself now- to determine that rdd2 depends
> on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
> do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
> this out so I don't need to think about all this.





