Hi Devs,
this was raised by Swetha on the user mailing list, and it also hit us
recently.
Here is the question again:
*Is it guaranteed that written files are sorted as specified by
**sortWithinPartitions**?*
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")
This construct is a common way to generate partitioned and sorted files,
where downstream systems depend on the guaranteed order.
Instead of
0
1
2
3
4
...
9999995
9999996
9999997
9999998
9999999
You get the following, where two sorted runs (0, 1, 2, ... and 8388608,
8388609, ...) are interleaved:
0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
9999998
1611391
9999999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607
This used to work up to and including 3.0.3. *Was this guaranteed to
work, or did it just happen to be correct?*
It stopped working with 3.1.0, but we can work around it by setting
spark.sql.adaptive.coalescePartitions.enabled="false". *Is that
guaranteed to fix it?*
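For reference, this is how we set it in the spark-shell session (the same
key can also be passed via --conf to spark-submit):

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")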
With 3.2.x and 3.3.x, that workaround no longer works. *Is there a
workaround for these versions?*
It has been fixed in 3.4.0-SNAPSHOT. *Was that fixed intentionally or
accidentally?*
Code to reproduce (paste into spark-shell):
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 10000000
val days = 10

case class Value(day: Long, id: Long)

val ds = spark.range(days)
  .withColumnRenamed("id", "day")
  .join(spark.range(ids))
  .as[Value]

// days * 10 partitions are required, as well as a sufficiently large
// value for ids (10m) and days (10)
ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

val df = spark.read
  .schema(Encoders.product[Value].schema)
  .csv("interleaved.csv")
Check that the written files are sorted (prints OK for each file that is
sorted):
# a file is sorted iff sorting it does not change its md5 sum
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1) $file"
done | md5sum -c
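In case a pure-Spark check is preferred, a rough equivalent of the above
in Scala might look like this (just a sketch: it assumes each part file
fits in memory, since wholeTextFiles loads files whole, and relies on the
written files containing only the id column, day being encoded in the
directory name):

val unsortedFiles = spark.sparkContext
  .wholeTextFiles("interleaved.csv/day=*")
  .filter { case (_, content) =>
    // a file is unsorted if any id is smaller than its predecessor
    val ids = content.split("\n").filter(_.nonEmpty).map(_.toLong)
    ids.indices.drop(1).exists(i => ids(i - 1) > ids(i))
  }
  .keys
  .collect()

// prints OK when all files are sorted
println(if (unsortedFiles.isEmpty) "OK" else unsortedFiles.mkString("\n"))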
Thanks for sharing any background knowledge on this.
Cheers,
Enrico