Adding the Apache Iceberg dev list, which is the new one.

Venkata,

Looks like progress! The next step is to add a sort to your data to cluster
it by partition key. Writing Parquet takes a lot of memory, so Iceberg
keeps just one file open at a time. But if data that should go into the
same file isn’t grouped together, Iceberg would create thousands of tiny
files. So Iceberg adds a check to ensure that you've clustered the data
together by partition. Eventually, Spark should alter the query plan when
writing into an Iceberg table so that this happens automatically, but we're
not quite there yet.

For now, you can fix the problem by adding a global sort:

ice.sort($"key", $"yr", $"mon", $"dt")
  .write
  .format("iceberg")
  .mode("append")
  .save(...)

This prepares the data correctly, and also tends to have better wall time
because of the skew estimation that is done for the sort. In fact, our
internal recommendation is to always add a sort when writing data from
Spark.
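
Putting that together with the paths from your example, the whole write would
look roughly like this (I'm using the full source class name from your
snippets; adjust the paths to your own layout):

import spark.implicits._   // for the $"col" column syntax

val ice = spark.read
  .format("com.netflix.iceberg.spark.source.IcebergSource")
  .load("s3://dev.canopydata.com/vsowrira/qubole10/")

// the global sort clusters rows by partition before the files are written
ice.sort($"key", $"yr", $"mon", $"dt")
  .write
  .format("com.netflix.iceberg.spark.source.IcebergSource")
  .mode("append")
  .save("s3://dev.canopydata.com/vsowrira/iceberg/")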

If you also sort by the columns you will be filtering on, then Iceberg can do
extra split filtering on reads. I also recommend putting a unique ID column,
or something else with high cardinality, at the end of your sort so that Spark
can break tasks up at a small granularity.
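
For example, if you usually filter on an event timestamp and have a unique id
column (both made-up column names here, substitute your own), the sort could
look like this:

// partition columns first, then filter columns, then a high-cardinality
// column so Spark can split the sorted output into small tasks
ice.sort($"key", $"yr", $"mon", $"dt", $"event_ts", $"id")
  .write
  .format("com.netflix.iceberg.spark.source.IcebergSource")
  .mode("append")
  .save("s3://dev.canopydata.com/vsowrira/iceberg/")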

rb

On Wed, Feb 6, 2019 at 2:07 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
wrote:

> Ok. I tried creating a table using Iceberg's table API before writing (or
> appending) data to that location using Spark Dataframe write. But it still
> failed with the below exception.
>
> 1. Table created using Iceberg tables API.
>
>     val path = "s3://dev.canopydata.com/vsowrira/iceberg/"
>     // remove the temp table if it already exists
>     val conf = spark.sparkContext.hadoopConfiguration
>     val fs = new Path(path).getFileSystem(conf)
>     fs.delete(new Path(path), true /* recursive */ )
>
>     // create the temp table using Spark utils to create a schema and partition spec
>     val tables = new HadoopTables(conf)
>     val schema = SparkSchemaUtil.schemaForTable(spark, "default.qubole10")
>     val spec = SparkSchemaUtil.specForTable(spark, "default.qubole10")
>
>     tables.create(schema, spec, path)
>
>     // show the schema
>     tables.load(path).schema
>
> 2. Read and write to new Iceberg table.
>
> val ice = spark.read
>     .format("com.netflix.iceberg.spark.source.IcebergSource")
>     .load("s3://dev.canopydata.com/vsowrira/qubole10/")
>
> ice.write
>     .format("com.netflix.iceberg.spark.source.IcebergSource")
>     .mode("append")
>     .save("s3://dev.canopydata.com/vsowrira/iceberg/")
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task
> 0.0 in stage 0.0 (TID 0, localhost, executor driver):
> java.lang.IllegalStateException: Already closed file for partition:
> key=1/yr=2017/mon=1/dt=5
>   at com.netflix.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:397)
>   at com.netflix.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:358)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1507)
>   at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:403)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1473)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
> I also see this line logged at WARN: LOG.warn("Duplicate key: {} == {}",
> existingKey, key);
>
> Is this happening because there are duplicate partition names? That's what I
> understood from the code in Writer.java.
>
> Regards
> Venkata krishnan
>
>
> On Tue, Feb 5, 2019 at 4:40 PM 'Ryan Blue' via Iceberg Developers <
> iceberg-de...@googlegroups.com> wrote:
>
>> Hi Venkata,
>>
>> The problem here is that you're asking Iceberg to append to a table that
>> doesn't exist. Iceberg assumes that an append operation will write to an
>> existing table. When it can't find that table, it throws the exception.
>>
>> Unfortunately, Spark can only ask Iceberg to append data right now. Spark
>> only supports using the "save" action in DataSourceWriter with v2 data
>> sources, which includes Iceberg. Those v2 sources are using a new Spark API
>> that hasn't been finished yet, so not all actions are available. Only
>> append works in the latest Spark release.
>>
>> We're working on updating Spark so that SQL works with Iceberg. We have
>> full SQL support here at Netflix and we are trying to get it into the
>> upstream community. You can follow overwrite support here:
>> https://github.com/apache/spark/pull/23606
>>
>> Until that's done, I recommend using Iceberg APIs to create tables and
>> delete data, and Spark to append and query those tables.
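>>
>> For example, something along these lines (these are the same HadoopTables,
>> SparkSchemaUtil, and DataFrame writer calls from your snippets; "df" is
>> whatever DataFrame you want to append, and the import package names are
>> inferred from the com.netflix.iceberg classes shown in your stack trace):
>>
>> import com.netflix.iceberg.hadoop.HadoopTables
>> import com.netflix.iceberg.spark.SparkSchemaUtil
>>
>> val path = "s3://dev.canopydata.com/vsowrira/iceberg/"
>>
>> // create the table once with the Iceberg API
>> val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
>> val schema = SparkSchemaUtil.schemaForTable(spark, "default.qubole100")
>> val spec = SparkSchemaUtil.specForTable(spark, "default.qubole100")
>> tables.create(schema, spec, path)
>>
>> // then use Spark only to append to and query the existing table
>> df.write
>>   .format("com.netflix.iceberg.spark.source.IcebergSource")
>>   .mode("append")
>>   .save(path)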
>>
>> rb
>>
>> On Tue, Feb 5, 2019 at 2:28 PM Venkatakrishnan Sowrirajan <
>> vsowr...@asu.edu> wrote:
>>
>>> Forwarding to the old Iceberg-devel Google group, in case someone has not
>>> moved to the Apache mailing list.
>>>
>>> Hi Guys,
>>>
>>> I am new to Iceberg and was just playing around with the example in the
>>> repo. I tried creating an Iceberg table from an existing Spark Parquet
>>> table, and that part worked fine. But when I tried to experiment with the
>>> write, I hit some issues. I am not sure whether I am using it correctly.
>>> Please let me know what you think.
>>>
>>> Here is what I'm doing.
>>>
>>> 1. Created an Iceberg table on the existing parquet table using the
>>> below APIs.
>>>
>>>     val path = "s3://dev.canopydata.com/vsowrira/qubole10/"
>>>     val tables = new HadoopTables(conf)
>>>     val schema = SparkSchemaUtil.schemaForTable(spark, "default.qubole100")
>>>     val spec = SparkSchemaUtil.specForTable(spark, "default.qubole100")
>>>
>>>     tables.create(schema, spec, path)
>>>
>>>     // show the schema
>>>     tables.load(path).schema
>>>
>>> 2. Listed partitions from the table and loaded metrics of the files and
>>> finally appended that to the table.
>>>
>>> 3. Now I tried reading the table using the Iceberg data source. This also
>>> works fine.
>>>
>>> > val ice = spark.read
>>> >     .format("com.netflix.iceberg.spark.source.IcebergSource")
>>> >     .load("s3://dev.canopydata.com/vsowrira/qubole10/")
>>>
>>> 4. But when I try to write this -
>>> > ice.write
>>> >     .format("com.netflix.iceberg.spark.source.IcebergSource")
>>> >     .mode("append")
>>> >     .save("s3://dev.canopydata.com/vsowrira/iceberg/")
>>>
>>> This is failing with the exception:
>>> com.netflix.iceberg.exceptions.NoSuchTableException: Table does not exist at location: s3://dev.canopydata.com/vsowrira/iceberg/
>>>   at com.netflix.iceberg.hadoop.HadoopTables.load(HadoopTables.java:63)
>>>   at com.netflix.iceberg.spark.source.IcebergSource.findTable(IcebergSource.java:104)
>>>   at com.netflix.iceberg.spark.source.IcebergSource.getTableAndResolveHadoopConfiguration(IcebergSource.java:125)
>>>   at com.netflix.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:70)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:260)
>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
>>>   ... 60 elided
>>>
>>> I'm assuming Iceberg can also create and write the table using the
>>> Iceberg datasource without explicitly creating a table at the final
>>> path using the "Tables" API in Iceberg.
>>>
>>> Is my assumption right? Please let me know if I'm doing something really
>>> stupid here.
>>>
>>> Thanks
>>> Venkata krishnan
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix
