Pushing the limit down across the map would be great. If you're used to SQL,
or work frequently with lazy collections, this is behavior you learn to
expect.
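For instance, with a plain Scala lazy view only the elements that are
actually taken get mapped, so a take behaves as if it had been pushed below
the map (a toy illustration, nothing Spark-specific):

var mapped = 0
val first = (1 to 1000000).view.map { x => mapped += 1; x * 2 }.take(1).toList
// first == List(2) and mapped == 1: take(1) limited how much work map did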
On 08/02/2016 02:12 PM, Sun Rui wrote:
> Spark does optimise subsequent limits, for example:
>
> scala> df1.limit(3).limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [assertnotnull(input[0, $line14.$read$$iw$$iw$my, true], top level non-flat input object).x AS x#2]
>    +- Scan ExternalRDDScan[obj#1]
Spark does optimise subsequent limits, for example:
scala> df1.limit(3).limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [assertnotnull(input[0, $line14.$read$$iw$$iw$my, true], top level non-flat input object).x AS x#2]
   +- Scan ExternalRDDScan[obj#1]
However, limit is not pushed down across map().
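For the archives, the merging of adjacent limits happens at the logical-plan
level. A simplified sketch of what such a rule can look like against
Catalyst's public plan nodes (the name CombineAdjacentLimits is mine, and
this is only an illustration, not the actual Spark optimizer code):

import org.apache.spark.sql.catalyst.expressions.Least
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Merge directly adjacent limits into a single limit on the smaller value.
object CombineAdjacentLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case GlobalLimit(outer, GlobalLimit(inner, child)) =>
      GlobalLimit(Least(Seq(inner, outer)), child)
    case LocalLimit(outer, LocalLimit(inner, child)) =>
      LocalLimit(Least(Seq(inner, outer)), child)
  }
}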
Thank you for your prompt response and the great examples, Sun Rui, but I am
still confused about one thing. Do you see any particular reason not to
merge subsequent limits? The following case
(limit n (map f (limit m ds)))
could be optimized to:
(map f (limit n (limit m ds)))
and further to:
(map f (limit (min n m) ds))
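In code, the rewrite I have in mind looks roughly like this, on a toy plan
type rather than Catalyst (it relies on map being one-to-one per row, which
holds for Dataset.map):

sealed trait Plan
case class Source(name: String)          extends Plan
case class MapOp(f: String, child: Plan) extends Plan
case class LimitOp(n: Int, child: Plan)  extends Plan

// Push a limit below a one-to-one map, then merge adjacent limits to the smaller one.
def pushAndMerge(p: Plan): Plan = p match {
  case LimitOp(n, MapOp(f, child))   => MapOp(f, pushAndMerge(LimitOp(n, child)))
  case LimitOp(n, LimitOp(m, child)) => pushAndMerge(LimitOp(math.min(n, m), child))
  case LimitOp(n, child)             => LimitOp(n, pushAndMerge(child))
  case MapOp(f, child)               => MapOp(f, pushAndMerge(child))
  case s: Source                     => s
}

// (limit 1 (map f (limit 3 ds)))  ==>  (map f (limit 1 ds))
pushAndMerge(LimitOp(1, MapOp("f", LimitOp(3, Source("ds")))))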
Based on your code, here is a simpler test case on Spark 2.0:
case class my(x: Int)
val rdd = sc.parallelize(0.until(1), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the line above
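To put a rough number on the difference in the shell, the two calls can be
wrapped in a small ad-hoc timer (the time helper below is mine, not part of
the test case):

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("df1 first") { df1.map { r => r.getAs[Int](0) }.first }
time("df2 first") { df2.map { r => r.getAs[Int](0) }.first }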
Hi everyone,
This doesn't look like expected behavior, does it?
http://stackoverflow.com/q/38710018/1560062
A quick glance at the UI suggests that there is a shuffle involved and that
the input for first is a ShuffledRowRDD.
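One way to double-check that without the UI is to look at the lineage of the
underlying RDD, e.g. with the df2 from the test case above (assuming
spark-shell, so the needed implicits are in scope):

df2.map { r => r.getAs[Int](0) }.rdd.toDebugString
// If a shuffle is involved, a ShuffledRowRDD should appear in the printed lineage.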
--
Best regards,
Maciej Szymkiewicz