Hi,

Could you suggest an alternative way of implementing distinct, e.g. via fold or aggregate? Both SQL distinct and RDD distinct fail on my dataset due to overflow of the Spark shuffle disk. I have 7 nodes, each with 300 GB dedicated to Spark shuffle. My dataset is 2B rows, and the field I'm performing distinct on has only 23 distinct values.
Best regards,
Alexander

-----Original Message-----
From: Olivier Girardot [mailto:o.girar...@lateral-thoughts.com]
Sent: Friday, May 08, 2015 12:50 PM
To: Michael Armbrust; Olivier Girardot
Cc: Reynold Xin; dev@spark.apache.org
Subject: Re: DataFrame distinct vs RDD distinct

I'll try to reproduce what has been reported to me first :) and I'll let you know.
Thanks!

On Thu, May 7, 2015 at 9:16 PM, Michael Armbrust <mich...@databricks.com> wrote:

> I'd happily merge a PR that changes the distinct implementation to be
> more like Spark core, assuming it includes benchmarks that show better
> performance for both the "fits in memory" case and the "too big for
> memory" case.
>
> On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Ok, but for the moment, this seems to be killing performance on some
>> computations...
>> I'll try to give you precise figures on this between RDD and DataFrame.
>>
>> Olivier.
>>
>> On Thu, May 7, 2015 at 10:08 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > In 1.5, we will most likely just rewrite distinct in SQL to either
>> > use the Aggregate operator, which will benefit from all the Tungsten
>> > optimizations, or have a Tungsten version of distinct for SQL/DataFrame.
>> >
>> > On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <
>> > o.girar...@lateral-thoughts.com> wrote:
>> >
>> >> Hi everyone,
>> >> there seem to be different implementations of the "distinct" feature
>> >> in DataFrames and RDDs, and some performance issue with the DataFrame
>> >> distinct API.
>> >>
>> >> In RDD.scala:
>> >>
>> >> def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>> >>   withScope {
>> >>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>> >>   }
>> >>
>> >> And in DataFrame:
>> >>
>> >> case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>> >>   override def output: Seq[Attribute] = child.output
>> >>
>> >>   override def requiredChildDistribution: Seq[Distribution] =
>> >>     if (partial) UnspecifiedDistribution :: Nil
>> >>     else ClusteredDistribution(child.output) :: Nil
>> >>
>> >>   override def execute(): RDD[Row] = {
>> >>     child.execute().mapPartitions { iter =>
>> >>       val hashSet = new scala.collection.mutable.HashSet[Row]()
>> >>       var currentRow: Row = null
>> >>       while (iter.hasNext) {
>> >>         currentRow = iter.next()
>> >>         if (!hashSet.contains(currentRow)) {
>> >>           hashSet.add(currentRow.copy())
>> >>         }
>> >>       }
>> >>       hashSet.iterator
>> >>     }
>> >>   }
>> >> }
>> >>
>> >> I can try to reproduce the performance issue more clearly, but do you
>> >> have any insights into why we can't have the same distinct strategy
>> >> between DataFrame and RDD?
>> >>
>> >> Regards,
>> >>
>> >> Olivier.
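For the aggregate-based distinct asked about at the top of the thread: when the number of distinct values is tiny (23 values over 2B rows), one shuffle-light option is to fold each partition into a small local Set and then union the per-partition Sets, rather than shuffling rows. A minimal sketch of that seqOp/combOp pattern, using plain Scala collections to stand in for RDD partitions (on Spark, the same pair of functions could be passed to `rdd.aggregate` or `rdd.treeAggregate`; the per-partition Sets here are assumed to stay small):

```scala
// Sketch: distinct via aggregate instead of a shuffle, assuming low
// cardinality. Plain Scala collections simulate RDD partitions.
object DistinctViaAggregate {
  def main(args: Array[String]): Unit = {
    val data = Seq(1, 2, 2, 3, 1, 3, 2, 1)      // stand-in for the 2B rows
    val partitions = data.grouped(3).toList     // simulate partitioning

    // seqOp: fold one partition's elements into a local Set
    val seqOp: (Set[Int], Int) => Set[Int] = (acc, x) => acc + x
    // combOp: merge the small per-partition Sets (what the driver or
    // tree-aggregation levels would do, instead of shuffling rows)
    val combOp: (Set[Int], Set[Int]) => Set[Int] = _ union _

    val distinctValues = partitions
      .map(_.foldLeft(Set.empty[Int])(seqOp))
      .reduce(combOp)

    println(distinctValues.toList.sorted.mkString(","))  // prints 1,2,3
  }
}
```

Because each partition contributes at most one Set of (here) 23 entries, the data moved between nodes is bounded by numPartitions × 23 values instead of the full 2B rows.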