+Iceberg dev list (moved to Apache)

Hi Sudsport,

The short answer is that you should sort your data by key so that the rows for each partition are grouped together.

The reason you have to do this is to prevent Iceberg tasks from creating a lot of tiny files. Iceberg won't keep multiple files open at once, to avoid the memory overhead, so you need to group rows before you pass the data to Iceberg. The "Already closed file" exception is what you get when rows for a partition show up again after the writer has already moved on from it.

In the future, Spark will detect the structure of your Iceberg table and group data to minimize files automatically.

rb

On Fri, Feb 1, 2019 at 4:22 PM sudsport s <sudssf2...@gmail.com> wrote:

> If I replace
>
> val spec = PartitionSpec.builderFor(icebergSchema).identity("key").build
>
> to
>
> val spec = PartitionSpec.builderFor(icebergSchema).build
>
> code runs successfully but create flat parquet file and no partitions are
> created.
>
> On Friday, February 1, 2019 at 4:20:49 PM UTC-8, sudsport s wrote:
>>
>> I created simple example from test code
>>
>> package com.netflix.iceberg.spark
>>
>> import java.io.File
>> import java.nio.file.Path
>>
>> /**
>>   *
>>   */
>> object TestTable extends App {
>>
>>   case class TestData(id: String, value1: String, key: Int)
>>
>>   import java.io.IOException
>>   import java.nio.file.{Files, LinkOption}
>>
>>   @throws[IOException]
>>   def deleteDirectoryRecursion(path: Path): Unit = {
>>     if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) try {
>>       val entries = Files.newDirectoryStream(path)
>>       try {
>>         import scala.collection.JavaConversions._
>>         for (entry <- entries) {
>>           deleteDirectoryRecursion(entry)
>>         }
>>       } finally if (entries != null) entries.close()
>>     }
>>     Files.delete(path)
>>   }
>>
>>   def test(): Unit = {
>>
>>     //import com.netflix.iceberg._
>>     import com.netflix.iceberg.PartitionSpec
>>     import com.netflix.iceberg.hadoop.HadoopTables
>>     import org.apache.spark.sql.SparkSession
>>
>>     val spark = SparkSession
>>       .builder
>>       .appName("test app")
>>       .master("local")
>>       .getOrCreate
>>     val location = "/tmp/test2"
>>     try {
>>       deleteDirectoryRecursion(new File(location).toPath)
>>     } catch {
>>       case ex: Throwable =>
>>     }
>>     // Simple case class to cast the data
>>
>>     import org.apache.spark.sql.catalyst.ScalaReflection
>>     import org.apache.spark.sql.types.StructType
>>     val schema =
>>       ScalaReflection.schemaFor[TestData].dataType.asInstanceOf[StructType]
>>
>>     val testData = Stream.from(1000).map(x => TestData("test" + x, "test" + (x % 10), x % 10)).take(1000)
>>
>>     val testDataP = spark.sparkContext.parallelize(testData, 10)
>>     val df = spark.createDataFrame(testDataP)
>>
>>     val icebergSchema = SparkSchemaUtil.convert(df.schema)
>>
>>     val CONF = spark.sparkContext.hadoopConfiguration
>>     val tables = new HadoopTables(CONF)
>>
>>     val spec = PartitionSpec.builderFor(icebergSchema).identity("key").build
>>     val table = tables.create(icebergSchema, spec, location.toString)
>>
>>     df.write.partitionBy("key").format("iceberg").mode("append").save(location.toString)
>>
>>     table.refresh
>>
>>     val result = spark.read.format("iceberg").load(location.toString)
>>
>>     //val actual = result.orderBy("key").as(Encoders.bean(classOf[TestData])).collectAsList
>>
>>     println(s"Count is ${result.count()}")
>>   }
>>
>>   test()
>> }
>>
>> I get following exception
>>
>> java.lang.IllegalStateException: Already closed file for partition: key=0
>> looks like due to partition already exists
>>
>> is this correct way to write partitioned dataframe? if this is real issue
>> how can I help to fix it?

--
Ryan Blue
Software Engineer
Netflix
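
To make the reasoning in the reply concrete, here is a minimal sketch of a writer that keeps only one file open at a time. It is a model for illustration only, not Iceberg's actual writer: `ClusteredWriterSketch`, `Row`, and `writeAll` are hypothetical names, and the data is cut down to 100 rows with the same `x % 10` key pattern as in the question.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for a writer that keeps one file open per task.
// This is NOT an Iceberg class, only a model of the behavior described above.
object ClusteredWriterSketch {

  final case class Row(id: String, key: Int)

  // Consumes rows in the given order and returns the row count per partition.
  // Throws IllegalStateException if a partition reappears after its file was closed.
  def writeAll(rows: Seq[Row]): Map[Int, Int] = {
    val closed = mutable.Set.empty[Int]       // partitions whose file is already closed
    val counts = mutable.Map.empty[Int, Int]  // rows written per partition
    var current: Option[Int] = None           // partition of the single open file

    for (row <- rows) {
      if (!current.contains(row.key)) {
        // The partition changed: close the open file before starting another.
        current.foreach(closed += _)
        if (closed.contains(row.key)) {
          throw new IllegalStateException(
            s"Already closed file for partition: key=${row.key}")
        }
        current = Some(row.key)
      }
      counts(row.key) = counts.getOrElse(row.key, 0) + 1
    }
    counts.toMap
  }

  def main(args: Array[String]): Unit = {
    // Keys cycle through 0..9, so every partition is revisited repeatedly.
    val rows = (1000 until 1100).map(x => Row("test" + x, x % 10))

    // Unsorted input comes back to key=0 after other keys, so the write fails.
    println(Try(writeAll(rows)))

    // Sorting by key groups each partition's rows together, so the write succeeds.
    println(writeAll(rows.sortBy(_.key)))
  }
}
```

In the code from the question, the corresponding change would presumably be to sort the DataFrame before writing, for example `df.sort("key").write...` or `df.sortWithinPartitions("key").write...`. Both are standard Spark Dataset methods, but neither variant has been run against the table in this thread.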