I have a dataset with the following schema:
(timestamp, partitionKey, logValue)

I want the dataset to be sorted by timestamp, but written to files in
the following directory layout:
outputDir/partitionKey/files
The output files contain only logValue; that is, timestamp is used for
sorting only and is not written to the output.
(FYI, logValue contains a textual representation of the timestamp, which is
not sortable.)
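
To make the requirement concrete, here is a small plain-Java sketch (no
Spark involved) of the result I am after. The LogRecord and LayoutSketch
names and the sample values are made up purely for illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for one row of (timestamp, partitionKey, logValue).
final class LogRecord {
    final long timestamp;
    final String partitionKey;
    final String logValue;

    LogRecord(final long timestamp, final String partitionKey, final String logValue) {
        this.timestamp = timestamp;
        this.partitionKey = partitionKey;
        this.logValue = logValue;
    }
}

final class LayoutSketch {
    // Returns partitionKey -> logValues in timestamp order.
    // The timestamp is used only for ordering and is not part of the result.
    static Map<String, List<String>> layout(final List<LogRecord> records) {
        final List<LogRecord> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparingLong((LogRecord r) -> r.timestamp));

        final Map<String, List<String>> byKey = new TreeMap<>();
        for (final LogRecord r : sorted) {
            byKey.computeIfAbsent(r.partitionKey, k -> new ArrayList<>()).add(r.logValue);
        }
        return byKey;
    }

    public static void main(final String[] args) {
        final List<LogRecord> records = List.of(
            new LogRecord(3L, "a.example", "third"),
            new LogRecord(1L, "a.example", "first"),
            new LogRecord(2L, "b.example", "second"));

        // One directory per partitionKey, holding only logValue lines.
        layout(records).forEach((key, lines) ->
            System.out.println("outputDir/" + key + "/ -> " + lines));
    }
}
```

Each map entry corresponds to one outputDir/partitionKey directory, and the
list is the order in which I expect the lines to appear in its files.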

My first attempt was to use DataFrameWriter.partitionBy:
dataset
.sort("timestamp")
.select("partitionKey", "logValue")
.write()
.partitionBy("partitionKey")
.text("output");

However, as mentioned in SPARK-44512 (
https://issues.apache.org/jira/browse/SPARK-44512), this does not guarantee
that the output is globally sorted.
(Note: I found that even setting
spark.sql.optimizer.plannedWrite.enabled=false does not guarantee a
sorted result in a low-memory environment.)

The developers also say that DataFrameWriter.partitionBy does not guarantee
sorted results:
"Although I understand Apache Spark 3.4.0 changes the behavior like the
above, I don't think there is a contract that Apache Spark's `partitionBy`
operation preserves the previous ordering."

To work around this problem, I resorted to creating a Hadoop output
format by extending org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
and writing the files with saveAsHadoopFile:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public final class PartitionedMultipleTextOutputFormat<K extends Object, V>
        extends MultipleTextOutputFormat<Object, V> {
    @SuppressWarnings("MissingJavadocMethod")
    public PartitionedMultipleTextOutputFormat() {
        super();
    }

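    // Replace the key with NullWritable so only the value (logValue) is written.
    @Override
    protected Object generateActualKey(final Object key, final V value) {
        return NullWritable.get();
    }

    // Place each record under a subdirectory named after its key: <key>/<leaf>.
    @Override
    protected String generateFileNameForKeyValue(final Object key, final V value,
            final String leaf) {
        return new Path(key.toString(), leaf).toString();
    }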
}

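// Needs org.apache.hadoop.io.Text, org.apache.spark.sql.Row and scala.Tuple2.
private static Tuple2<String, Text> mapRDDToDomainLogPair(final Row row) {
    // Column names must match the schema exactly (no surrounding spaces).
    final String domain = row.getAs("partitionKey");
    final var log = (String) row.getAs("logValue");
    final var logTextClass = new Text(log);
    return new Tuple2<String, Text>(domain, logTextClass);
}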

// GzipCodec is org.apache.hadoop.io.compress.GzipCodec.
dataset
    .sort("timestamp")
    .javaRDD()
    .mapToPair(TheClass::mapRDDToDomainLogPair)
    .saveAsHadoopFile(hdfsTmpPath, String.class, Text.class,
        PartitionedMultipleTextOutputFormat.class, GzipCodec.class);

This seems a little hacky.
Does anyone know of a better approach?
