Sorry for the slow response. I agree with Hamel on #1. GraphFrames are mostly wrappers for GraphX algorithms. There are a few which are not:

* BFS: This is an iterative DataFrame algorithm (roughly the pattern sketched below). Though it has unit tests, I have not pushed its scaling to see how far it can go.
* Belief Propagation example: This uses the conversion to and from an RDD. Not great, but it's really just an example for now.
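For a rough idea of what "iterative DataFrame algorithm" means in the BFS case, the frontier-expansion loop looks something like the sketch below. This is a simplified illustration, not the actual GraphFrames code; the column names (id, src, dst) and the fixed iteration count are placeholders:

// Simplified sketch only -- not the actual GraphFrames BFS implementation.
// Assumes DataFrames vertices(id) and edges(src, dst).
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def bfsSketch(vertices: DataFrame, edges: DataFrame, startId: Long, maxIter: Int): DataFrame = {
  var frontier = vertices.filter(col("id") === startId).select("id")
  var visited = frontier
  for (_ <- 1 to maxIter) {
    // Each iteration adds another join/except/unionAll layer to the query plan,
    // which is exactly where the lineage (and planning cost) keeps growing.
    frontier = frontier.join(edges, frontier("id") === edges("src"))
      .select(edges("dst").as("id"))
      .except(visited)
    visited = visited.unionAll(frontier)
  }
  visited
}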
I definitely want to get this issue fixed ASAP!

On Sun, May 15, 2016 at 7:15 AM, Hamel Kothari <hamelkoth...@gmail.com> wrote:

> I don't know about the second one, but for question #1:
>
> When you convert from a cached DF to an RDD (via a map function or the
> "rdd" value), the types are converted from the off-heap types to on-heap
> types. If your rows are fairly large/complex this can have a pretty big
> performance impact, so I would watch out for that.
>
> On Fri, May 13, 2016 at 5:29 PM Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
>
>> Hi Joseph,
>>
>> Thank you for the link! Two follow-up questions:
>>
>> 1) Suppose I have the original DataFrame in Tungsten, i.e. Catalyst types
>> and cached in the off-heap store. It might be quite useful for iterative
>> workloads due to lower GC overhead. Then I convert it to an RDD and then
>> back to a DF. Will the resulting DF remain off-heap, or will it be on-heap
>> like a regular RDD?
>>
>> 2) How is the mentioned problem handled in GraphFrames? Suppose I want to
>> use aggregateMessages in an iterative loop, for implementing PageRank.
>>
>> Best regards, Alexander
>>
>> *From:* Joseph Bradley [mailto:jos...@databricks.com]
>> *Sent:* Friday, May 13, 2016 12:38 PM
>> *To:* Ulanov, Alexander <alexander.ula...@hpe.com>
>> *Cc:* dev@spark.apache.org
>> *Subject:* Re: Shrinking the DataFrame lineage
>>
>> Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346
>>
>> I don't have a great method currently, but hacks can get around it:
>> convert the DataFrame to an RDD and back to truncate the query plan lineage.
>>
>> Joseph
>>
>> On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
>>
>> Dear Spark developers,
>>
>> Recently, I was trying to switch my code from RDDs to DataFrames in order
>> to compare the performance. The code computes an RDD in a loop. I use
>> RDD.persist followed by RDD.count to force Spark to compute the RDD and
>> cache it, so that it does not need to recompute it on each iteration.
>> However, this does not seem to work for a DataFrame:
>>
>> import scala.util.Random
>> val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
>> val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
>> val vertices = edges.select("from").unionAll(edges.select("to")).distinct().cache()
>> vertices.count
>>
>> [Stage 34:=================>                                  (65 + 4) / 200]
>> [Stage 34:========================>                           (90 + 5) / 200]
>> [Stage 34:==============================>                    (114 + 4) / 200]
>> [Stage 34:====================================>              (137 + 4) / 200]
>> [Stage 34:==========================================>        (157 + 4) / 200]
>> [Stage 34:=================================================> (182 + 4) / 200]
>>
>> res25: Long = 5
>>
>> If I run count again, it recomputes everything instead of using the cached result:
>>
>> scala> vertices.count
>>
>> [Stage 37:=============>                                      (49 + 4) / 200]
>> [Stage 37:==================>                                 (66 + 4) / 200]
>> [Stage 37:========================>                           (90 + 4) / 200]
>> [Stage 37:=============================>                     (110 + 4) / 200]
>> [Stage 37:===================================>               (133 + 4) / 200]
>> [Stage 37:==========================================>        (157 + 4) / 200]
>> [Stage 37:================================================>  (178 + 5) / 200]
>>
>> res26: Long = 5
>>
>> Could you suggest how to shrink the DataFrame lineage?
>>
>> Best regards, Alexander
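A rough sketch of the RDD round-trip workaround Joseph mentions, applied inside an iterative loop like the PageRank case above (Spark 1.6-era API; the step function and variable names are placeholders, not code from the thread):

// Rough sketch, not from the thread: truncate a DataFrame's lineage each
// iteration by converting to an RDD and back, then re-cache the result.
import org.apache.spark.sql.{DataFrame, SQLContext}

def iterateWithTruncatedLineage(initial: DataFrame,
                                step: DataFrame => DataFrame,
                                numIters: Int,
                                sqlContext: SQLContext): DataFrame = {
  var current = initial.cache()
  current.count()  // force materialization of the starting point
  for (_ <- 1 to numIters) {
    val updated = step(current)
    // df.rdd -> createDataFrame drops the accumulated query plan; note that
    // the off-heap rows are converted to on-heap Rows here (Hamel's caveat).
    val truncated = sqlContext.createDataFrame(updated.rdd, updated.schema).cache()
    truncated.count()       // materialize before dropping the old cache
    current.unpersist()
    current = truncated
  }
  current
}

Hamel's caveat applies to this sketch: the round trip converts rows from the off-heap Tungsten representation back to on-heap objects, so for large or complex rows the truncation itself has a real cost.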