In 1.5, we will most likely rewrite distinct in SQL either to use the
Aggregate operator, which will benefit from all the Tungsten optimizations,
or to have a Tungsten version of distinct for SQL/DataFrame.
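
To make the Aggregate idea concrete, here is a rough sketch in plain Scala
collections (no Spark dependency) of distinct expressed as a two-phase
"group by every column, no aggregate functions" aggregation: a partial pass
per partition, a simulated shuffle, then a final pass. The names
DistinctAsAggregate, partialDistinct and shuffleByKey are hypothetical and
are not Spark APIs; this only illustrates the shape of the computation, not
how the 1.5 operator will actually be written.

```scala
// Plain Scala sketch (no Spark dependency). All names are hypothetical and
// only illustrate distinct as a two-phase group-by aggregation.
object DistinctAsAggregate {
  type Row = Seq[Any]

  // Phase 1 (partial): group identical rows inside one partition and keep
  // only the group keys, i.e. one copy of each row.
  def partialDistinct(partition: Seq[Row]): Seq[Row] =
    partition.groupBy(identity).keys.toSeq

  // Simulated shuffle: equal rows have equal hash codes, so they always
  // land in the same output partition.
  def shuffleByKey(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Seq[Row]] = {
    val buckets = partitions.flatten
      .groupBy(row => java.lang.Math.floorMod(row.hashCode, numPartitions))
    (0 until numPartitions).map(i => buckets.getOrElse(i, Seq.empty[Row]))
  }

  // Phase 2 (final): deduplicate again after the shuffle, since the same row
  // may have survived the partial pass in several input partitions.
  def distinct(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Row] =
    shuffleByKey(partitions.map(p => partialDistinct(p)), numPartitions)
      .flatMap(p => partialDistinct(p))

  def main(args: Array[String]): Unit = {
    val input: Seq[Seq[Row]] = Seq(
      Seq(Seq(1, "a"), Seq(1, "a"), Seq(2, "b")),
      Seq(Seq(2, "b"), Seq(3, "c"))
    )
    // Output order is not guaranteed, so print as a set.
    println(distinct(input, numPartitions = 2).toSet)
  }
}
```

The partial pass shrinks each partition before anything is shuffled, which
is roughly what reduceByKey does for RDD.distinct with its map-side combine.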

On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> there seem to be different implementations of the "distinct" feature in
> DataFrames and RDDs, and a performance issue with the DataFrame distinct
> API.
>
> In RDD.scala:
>
>   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>     withScope {
>       map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>     }
>
> And in DataFrame:
>
>   case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>     override def output: Seq[Attribute] = child.output
>
>     override def requiredChildDistribution: Seq[Distribution] =
>       if (partial) UnspecifiedDistribution :: Nil
>       else ClusteredDistribution(child.output) :: Nil
>
>     override def execute(): RDD[Row] = {
>       child.execute().mapPartitions { iter =>
>         val hashSet = new scala.collection.mutable.HashSet[Row]()
>         var currentRow: Row = null
>         while (iter.hasNext) {
>           currentRow = iter.next()
>           if (!hashSet.contains(currentRow)) {
>             hashSet.add(currentRow.copy())
>           }
>         }
>         hashSet.iterator
>       }
>     }
>   }
>
> I can try to reproduce the performance issue more clearly, but do you have
> any insight into why DataFrame and RDD can't share the same distinct
> strategy?
>
> Regards,
>
> Olivier.
>
