Nicholas, thank you for your explanation.

I am also interested in the example that Rishi is asking for. I am sure
mapPartitions would work, but as Vladimir suggests, it may not be the best
option in terms of performance.
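For reference, a mapPartitions version of the "keep the highest score per
key" problem might look like the sketch below. This is just my own
illustration, not something from this thread; it assumes the two-column
id/score DataFrame ("df") from the example further down, and it relies on
repartitioning by the key first so that every id lands in a single
partition:

import spark.implicits._

// Typed view of the (id, score) DataFrame so mapPartitions gets tuples.
val ds = df.as[(Int, Int)]

val bestPerId = ds
  .repartition($"id")            // co-locate all rows with the same id
  .mapPartitions { rows =>
    // Within each partition, keep the highest score seen per id.
    // Note: this buffers one entry per distinct id in memory, which is
    // part of why this approach may not scale well.
    rows.foldLeft(Map.empty[Int, Int]) { case (acc, (id, score)) =>
      acc.updated(id, math.max(score, acc.getOrElse(id, Int.MinValue)))
    }.iterator
  }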

@Vladimir Prus, are you aware of any example of writing a "custom
physical exec operator"?

If anyone needs further explanation of the follow-up question Rishi
posted, please see the example below:


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


// All three rows share the same "id" but have different "score" values.
val someData = Seq(
  Row(1, 10),
  Row(1, 20),
  Row(1, 11)
)

val schema = List(
  StructField("id", IntegerType, nullable = true),
  StructField("score", IntegerType, nullable = true)
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(schema)
)

// Goal: drop duplicates using "id" as the primary key, keeping the row
// with the highest "score".

df.sort($"score".desc).dropDuplicates("id").show

== Physical Plan ==
*(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
+- Exchange hashpartitioning(id#191, 200)
   +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
      +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
         +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
            +- Scan ExistingRDD[id#191,score#192]

This seems to work, but I don't know what the implications are if we use
this approach on a bigger dataset, or what the alternatives are. From the
explain output I can see two Exchanges, so it may not be the best
approach?
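
For what it's worth, one alternative I am aware of (a sketch, not
something suggested earlier in the thread) is to express "highest score
per id" as a plain aggregation, which should plan a single
hashpartitioning Exchange instead of the range partitioning plus hash
partitioning above. The struct wrapper is only needed so extra columns
can ride along with the winning score; with just these two columns,
max($"score") alone would do:

import org.apache.spark.sql.functions.{max, struct}

// Aggregate instead of sort + dropDuplicates: structs compare field by
// field, so max(struct(score, ...)) picks the row with the highest score.
val deduped = df
  .groupBy($"id")
  .agg(max(struct($"score")).as("best"))
  .select($"id", $"best.score".as("score"))

deduped.show
deduped.explain

This also avoids relying on dropDuplicates keeping the first row after a
sort, which as far as I know is not actually guaranteed across a shuffle.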
