Nicholas, thank you for your explanation. I am also interested in the example that Rishi is asking for. I am sure mapPartitions may work, but as Vladimir suggests it may not be the best option in terms of performance.
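For context, this is roughly what I understand the mapPartitions variant to look like. A sketch only, using the df defined in the example further below; the per-partition hash map and the partition count of 200 are my assumptions:

import org.apache.spark.HashPartitioner

// Sketch: shuffle rows so all records for one id land in the same
// partition, then keep the highest score per id within each partition.
val dedupViaMapPartitions = df.rdd
  .map(r => (r.getInt(0), r.getInt(1)))   // (id, score) pairs
  .partitionBy(new HashPartitioner(200))  // single shuffle on id
  .mapPartitions { iter =>
    val best = scala.collection.mutable.HashMap.empty[Int, Int]
    iter.foreach { case (id, score) =>
      if (score > best.getOrElse(id, Int.MinValue)) best(id) = score
    }
    best.iterator                          // one (id, maxScore) per id
  }

It avoids a global sort, but the hash map has to hold every distinct id of its partition in memory, which is one reason it may not be the best option.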
@Vladimir Prus, are you aware of any example of writing a "custom physical exec operator"?

If anyone needs further explanation of the follow-up question Rishi posted, please see the example below:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val someData = Seq(
  Row(1, 10),
  Row(1, 20),
  Row(1, 11)
)

val schema = List(
  StructField("id", IntegerType, true),
  StructField("score", IntegerType, true)
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(schema)
)

// Goal: drop duplicates using "id" as the primary key and keep the highest "score".
df.sort($"score".desc).dropDuplicates("id").show

== Physical Plan ==
*(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
+- Exchange hashpartitioning(id#191, 200)
   +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
      +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
         +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
            +- Scan ExistingRDD[id#191,score#192]

This seems to work, but I don't know what the implications are of using this approach on a bigger dataset, or what the alternatives are. From the explain output I can see the two Exchanges, so it may not be the best approach?
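For what it's worth, one alternative I would benchmark against is a plain aggregation, which only needs the single hash Exchange on id and no global sort. A sketch only; otherCol below is a hypothetical extra column, not part of the example:

import org.apache.spark.sql.functions._

// Keep the highest score per id: one shuffle, no sort.
df.groupBy($"id").agg(max($"score").as("score")).show()

// If more columns had to survive alongside the winning score, a max over
// a struct (score first) keeps the whole winning row:
// df.groupBy($"id")
//   .agg(max(struct($"score", $"otherCol")).as("best"))
//   .select($"id", $"best.*")

The aggregation also makes the "keep the highest score" intent explicit, instead of relying on first() picking up the sort order.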