Thank you for your prompt response and the great examples, Sun Rui, but I am
still confused about one thing. Do you see any particular reason not to
merge subsequent limits? The following case

    (limit n (map f (limit m ds)))

could be optimized to:

    (map f (limit n (limit m ds)))

and further to

    (map f (limit (min n m) ds))

couldn't it?
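
For reference, here is a tiny plain-Scala sketch of the identity I have in
mind (ordinary collections rather than Datasets, names picked purely for
illustration): take commutes with map, and nested takes collapse to the
smaller bound.

    // Plain Scala collections, just to illustrate the rewrite. Spark's limit
    // gives no ordering guarantee, so on a Dataset the equivalence is about
    // how many rows survive, not which exact rows are picked.
    val ds = (1 to 100).toList
    val f  = (x: Int) => x * 2
    val (n, m) = (5, 3)

    val original  = ds.take(m).map(f).take(n)        // (limit n (map f (limit m ds)))
    val pushed    = ds.take(m).take(n).map(f)        // (map f (limit n (limit m ds)))
    val collapsed = ds.take(math.min(n, m)).map(f)   // (map f (limit (min n m) ds))

    assert(original == pushed && pushed == collapsed)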


On 08/02/2016 11:57 AM, Sun Rui wrote:
> Based on your code, here is a simpler test case on Spark 2.0:
>
>     case class my (x: Int)
>     val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
>     val df1 = spark.createDataFrame(rdd)
>     val df2 = df1.limit(1)
>     df1.map { r => r.getAs[Int](0) }.first
>     df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
>
> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
> let's check the physical plans of the two cases:
>
>     scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
>     == Physical Plan ==
>     CollectLimit 1
>     +- *SerializeFromObject [input[0, int, true] AS value#124]
>        +- *MapElements <function1>, obj#123: int
>           +- *DeserializeToObject createexternalrow(x#74,
>     StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>              +- Scan ExistingRDD[x#74]
>
>     scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
>     == Physical Plan ==
>     CollectLimit 1
>     +- *SerializeFromObject [input[0, int, true] AS value#131]
>        +- *MapElements <function1>, obj#130: int
>           +- *DeserializeToObject createexternalrow(x#74,
>     StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>              +- *GlobalLimit 1
>                 +- Exchange SinglePartition
>                    +- *LocalLimit 1
>                       +- Scan ExistingRDD[x#74]
>
>
> For the first case, the speed comes from an optimisation in the
> CollectLimitExec physical operator: it first fetches only the first
> partition to try to collect the limit number of rows, 1 in this case,
> and fetches more partitions only if that is not enough, until the
> desired limit is reached. So generally, if the first partition is not
> empty, only the first partition is computed and fetched; the other
> partitions are not computed at all.
>
> However, in the second case, the optimisation in CollectLimitExec does
> not help, because the earlier limit operation involves a shuffle. All
> partitions are computed, LocalLimit(1) runs on each partition to get 1
> row, and then all partitions are shuffled into a single partition.
> CollectLimitExec then fetches 1 row from the resulting single partition.
>
>
>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <mszymkiew...@gmail.com
>> <mailto:mszymkiew...@gmail.com>> wrote:
>>
>> Hi everyone,
>>
>> This doesn't look like expected behaviour, does it?
>>
>> http://stackoverflow.com/q/38710018/1560062
>>
>> A quick glance at the UI suggests that there is a shuffle involved and
>> that the input for first is a ShuffledRowRDD.
>> -- 
>> Best regards,
>> Maciej Szymkiewicz
>
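
To make the CollectLimitExec behaviour Sun Rui describes more concrete, here
is a rough sketch in plain Scala (a hypothetical helper, not Spark's actual
implementation) of the incremental fetch strategy: partitions are evaluated
one at a time and the scan stops as soon as enough rows have been collected,
which is exactly what the extra shuffle in the second plan defeats.

    // Hypothetical illustration only: each partition is modelled as a lazily
    // evaluated iterator, and we stop materialising partitions once `limit`
    // rows have been collected, so later partitions are never computed.
    def collectLimit[T](partitions: Seq[() => Iterator[T]], limit: Int): Seq[T] = {
      val buf   = scala.collection.mutable.ArrayBuffer.empty[T]
      val parts = partitions.iterator
      while (buf.size < limit && parts.hasNext) {
        buf ++= parts.next()().take(limit - buf.size)
      }
      buf.toSeq
    }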

-- 
Maciej Szymkiewicz

