Nick,

We store the ID of the latest change log table snapshot that has been processed in the snapshot summary metadata of the downstream table. Then, when we run an update, we pick up the starting point from the last snapshot in the downstream table's history and use it as the start-snapshot-id in the DataFrame read.
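
The lookup described above can be sketched in plain Python, with no Spark session needed. This is only an illustration under stated assumptions: the property name `changelog-snapshot-id` and both function names are hypothetical, and `history` stands in for the downstream table's snapshot summaries, oldest first (in Spark these could come from the `summary` column of the table's `snapshots` metadata table). It walks backwards because some snapshots, such as compactions, may not carry the property.

```python
from typing import Dict, Mapping, Optional, Sequence

# Hypothetical summary property name; use whatever key your job writes.
WATERMARK_KEY = "changelog-snapshot-id"


def last_processed_snapshot_id(
    history: Sequence[Mapping[str, str]], key: str = WATERMARK_KEY
) -> Optional[int]:
    """Return the newest recorded changelog snapshot ID, or None if absent.

    `history` holds snapshot summaries of the downstream table, oldest first.
    """
    for summary in reversed(history):
        value = summary.get(key)
        if value is not None:
            return int(value)  # summary values are stored as strings
    return None


def incremental_read_options(start_id: Optional[int], end_id: int) -> Dict[str, str]:
    """Build Iceberg read options for the next batch of changelog rows.

    start-snapshot-id is exclusive and end-snapshot-id is inclusive.
    Assumption: on the first run there is no starting point, so this sketch
    falls back to a full read pinned to `end_id` via the snapshot-id option.
    """
    if start_id is None:
        return {"snapshot-id": str(end_id)}
    return {"start-snapshot-id": str(start_id), "end-snapshot-id": str(end_id)}


if __name__ == "__main__":
    downstream_history = [
        {"operation": "overwrite", WATERMARK_KEY: "100"},
        {"operation": "overwrite", WATERMARK_KEY: "250"},
        {"operation": "replace"},  # e.g. a compaction without the property
    ]
    start = last_processed_snapshot_id(downstream_history)
    print(incremental_read_options(start, end_id=400))
```

The returned dictionary is meant to be passed as read options, for example `spark.read.format("iceberg").options(**opts).load("db.table_changelog")`, and `end_id` is the value to record in the summary of the commit that merges that batch.
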
You can store the snapshot ID in the snapshot summary using CommitMetadata.withCommitProperties.

For the second issue, you should be able to use MERGE by registering the incoming DataFrame as a temporary table. Then it can be referenced by SQL: df.createOrReplaceTempView("name").

Ryan

On Fri, Sep 22, 2023 at 2:48 PM Samarth Jain <samarth.j...@gmail.com> wrote:

> > What is the recommendation for storing the latest snapshot ID that is
> > successfully merged into *table*? Ideally this is committed in the same
> > transaction as the MERGE so that reprocessing is minimized. Does Iceberg
> > support storing this as table metadata? I do not see any related
> > information in the Iceberg Table Spec.
>
> Tagging seems like a good option for this.
> https://iceberg.apache.org/docs/latest/branching/. So essentially, run
> the MERGE and then tag it with a known name that users can then use for
> reading data as of a snapshot that has CDC data merged as opposed to always
> reading the latest snapshot.
>
> MERGE isn't supported in the dataframe API, yet. See
> https://github.com/apache/iceberg/issues/3665
>
> On Fri, Sep 22, 2023 at 2:00 PM Nick Del Nano <nickdeln...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am exploring implementing the Hybrid CDC Pattern explained at 29:26
>> <https://youtu.be/GM7EvRc7_is?si=mIQ5g2k1uEIMX5DT&t=1766> in Ryan Blue's
>> talk CDC patterns in Apache Iceberg
>> <https://trino.io/blog/2023/06/30/trino-fest-2023-apacheiceberg.html>.
>>
>> The use case is:
>>
>>    1. Stream CDC logs to an append only Iceberg table named
>>    *table_changelog* using Flink
>>    2. Periodically MERGE the CDC logs from *table_changelog* to *table*
>>       1. The rate of merge depends on the table's requirements. For some
>>       it may be frequently (hourly), for some it may be infrequent (daily).
>>
>> I am considering how to implement (2) using Iceberg's incremental read
>> <https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read> and
>> would appreciate guidance on the following topics:
>>
>>    1. What is the recommendation for storing the latest snapshot ID that
>>    is successfully merged into *table*? Ideally this is committed in the
>>    same transaction as the MERGE so that reprocessing is minimized. Does
>>    Iceberg support storing this as table metadata? I do not see any related
>>    information in the Iceberg Table Spec.
>>    2. Use the dataframe API or Spark SQL for the incremental read and
>>    MERGE? From the docs, the incremental read examples are using dataframes,
>>    and the MERGE uses Spark SQL
>>    <https://iceberg.apache.org/docs/latest/spark-writes/#merge-into>.
>>    Does either API support both use cases?
>>
>> Thanks,
>> Nick
>>
>

--
Ryan Blue
Tabular