Thanks everyone for the feedback :)

On 2023/09/22 22:55:57 Alex Reid wrote:
> Hi Nick,
> 
> Ryan's suggestions are what I'd recommend as well. Another option would be
> to store the snapshot-id as a table property in the target table (though,
> you introduce the possibility of being out of sync since this will be 2
> separate commits, in case your job fails in between or the 2nd commit
> fails).
> 
> One thing to note about Ryan's suggestion is that you will probably want to
> "walk" the target table's snapshot history to find the most recent snapshot
> that has a last snapshot id property in it as the target table could have
> other commits occurring such as compaction / data file rewrite that will
> also create snapshots.
> 
> Here are a couple incomplete code examples of the incremental read +
> process pattern:
> 
> incremental read using table property
> <https://gist.github.com/ajreid21/e49e19e0fa3389cee4e6b9a0264e4ea2>
> 
> incremental read using snapshot property in commit
> <https://gist.github.com/ajreid21/ebfe8e7c0712112b50af33a8cdcb711b>
> (this one also shows how to set snapshot property using dataframe api and
> sql)
> 
> Cheers,
> Alex
> 
> 
> On Fri, Sep 22, 2023 at 3:36 PM Ryan Blue <bl...@tabular.io> wrote:
> 
> > Nick,
> >
> > We store the latest snapshot ID that has been processed in the change log
> > table in the snapshot summary metadata of the downstream table. Then when
> > we go to run an update, we pick up the starting point from the last
> > snapshot in history and use that as the start-snapshot-id in the
> > DataFrame read.
> >
> > You can store the snapshot ID in the snapshot summary using
> > CommitMetadata.withCommitProperties.
> >
> > For the second issue, you should be able to use MERGE by registering the
> > incoming DataFrame as a temporary table. Then it can be referenced by SQL.
> > df.createOrReplaceTempView("name").
> >
> > Ryan
> >
> > On Fri, Sep 22, 2023 at 2:48 PM Samarth Jain <sa...@gmail.com>
> > wrote:
> >
> >> > What is the recommendation for storing the latest snapshot ID that is
> >> successfully merged into *table*? Ideally this is committed in the same
> >> transaction as the MERGE so that reprocessing is minimized. Does Iceberg
> >> support storing this as table metadata? I do not see any related
> >> information in the Iceberg Table Spec.
> >>
> >> Tagging seems like a good option for this.
> >> https://iceberg.apache.org/docs/latest/branching/. So essentially, run
> >> the MERGE and then tag it with a known name that users can then use for
> >> reading data as of a snapshot that has CDC data merged as opposed to always
> >> reading the latest snapshot.
> >>
> >> MERGE isn't supported in the dataframe API, yet. See
> >> https://github.com/apache/iceberg/issues/3665
> >>
> >>
> >>
> >> On Fri, Sep 22, 2023 at 2:00 PM Nick Del Nano <ni...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I am exploring implementing the Hybrid CDC Pattern explained at 29:26
> >>> <https://youtu.be/GM7EvRc7_is?si=mIQ5g2k1uEIMX5DT&t=1766> in Ryan
> >>> Blue's talk CDC patterns in Apache Iceberg
> >>> <https://trino.io/blog/2023/06/30/trino-fest-2023-apacheiceberg.html>.
> >>>
> >>> The use case is:
> >>>
> >>>    1. Stream CDC logs to an append only Iceberg table named
> >>>    *table_changelog* using Flink
> >>>    2. Periodically MERGE the CDC logs from *table_changelog* to *table*
> >>>       1. The rate of merge depends on the table's requirements. For
> >>>       some it may be frequently (hourly), for some it may be infrequent 
> >>> (daily).
> >>>
> >>> I am considering how to implement (2) using Iceberg's incremental read
> >>> <https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read> 
> >>> and
> >>> would appreciate guidance on the following topics:
> >>>
> >>>    1. What is the recommendation for storing the latest snapshot ID
> >>>    that is successfully merged into *table*? Ideally this is committed
> >>>    in the same transaction as the MERGE so that reprocessing is minimized.
> >>>    Does Iceberg support storing this as table metadata? I do not see any
> >>>    related information in the Iceberg Table Spec.
> >>>    2. Use the dataframe API or Spark SQL for the incremental read and
> >>>    MERGE? From the docs, the incremental read examples are using 
> >>> dataframes,
> >>>    and the MERGE uses Spark SQL
> >>>    <https://iceberg.apache.org/docs/latest/spark-writes/#merge-into>.
> >>>    Does either API support both use cases?
> >>>
> >>> Thanks,
> >>> Nick
> >>>
> >>
> >
> > --
> > Ryan Blue
> > Tabular
> >
> 

Reply via email to