I have a dataset with the following schema: `(timestamp, partitionKey, logValue)`.
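For illustration, here are a few hypothetical rows (keys and values are made up):

```
timestamp            | partitionKey | logValue
2024-03-01 10:15:00  | domainA      | "Mar 1, 2024 10:15:00 AM something happened"
2024-03-01 10:16:30  | domainB      | "Mar 1, 2024 10:16:30 AM something else"
2024-03-01 10:17:45  | domainA      | "Mar 1, 2024 10:17:45 AM another event"
```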
I want the dataset to be sorted by `timestamp`, but written to files in the following directory layout: `outputDir/partitionKey/files`. The output files should contain only `logValue`; `timestamp` is used for sorting only and is not written. (FYI, `logValue` contains a textual representation of the timestamp, which is not sortable.)

My first attempt was to use `DataFrameWriter.partitionBy`:

```java
dataset
    .sort("timestamp")
    .select("partitionKey", "logValue")
    .write()
    .partitionBy("partitionKey")
    .text("output");
```

However, as mentioned in SPARK-44512 (https://issues.apache.org/jira/browse/SPARK-44512), this does not guarantee that the output is globally sorted. (Note: I found that even setting `spark.sql.optimizer.plannedWrite.enabled=false` still does not guarantee sorted results in a low-memory environment.) The developers also say that `DataFrameWriter.partitionBy` does not guarantee sorted results:

> Although I understand Apache Spark 3.4.0 changes the behavior like the above, I don't think there is a contract that Apache Spark's `partitionBy` operation preserves the previous ordering.

To work around this problem, I had to resort to creating a Hadoop output format by extending `org.apache.hadoop.mapred.lib.MultipleTextOutputFormat` and writing the files with `saveAsHadoopFile`:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public final class PartitionedMultipleTextOutputFormat<K extends Object, V>
        extends MultipleTextOutputFormat<Object, V> {

    @SuppressWarnings("MissingJavadocMethod")
    public PartitionedMultipleTextOutputFormat() {
        super();
    }

    // Drop the key from the written records: only the value (logValue) ends up in the file.
    @Override
    protected Object generateActualKey(final Object key, final V value) {
        return NullWritable.get();
    }

    // Use the key (partitionKey) as a subdirectory under the output path.
    @Override
    protected String generateFileNameForKeyValue(final Object key, final V value, final String leaf) {
        return new Path(key.toString(), leaf).toString();
    }
}
```

```java
private static Tuple2<String, Text> mapRDDToDomainLogPair(final Row row) {
    final String domain = row.getAs("partitionKey");
    final var log = (String) row.getAs("logValue");
    final var logTextClass = new Text(log);
    return new Tuple2<String, Text>(domain, logTextClass);
}
```

```java
dataset
    .sort("timestamp")
    .javaRDD()
    .mapToPair(TheClass::mapRDDToDomainLogPair)
    .saveAsHadoopFile(hdfsTmpPath, String.class, Text.class,
        PartitionedMultipleTextOutputFormat.class, GzipCodec.class);
```

This seems a little bit hacky. Does anyone have a better method?
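For concreteness, given the hypothetical rows above, the output I am after would look roughly like this (directory and part-file names are only illustrative; the exact file names do not matter):

```
output/
  domainA/
    part-00000.gz   <- the two domainA logValue lines, oldest timestamp first
  domainB/
    part-00000.gz   <- the domainB logValue line
```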