Thanks everyone for the feedback :) For anyone following along later, I've added rough sketches of the suggested patterns below the quoted thread.
On 2023/09/22 22:55:57 Alex Reid wrote:
> Hi Nick,
>
> Ryan's suggestions are what I'd recommend as well. Another option would be
> to store the snapshot-id as a table property in the target table (though,
> you introduce the possibility of being out of sync since this will be 2
> separate commits, in case your job fails in between or the 2nd commit
> fails).
>
> One thing to note about Ryan's suggestion is that you will probably want to
> "walk" the target table's snapshot history to find the most recent snapshot
> that has a last snapshot id property in it, as the target table could have
> other commits occurring such as compaction / data file rewrite that will
> also create snapshots.
>
> Here are a couple incomplete code examples of the incremental read +
> process pattern:
>
> incremental read using table property
> <https://gist.github.com/ajreid21/e49e19e0fa3389cee4e6b9a0264e4ea2>
>
> incremental read using snapshot property in commit
> <https://gist.github.com/ajreid21/ebfe8e7c0712112b50af33a8cdcb711b>
> (this one also shows how to set the snapshot property using the dataframe
> api and sql)
>
> Cheers,
> Alex
>
>
> On Fri, Sep 22, 2023 at 3:36 PM Ryan Blue <bl...@tabular.io> wrote:
>
> > Nick,
> >
> > We store the latest snapshot ID that has been processed in the change log
> > table in the snapshot summary metadata of the downstream table. Then when
> > we go to run an update, we pick up the starting point from the last
> > snapshot in history and use that as the start-snapshot-id in the
> > DataFrame read.
> >
> > You can store the snapshot ID in the snapshot summary using
> > CommitMetadata.withCommitProperties.
> >
> > For the second issue, you should be able to use MERGE by registering the
> > incoming DataFrame as a temporary table. Then it can be referenced by SQL:
> > df.createOrReplaceTempView("name").
> >
> > Ryan
> >
> > On Fri, Sep 22, 2023 at 2:48 PM Samarth Jain <sa...@gmail.com> wrote:
> >
> >> > What is the recommendation for storing the latest snapshot ID that is
> >> > successfully merged into *table*? Ideally this is committed in the same
> >> > transaction as the MERGE so that reprocessing is minimized. Does Iceberg
> >> > support storing this as table metadata? I do not see any related
> >> > information in the Iceberg Table Spec.
> >>
> >> Tagging seems like a good option for this.
> >> https://iceberg.apache.org/docs/latest/branching/. So essentially, run
> >> the MERGE and then tag it with a known name that users can then use for
> >> reading data as of a snapshot that has CDC data merged, as opposed to
> >> always reading the latest snapshot.
> >>
> >> MERGE isn't supported in the dataframe API, yet. See
> >> https://github.com/apache/iceberg/issues/3665
> >>
> >> On Fri, Sep 22, 2023 at 2:00 PM Nick Del Nano <ni...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I am exploring implementing the Hybrid CDC Pattern explained at 29:26
> >>> <https://youtu.be/GM7EvRc7_is?si=mIQ5g2k1uEIMX5DT&t=1766> in Ryan
> >>> Blue's talk CDC patterns in Apache Iceberg
> >>> <https://trino.io/blog/2023/06/30/trino-fest-2023-apacheiceberg.html>.
> >>>
> >>> The use case is:
> >>>
> >>>    1. Stream CDC logs to an append only Iceberg table named
> >>>    *table_changelog* using Flink
> >>>    2. Periodically MERGE the CDC logs from *table_changelog* to *table*
> >>>       1. The rate of merge depends on the table's requirements. For
> >>>       some it may be frequently (hourly), for some it may be infrequent
> >>>       (daily).
> >>>
> >>> I am considering how to implement (2) using Iceberg's incremental read
> >>> <https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read>
> >>> and would appreciate guidance on the following topics:
> >>>
> >>>    1. What is the recommendation for storing the latest snapshot ID
> >>>    that is successfully merged into *table*? Ideally this is committed
> >>>    in the same transaction as the MERGE so that reprocessing is minimized.
> >>>    Does Iceberg support storing this as table metadata? I do not see any
> >>>    related information in the Iceberg Table Spec.
> >>>    2. Use the dataframe API or Spark SQL for the incremental read and
> >>>    MERGE? From the docs, the incremental read examples are using
> >>>    dataframes, and the MERGE uses Spark SQL
> >>>    <https://iceberg.apache.org/docs/latest/spark-writes/#merge-into>.
> >>>    Does either API support both use cases?
> >>>
> >>> Thanks,
> >>> Nick
> >>>
> >>
> >
> > --
> > Ryan Blue
> > Tabular
> >
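
First, a rough, untested sketch of the snapshot-summary approach Ryan
describes (the same idea as Alex's second gist). It assumes Spark 3.x with the
Iceberg Spark runtime and the Iceberg SQL extensions enabled; the table names
(db.table_changelog, db.target), the summary key "last-changelog-snapshot-id",
and the MERGE columns (id, op) are placeholders, not anything Iceberg defines.

import java.util.concurrent.Callable

import org.apache.iceberg.spark.CommitMetadata
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val watermarkKey = "last-changelog-snapshot-id"

// 1. Walk the target table's snapshot history, newest first, and take the most
//    recent snapshot that carries the watermark. Filtering on the property
//    (rather than reading only the current snapshot) handles Alex's point about
//    compaction / data file rewrite commits that also create snapshots.
val lastProcessed: Option[Long] = spark.sql(
    s"""SELECT summary['$watermarkKey'] AS watermark
        FROM db.target.snapshots
        WHERE summary['$watermarkKey'] IS NOT NULL
        ORDER BY committed_at DESC
        LIMIT 1""")
  .collect()
  .headOption
  .map(_.getString(0).toLong)

// 2. Pin the changelog's current snapshot so the read and the watermark we
//    record below refer to the same point.
val changelogHead: Long = spark.sql(
    "SELECT snapshot_id FROM db.table_changelog.snapshots ORDER BY committed_at DESC LIMIT 1")
  .collect()(0)
  .getLong(0)

// 3. Incremental read of (lastProcessed, changelogHead]. On the first run there
//    is no watermark yet, so fall back to a plain read pinned to changelogHead.
val changes = lastProcessed match {
  case Some(start) =>
    spark.read.format("iceberg")
      .option("start-snapshot-id", start)       // exclusive
      .option("end-snapshot-id", changelogHead) // inclusive
      .load("db.table_changelog")
  case None =>
    spark.read.format("iceberg")
      .option("snapshot-id", changelogHead)     // time travel, not incremental
      .load("db.table_changelog")
}

// 4. Register the DataFrame so Spark SQL MERGE can reference it, then run the
//    MERGE and stamp the new watermark on that same commit's snapshot summary.
changes.createOrReplaceTempView("changes")

CommitMetadata.withCommitProperties(
  java.util.Map.of(watermarkKey, changelogHead.toString),
  new Callable[Void] {
    override def call(): Void = {
      spark.sql(
        """MERGE INTO db.target t
          |USING changes c
          |ON t.id = c.id
          |WHEN MATCHED AND c.op = 'D' THEN DELETE
          |WHEN MATCHED THEN UPDATE SET *
          |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
      null
    }
  },
  classOf[RuntimeException])

Because the watermark is written by the same commit as the MERGE, a failed run
simply starts again from the previous watermark on the next attempt, and the
property filter skips any compaction snapshots committed to the target in
between.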
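
The table-property variant Alex mentions (his first gist) would look roughly
like the following. The lookup is simpler, but the property update is a second,
separate commit, so it can drift from the MERGE if the job fails in between.
Spark3Util.loadIcebergTable and the property key are again just illustrative.

import org.apache.iceberg.spark.Spark3Util

// Read the watermark from the target's table properties instead of walking
// its snapshot history.
val target = Spark3Util.loadIcebergTable(spark, "db.target")
val lastProcessed: Option[Long] =
  Option(target.properties().get("last-changelog-snapshot-id")).map(_.toLong)

// ... incremental read + MERGE exactly as in the sketch above ...

// Second commit: record the new watermark as a table property.
spark.sql(
  s"ALTER TABLE db.target SET TBLPROPERTIES ('last-changelog-snapshot-id' = '$changelogHead')")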
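
Samarth's tagging suggestion composes with either variant: after the MERGE
commit, move a well-known tag to the target's new snapshot so consumers can
read "data as of the last CDC merge" rather than the latest snapshot. The tag
name cdc-merged is made up, and the DDL follows the branching/tagging docs but
hasn't been run as-is (use CREATE TAG on the first run, REPLACE TAG after that).

// Find the snapshot the MERGE just produced on the target table.
val mergedSnapshotId: Long = spark.sql(
    "SELECT snapshot_id FROM db.target.snapshots ORDER BY committed_at DESC LIMIT 1")
  .collect()(0)
  .getLong(0)

// Move the tag forward to the freshly merged snapshot.
spark.sql(s"ALTER TABLE db.target REPLACE TAG `cdc-merged` AS OF VERSION $mergedSnapshotId")

// Downstream readers that only want fully merged data read through the tag.
val merged = spark.sql("SELECT * FROM db.target VERSION AS OF 'cdc-merged'")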