Apologies - bumping this because it did not seem to arrive in my inbox.
On 2019/10/11 15:57:53, Christine Mathiesen <t-cmathie...@hotels.com> wrote:
> Hello!
> I've been testing out the use of Iceberg with Spark by writing some basic
> Java test classes, but I've hit an issue with one of the methods from
> PartitionSpec. In a quick summary, I've got a small set of data that looks
> like this:
>
> {"title":"Gone", "author": "Michael Grant", "published": 1541776051}
> {"title":"Carry On", "author": "Rainbow Rowell", "published": 1536505651}
> {"title":"Wayward Son", "author": "Rainbow Rowell", "published": 1504969651}
>
> I would like to be able to partition this small set of data with:
>
> PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>     .identity("author")
>     .build();
>
> However, when I try to run this simple test, I get the error:
>
> java.lang.IllegalStateException: Already closed files for partition: author=Rainbow+Rowell
>     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:505)
>     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:476)
>     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run
>
> With partitioning on `author`, I was expecting the bottom 2 records to go
> into one folder, and the first to go into another. Is there anything here
> that I'm doing wrong? I'm really stumped as to why I'm getting an error when
> trying to partition on that column.
>
> For reference, here's the rest of what's in the test class.
>
> Schema SCHEMA = new Schema(
>     optional(1, "title", Types.StringType.get()),
>     optional(2, "author", Types.StringType.get()),
>     optional(3, "published", Types.TimestampType.withZone())
> );
>
> HadoopTables tables = new HadoopTables(CONF);
> PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
>     .identity("author")
>     .build();
>
> Table table = tables.create(SCHEMA, spec, location.toString());
>
> Dataset<Row> df = spark.read().json("src/test/resources/books.json");
>
> df.select(df.col("title"), df.col("author"),
>         df.col("published").cast(DataTypes.TimestampType))
>     .write()
>     .format("iceberg")
>     .mode("append")
>     .save(location.toString());
>
> table.refresh();
>
> I'm using the spark-runtime jar generated from commit
> 57b10995aade1362e582cef68b20014023556501 (from Oct 3), and I'm using Spark
> version 2.4.4.
>
> Thank you for your time!!
>
> Christine Mathiesen
> Software Engineer Intern
> BDP – Hotels.com
> Expedia Group
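For anyone reading along: the stack trace points at Iceberg's PartitionedWriter, which writes one open file per partition and closes it when the partition key changes, so it expects incoming rows to be clustered by partition. Below is a minimal plain-Java sketch of that behavior, not the actual Iceberg code; the class name `ClusteredWriterSketch` and its fields are hypothetical, invented only to illustrate why rows for an already-closed partition trigger exactly this IllegalStateException.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of a clustered partitioned writer: one partition's
// file is open at a time, and it is closed as soon as the partition key
// changes. Seeing a key again after its file was closed is an error.
class ClusteredWriterSketch {
    private final Set<String> closed = new HashSet<>();
    private String current = null;
    final Set<String> partitionsWritten = new LinkedHashSet<>();

    void write(String partitionKey) {
        if (!partitionKey.equals(current)) {
            if (closed.contains(partitionKey)) {
                // Mirrors the message from the stack trace above.
                throw new IllegalStateException(
                    "Already closed files for partition: author=" + partitionKey);
            }
            if (current != null) {
                closed.add(current); // close the previous partition's file
            }
            current = partitionKey;
        }
        partitionsWritten.add(partitionKey);
    }
}
```

Under that model, clustered input ("Michael Grant", "Rainbow Rowell", "Rainbow Rowell") writes cleanly, while interleaved input ("Rainbow Rowell", "Michael Grant", "Rainbow Rowell") throws. If that is what is happening in this test, the commonly suggested workaround is to sort the DataFrame by the partition column before writing, e.g. `df.sort("author")`, so each partition's rows arrive contiguously at the writer.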