Thanks for forwarding this, Elliot. I had missed it. The problem here is that the writer's incoming data isn't grouped by partition. To avoid using a ton of memory and failing, Iceberg doesn't keep more than one output file open at a time. When it sees a record for a new partition, Iceberg closes the current file and opens a new one. The trade-off is that this could create a huge number of output files if the data isn't clustered by partition key, so Iceberg checks that this isn't happening. When it detects rows that aren't grouped by partition, it throws this exception instead of creating new output files.
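Roughly, the check behaves like this (an illustrative sketch only, not the actual Iceberg writer code):

import java.util.HashSet;
import java.util.Set;

// Sketch of the behavior described above; the real logic lives in
// org.apache.iceberg.spark.source.Writer$PartitionedWriter.
class PartitionedWriterSketch {
  private final Set<String> completedPartitions = new HashSet<>();
  private String currentPartition = null;

  void write(String partitionKey, String record) {
    if (!partitionKey.equals(currentPartition)) {
      if (completedPartitions.contains(partitionKey)) {
        // The key reappeared after its file was closed, so the input wasn't
        // clustered by partition: fail rather than open more files.
        throw new IllegalStateException(
            "Already closed files for partition: " + partitionKey);
      }
      if (currentPartition != null) {
        completedPartitions.add(currentPartition); // "close" the current file
      }
      currentPartition = partitionKey;             // "open" a new file
    }
    // append the record to the file that is currently open
  }
}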
The solution is to repartition and/or sort your output rows. In this case, you'd probably want to sort by author and published (a sketch of what that looks like with the DataFrame API follows the quoted message below). Sorting by author groups the data together for the partitioning. Adding a column like published to the sort is a good idea to avoid skew -- it creates sort keys that are usually unique, so Spark can spread the data across tasks (each key must go to one task). Eventually, we want your Iceberg table to declare what sort order it expects so that Spark will do the ordering automatically, but this isn't supported in Spark yet.

rb

On Fri, Oct 18, 2019 at 1:26 AM Elliot West <tea...@gmail.com> wrote:

> Apologies - bumping this because it did not seem to arrive in my inbox.
>
> On 2019/10/11 15:57:53, Christine Mathiesen <t-cmathie...@hotels.com> wrote:
> > Hello!
> >
> > I've been testing out the use of Iceberg with Spark by writing some basic Java test classes, but I've hit an issue with one of the methods from PartitionSpec. In a quick summary, I've got a small set of data that looks like this:
> >
> > {"title":"Gone", "author": "Michael Grant", "published": 1541776051}
> > {"title":"Carry On", "author": "Rainbow Rowell", "published": 1536505651}
> > {"title":"Wayward Son", "author": "Rainbow Rowell", "published": 1504969651}
> >
> > I would like to be able to partition this small set of data with:
> >
> > PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
> >     .identity("author")
> >     .build();
> >
> > However, when I try to run this simple test, I get the error:
> >
> > java.lang.IllegalStateException: Already closed files for partition: author=Rainbow+Rowell
> >     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:505)
> >     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:476)
> >     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run
> >
> > With partitioning on `author`, I was expecting the two bottom records to go into one folder, and the first to go into another. Is there anything here that I'm doing wrong? I'm really stumped as to why I'm getting an error when trying to partition on that column.
> >
> > For reference, here's the rest of what's in the test class.
> >
> > Schema SCHEMA = new Schema(
> >     optional(1, "title", Types.StringType.get()),
> >     optional(2, "author", Types.StringType.get()),
> >     optional(3, "published", Types.TimestampType.withZone())
> > );
> >
> > HadoopTables tables = new HadoopTables(CONF);
> > PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
> >     .identity("author")
> >     .build();
> >
> > Table table = tables.create(SCHEMA, spec, location.toString());
> >
> > Dataset<Row> df = spark.read().json("src/test/resources/books.json");
> >
> > df.select(df.col("title"), df.col("author"), df.col("published").cast(DataTypes.TimestampType))
> >     .write()
> >     .format("iceberg")
> >     .mode("append")
> >     .save(location.toString());
> >
> > table.refresh();
> >
> > I'm using the spark-runtime jar generated from commit 57b10995aade1362e582cef68b20014023556501 (from Oct 3) and I'm using Spark version 2.4.4.
> >
> > Thank you for your time!!
> >
> > Christine Mathiesen
> > Software Engineer Intern
> > BDP – Hotels.com
> > Expedia Group

--
Ryan Blue
Software Engineer
Netflix
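The reordering mentioned above would look roughly like this against the quoted test (a sketch only, reusing the df and location from Christine's code and the same imports plus org.apache.spark.sql.types.DataTypes):

// Order the rows by the partition column (plus published to avoid skew)
// before handing them to the Iceberg writer.
df.select(df.col("title"), df.col("author"),
        df.col("published").cast(DataTypes.TimestampType))
    .orderBy("author", "published")   // clusters rows by partition key
    .write()
    .format("iceberg")
    .mode("append")
    .save(location.toString());

// Alternatively, avoid a full global sort by clustering within tasks:
// df.repartition(df.col("author"))
//   .sortWithinPartitions("author", "published")
//   ... same write as above ...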