Based on your code, here is simpler test case on Spark 2.0

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so check the 
physical plan of the two cases:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, 
StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, 
StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

For the first case, it is related to an optimisation in the CollectLimitExec 
physical operator. That is, it will first fetch the first partition to get 
limit number of row, 1 in this case, if not satisfied, then fetch more 
partitions, until the desired limit is reached. So generally, if the first 
partition is not empty, only the first partition will be calculated and 
fetched. Other partitions will even not be computed.

However, in the second case, the optimisation in the CollectLimitExec does not 
help, because the previous limit operation involves a shuffle operation. All 
partitions will be computed, and running LocalLimit(1) on each partition to get 
1 row, and then all partitions are shuffled into a single partition. 
CollectLimitExec will fetch 1 row from the resulted single partition.


> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
> 
> Hi everyone, 
> This doesn't look like something expected, does it?
> 
> http://stackoverflow.com/q/38710018/1560062 
> <http://stackoverflow.com/q/38710018/1560062>
> Quick glance at the UI suggest that there is a shuffle involved and input for 
> first is ShuffledRowRDD. 
> -- 
> Best regards,
> Maciej Szymkiewicz

Reply via email to