Thank you for your prompt response and the great examples, Sun Rui, but I am still confused about one thing. Do you see any particular reason not to merge subsequent limits? The following case, (limit n (map f (limit m ds))), could be optimized to (map f (limit n (limit m ds))) and further to (map f (limit (min n m) ds)), couldn't it?
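To make it concrete, I imagine something along the lines of the sketch below. It is just my own illustration, not actual Spark code; it assumes the Spark 2.0 Catalyst internals (Rule, LogicalPlan, GlobalLimit, LocalLimit, Least), and the name MergeAdjacentLimits is made up:

    import org.apache.spark.sql.catalyst.expressions.Least
    import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Hypothetical rule, not actual Spark source: collapse two directly
    // adjacent limits into a single limit of min(n, m).
    object MergeAdjacentLimits extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // limit(n) on top of limit(m)  =>  limit(min(n, m))
        case GlobalLimit(n, GlobalLimit(m, child)) => GlobalLimit(Least(Seq(n, m)), child)
        case LocalLimit(n, LocalLimit(m, child))   => LocalLimit(Least(Seq(n, m)), child)
      }
    }

Of course such a rule only fires when the limits are directly adjacent, so in the case above the outer limit would first have to be pushed below the map before anything could be merged, which is really the part I am asking about.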
On 08/02/2016 11:57 AM, Sun Rui wrote:
> Based on your code, here is a simpler test case on Spark 2.0:
>
> case class my (x: Int)
> val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
> val df1 = spark.createDataFrame(rdd)
> val df2 = df1.limit(1)
> df1.map { r => r.getAs[Int](0) }.first
> df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
>
> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so check the physical plan of the two cases:
>
> scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [input[0, int, true] AS value#124]
>    +- *MapElements <function1>, obj#123: int
>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>          +- Scan ExistingRDD[x#74]
>
> scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [input[0, int, true] AS value#131]
>    +- *MapElements <function1>, obj#130: int
>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>          +- *GlobalLimit 1
>             +- Exchange SinglePartition
>                +- *LocalLimit 1
>                   +- Scan ExistingRDD[x#74]
>
> For the first case, it is related to an optimisation in the CollectLimitExec physical operator. That is, it will first fetch the first partition to get the limit number of rows (1 in this case); if that is not enough, it will fetch more partitions until the desired limit is reached. So generally, if the first partition is not empty, only the first partition will be computed and fetched; the other partitions will not even be computed.
>
> However, in the second case, the optimisation in CollectLimitExec does not help, because the previous limit operation involves a shuffle. All partitions will be computed, LocalLimit(1) will run on each partition to get 1 row, and then all partitions will be shuffled into a single partition. CollectLimitExec will fetch 1 row from the resulting single partition.
>
>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
>>
>> Hi everyone,
>>
>> This doesn't look like something expected, does it?
>>
>> http://stackoverflow.com/q/38710018/1560062
>>
>> A quick glance at the UI suggests that there is a shuffle involved and the input for first is a ShuffledRowRDD.
>> --
>> Best regards,
>> Maciej Szymkiewicz

--
Maciej Szymkiewicz

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org