Hello!
I've been testing Iceberg with Spark by writing some basic Java test classes, 
but I've hit an issue with one of the methods from PartitionSpec. In short, 
I've got a small set of data that looks like this:

{"title":"Gone", "author": "Michael Grant", "published": 1541776051}
{"title":"Carry On", "author": "Rainbow Rowell", "published": 1536505651}
{"title":"Wayward Son", "author": "Rainbow Rowell", "published": 1504969651}

I would like to be able to partition this small set of data with:

PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("author")
        .build();

However, when I try to run this simple test, I get the error:

java.lang.IllegalStateException: Already closed files for partition: author=Rainbow+Rowell
        at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:505)
        at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:476)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run


With partitioning on `author`, I was expecting the last two records to go into 
one folder and the first to go into another. Is there anything here that I'm 
doing wrong? I'm really stumped as to why I'm getting an error when trying to 
partition on that column.
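
As a plain-Java sketch of that expectation: the snippet below does not use Iceberg or Spark at all, and the class and method names are made up for illustration. It only shows the grouping I would expect identity partitioning on `author` to produce for the three sample records, not how Iceberg actually lays out or names its folders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Iceberg: models identity partitioning
// as "one group per distinct author value".
class ExpectedPartitionLayout {

    // Each book is a {title, author} pair. Returns author -> titles,
    // keeping authors in the order they are first seen.
    static Map<String, List<String>> groupByAuthor(String[][] books) {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        for (String[] book : books) {
            partitions.computeIfAbsent(book[1], author -> new ArrayList<>()).add(book[0]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        String[][] books = {
            {"Gone", "Michael Grant"},
            {"Carry On", "Rainbow Rowell"},
            {"Wayward Son", "Rainbow Rowell"}
        };
        // One line per expected partition, with the titles it should hold.
        groupByAuthor(books).forEach(
            (author, titles) -> System.out.println(author + " -> " + titles));
    }
}
```

So the expectation is two groups: one holding "Gone", and one holding both "Carry On" and "Wayward Son".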

For reference, here’s the rest of what’s in the test class.

    Schema SCHEMA = new Schema(
        optional(1, "title", Types.StringType.get()),
        optional(2, "author", Types.StringType.get()),
        optional(3, "published", Types.TimestampType.withZone())
    );

    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
        .identity("author")
        .build();

    Table table = tables.create(SCHEMA, spec, location.toString());

    Dataset<Row> df = spark.read().json("src/test/resources/books.json");

    df.select(df.col("title"), df.col("author"),
            df.col("published").cast(DataTypes.TimestampType))
        .write()
        .format("iceberg")
        .mode("append")
        .save(location.toString());

    table.refresh();

I'm using the spark-runtime-jar generated from commit: 
57b10995aade1362e582cef68b20014023556501 (from Oct 3) and I'm using Spark 
version 2.4.4.

Thank you for your time!!

Christine Mathiesen
Software Engineer Intern
BDP – Hotels.com
Expedia Group
