Hi Devs,

This has been raised by Swetha on the user mailing list, and it recently hit us as well.

Here is the question again:

*Is it guaranteed that the written files are sorted as specified by sortWithinPartitions?*

ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")

This construct is commonly used to generate partitioned and sorted files, where downstream systems depend on the guaranteed order.
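
As an illustration, here is a minimal sketch of such a downstream consumer (hypothetical, not our actual code): it streams a single written part file and relies on the ids arriving in ascending order.

import scala.io.Source

// hypothetical consumer: fails fast when the ids in a file are not ascending
def consumeSorted(path: String): Unit = {
  val source = Source.fromFile(path)
  try {
    source.getLines().map(_.toLong).foldLeft(Long.MinValue) { (prev, id) =>
      require(prev <= id, s"$path is not sorted: saw $id after $prev")
      id  // becomes prev for the next line
    }
  } finally source.close()
}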

Instead of

0
1
2
3
4
...
9999995
9999996
9999997
9999998
9999999

You get

0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
9999998
1611391
9999999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607

It used to work up to 3.0.3. *Was this guaranteed to work, or did it just happen to be correct?*

It stopped working with 3.1.0, but we can work around it by setting spark.sql.adaptive.coalescePartitions.enabled="false". *Is that guaranteed to fix it?*
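
For reference, we apply the workaround like this before triggering the write (any of the usual config mechanisms works):

spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")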

With 3.2.x and 3.3.x, the workaround does not work. *Is there a workaround?*

It has been fixed in 3.4.0-SNAPSHOT. *Was the fix intentional or accidental?*


Code to reproduce:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 10000000
val days = 10

case class Value(day: Long, id: Long)

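// cross join: one row per (day, id) combination, days * ids rows in total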
val ds = spark.range(days).withColumnRenamed("id", "day").join(spark.range(ids)).as[Value]

// days * 10 partitions are required, as well as sufficiently large values for ids (10m) and days (10)
ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

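// read the result back; day is recovered from the partition directories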
val df = spark.read.schema(Encoders.product[Value].schema).csv("interleaved.csv")

Check that the written files are sorted (prints OK for each file that is sorted):

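# a file's md5 equals the md5 of its numerically sorted content iff the file is already sorted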
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c


Thanks for your background knowledge on this.

Cheers,
Enrico
