rdblue closed pull request #6: Support customizing the location where data is written in Spark. URL: https://github.com/apache/incubator-iceberg/pull/6
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java index e522f84..0d99c7e 100644 --- a/core/src/main/java/com/netflix/iceberg/TableProperties.java +++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java @@ -67,6 +67,11 @@ public static final String OBJECT_STORE_PATH = "write.object-storage.path"; + // This only applies to files written after this property is set. Files previously written aren't relocated to + // reflect this parameter. + // If not set, defaults to a "data" folder underneath the root path of the table. + public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path"; + public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled"; public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false; } diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java index bed2cf6..c9d3a7b 100644 --- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java +++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java @@ -32,6 +32,7 @@ import com.netflix.iceberg.PartitionSpec; import com.netflix.iceberg.Schema; import com.netflix.iceberg.Table; +import com.netflix.iceberg.TableProperties; import com.netflix.iceberg.avro.Avro; import com.netflix.iceberg.exceptions.RuntimeIOException; import com.netflix.iceberg.hadoop.HadoopInputFile; @@ -164,7 +165,9 @@ private int propertyAsInt(String property, int defaultValue) { } private String dataLocation() { - return new Path(new Path(table.location()), "data").toString(); + return table.properties().getOrDefault( + 
TableProperties.WRITE_NEW_DATA_LOCATION, + new Path(new Path(table.location()), "data").toString()); } @Override diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java index f84c6fe..bc74908 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java @@ -38,7 +38,7 @@ protected abstract void writeAndValidate(Schema schema) throws IOException; - private static final StructType SUPPORTED_PRIMITIVES = StructType.of( + protected static final StructType SUPPORTED_PRIMITIVES = StructType.of( required(100, "id", LongType.get()), optional(101, "data", Types.StringType.get()), required(102, "b", Types.BooleanType.get()), diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java index 3b0d32b..05f8f80 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java @@ -32,10 +32,12 @@ import com.netflix.iceberg.spark.data.AvroDataTest; import com.netflix.iceberg.spark.data.RandomData; import com.netflix.iceberg.spark.data.SparkAvroReader; +import com.netflix.iceberg.types.Types; import org.apache.avro.generic.GenericData.Record; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -43,10 +45,12 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; 
+import java.net.URI; import java.util.List; import static com.netflix.iceberg.spark.SparkSchemaUtil.convert; @@ -57,7 +61,7 @@ public class TestDataFrameWrites extends AvroDataTest { private static final Configuration CONF = new Configuration(); - private String format = null; + private final String format; @Parameterized.Parameters public static Object[][] parameters() { @@ -90,23 +94,43 @@ public static void stopSpark() { @Override protected void writeAndValidate(Schema schema) throws IOException { + File location = createTableFolder(); + Table table = createTable(schema, location); + writeAndValidateWithLocations(table, location, new File(location, "data")); + } + + @Test + public void testWriteWithCustomDataLocation() throws IOException { + File location = createTableFolder(); + File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir"); + Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location); + table.updateProperties().set( + TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit(); + writeAndValidateWithLocations(table, location, tablePropertyDataLocation); + } + + private File createTableFolder() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); Assert.assertTrue("Mkdir should succeed", location.mkdirs()); + return location; + } + private Table createTable(Schema schema, File location) { HadoopTables tables = new HadoopTables(CONF); - Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + return tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + } + + private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException { Schema tableSchema = table.schema(); // use the table schema because ids are reassigned table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); List<Record> expected = 
RandomData.generateList(tableSchema, 100, 0L); Dataset<Row> df = createDataset(expected, tableSchema); + DataFrameWriter<?> writer = df.write().format("iceberg").mode("append"); - df.write() - .format("iceberg") - .mode("append") - .save(location.toString()); + writer.save(location.toString()); table.refresh(); @@ -120,6 +144,14 @@ protected void writeAndValidate(Schema schema) throws IOException { for (int i = 0; i < expected.size(); i += 1) { assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i)); } + + table.currentSnapshot().addedFiles().forEach(dataFile -> + Assert.assertTrue( + String.format( + "File should have the parent directory %s, but has: %s.", + expectedDataDir.getAbsolutePath(), + dataFile.path()), + URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath()))); } private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException { diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java index 4f71ead..a2d105d 100644 --- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java @@ -71,7 +71,6 @@ public static void stopSpark() { public void testBasicWrite() throws IOException { File parent = temp.newFolder("parquet"); File location = new File(parent, "test"); - location.mkdirs(); HadoopTables tables = new HadoopTables(CONF); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services