Hi Ryan and Alex,

First, thanks again for your advice on this. CommitProperties was what I was looking for.

I'm working on an end-to-end example of the Hybrid CDC method to explore its efficiency and tradeoffs. I like its properties. Now I am exploring the read path and have a question.

I have extended Alex's example <https://github.com/nickdelnano/apache-iceberg-examples/blob/incremental-read-merge/src/main/java/org/ndelnano/example/IncrementalReadMerge.java#L65> (basically filling in the FIXMEs) to incrementally read and merge commits from a source table to a target table, storing the last processed snapshot ID in the snapshot summary. Pretty neat.
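Concretely, the core of the job looks roughly like this. It's only a minimal sketch: it assumes Spark 3 with the Iceberg Spark runtime, placeholder tables db.tbl (mirror, columns id, payload, event_time) and db.tbl_changelog (same columns plus an op flag), and a made-up snapshot summary key "last-processed-snapshot-id" -- none of those names come from the linked code.

```java
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;

public class IncrementalReadMergeSketch {

  // Walk the mirror table's history newest-to-oldest and return the changelog
  // snapshot ID recorded by the most recent MERGE commit, or null on the first run.
  // Walking (rather than only checking the current snapshot) skips commits such as
  // compaction that don't carry the property.
  static Long lastProcessedSnapshotId(Table mirror) {
    Snapshot s = mirror.currentSnapshot();
    while (s != null) {
      String value = s.summary().get("last-processed-snapshot-id");
      if (value != null) {
        return Long.parseLong(value);
      }
      s = s.parentId() == null ? null : mirror.snapshot(s.parentId());
    }
    return null;
  }

  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Table changelog = Spark3Util.loadIcebergTable(spark, "db.tbl_changelog");
    Table mirror = Spark3Util.loadIcebergTable(spark, "db.tbl");

    Long start = lastProcessedSnapshotId(mirror);
    long end = changelog.currentSnapshot().snapshotId();

    // Incremental read of the changelog commits that have not been merged yet
    // (start-snapshot-id is exclusive, end-snapshot-id is inclusive).
    Dataset<Row> batch =
        start == null
            ? spark.read().format("iceberg").load("db.tbl_changelog") // first run
            : spark.read().format("iceberg")
                .option("start-snapshot-id", String.valueOf(start))
                .option("end-snapshot-id", String.valueOf(end))
                .load("db.tbl_changelog");

    // Keep only the newest change per key so the MERGE sees at most one match per row.
    batch
        .withColumn("rn", functions.row_number().over(
            Window.partitionBy("id").orderBy(functions.col("event_time").desc())))
        .filter("rn = 1")
        .drop("rn")
        .createOrReplaceTempView("cdc_batch");

    // Run the MERGE and record, in the same commit's snapshot summary, the changelog
    // snapshot the batch was read up to.
    CommitMetadata.withCommitProperties(
        Map.of("last-processed-snapshot-id", String.valueOf(end)),
        () -> {
          spark.sql(
              "MERGE INTO db.tbl t USING cdc_batch s ON t.id = s.id "
                  + "WHEN MATCHED AND s.op = 'D' THEN DELETE "
                  + "WHEN MATCHED THEN UPDATE SET t.payload = s.payload, t.event_time = s.event_time "
                  + "WHEN NOT MATCHED AND s.op <> 'D' THEN "
                  + "  INSERT (id, payload, event_time) VALUES (s.id, s.payload, s.event_time)");
          return 0;
        },
        RuntimeException.class);
  }
}
```

It doesn't handle the no-new-commits case or failures mid-batch; it's just to show the read + MERGE + commit-property shape.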
Next I would define a view that queries the source mirror and CDC tables and appropriately reconciles any records in the CDC table that are not yet reflected in the source mirror table. I'd call this "in memory MERGE using a view" (sketched below). This part is straightforward, at least for a client viewing the latest state of the table.

I'm exploring an optimization. One of the benefits of this Hybrid CDC method is continuous time travel. I would like *read optimized* continuous time travel -- "in memory MERGE using a view" for the latest snapshot and for *past table snapshots*. Why? Time travel to any point in time is great, but without what I describe, it is only "time travel to any point in time by reading the whole changelog table, or read optimized time travel to any point after the most recent MERGE".

An example: a tool like Kafka Connect or Flink is sinking a single-partition Kafka topic into an Iceberg table. The topic contains CDC events from an upstream source like a relational database, and the data in the topic is ordered by a field `event_time`. Given an input value for `event_time`, call it `x`:

1. Pick the most recent table snapshot with max(event_time) <= `x`.
2. Use the "in memory MERGE using a view" technique, but select the Iceberg snapshot from (1) as the base table. Filter the CDC table records using `x`.
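For reference, this is roughly what I mean by "in memory MERGE using a view" for the latest state. Again just a sketch on the placeholder schema from the job above, with a hard-coded watermark standing in for "everything the last MERGE already folded in" (in practice I'd derive that from the last MERGE commit):

```java
// Continues the sketch above (same SparkSession and placeholder schema). Registers a
// view that overlays the not-yet-merged changelog rows on top of the mirror table.
static void createLiveView(SparkSession spark, String mergedThroughEventTime) {
  spark.sql(
      "CREATE OR REPLACE TEMPORARY VIEW tbl_live AS "
          + "WITH latest_cdc AS ( "
          + "  SELECT id, payload, event_time, op FROM ( "
          + "    SELECT c.*, ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_time DESC) AS rn "
          + "    FROM db.tbl_changelog c "
          // only the changelog rows the last MERGE has not folded in yet
          + "    WHERE c.event_time > TIMESTAMP '" + mergedThroughEventTime + "' "
          + "  ) ranked WHERE rn = 1 "
          + ") "
          // newest changelog version of each changed key, minus deletes ...
          + "SELECT id, payload, event_time FROM latest_cdc WHERE op <> 'D' "
          + "UNION ALL "
          // ... plus the mirror rows that have no newer changelog record
          + "SELECT t.id, t.payload, t.event_time "
          + "FROM db.tbl t LEFT ANTI JOIN latest_cdc c ON t.id = c.id");
}
```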
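And this is how I'm thinking about step (1). It only works if the MERGE job also records the batch's max `event_time` in the snapshot summary alongside the last processed snapshot ID; "merged-max-event-time" below is a made-up key for that, not an Iceberg property. Step (2) would then be the same view as above, reading db.tbl with VERSION AS OF the chosen snapshot (or the snapshot-id read option) and adding `event_time <= x` to the changelog filter.

```java
// Step (1): find the newest snapshot of the mirror table whose merged data only
// covers event_time <= x. Uses the same imports as the first sketch.
static Snapshot baseSnapshotFor(Table mirror, java.sql.Timestamp x) {
  Snapshot s = mirror.currentSnapshot();
  while (s != null) {
    // Written by the MERGE job; absent on compaction and other commits.
    String merged = s.summary().get("merged-max-event-time");
    if (merged != null && !java.sql.Timestamp.valueOf(merged).after(x)) {
      return s; // newest MERGE whose data is entirely <= x
    }
    s = s.parentId() == null ? null : mirror.snapshot(s.parentId());
  }
  return null; // nothing merged as of x yet: fall back to reading only the changelog
}
```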
For users that access Iceberg tables via a client library, I could include this logic there, e.g. Spark users that use my library as a PySpark abstraction. This sounds good.

For users that access via a SQL client, like AWS Athena, (1) is more difficult because SQL is less flexible than a general-purpose language. I have two ideas: put the logic into a SQL view and/or function, or create a view per snapshot and name them with a predictable structure (business_read_optimized_commit_01_03_2023_10_00_00). I think the former is possible with a CASE statement <https://trino.io/docs/current/routines/case.html>, but Athena has limited support for this via Lambdas; in my experience that interface is not very good, and I'd avoid it if possible. I dislike the latter.

I thought I'd check with you and ask:

Is this a way that the Iceberg community is solving CDC? Am I on the right track here? Are there other ways of solving this problem? Is there a simple or clever way to implement this for SQL clients?

If I can sort this out for SQL clients, I'd feel closer to getting the best of all worlds with this approach.

Cheers,
Nick

On Mon, Sep 25, 2023 at 12:21 PM Nick Del Nano <nickdeln...@gmail.com> wrote:

> Thanks everyone for the feedback :)
>
> On 2023/09/22 22:55:57 Alex Reid wrote:
> > Hi Nick,
> >
> > Ryan's suggestions are what I'd recommend as well. Another option would be
> > to store the snapshot-id as a table property in the target table (though,
> > you introduce the possibility of being out of sync since this will be 2
> > separate commits, in case your job fails in between or the 2nd commit fails).
> >
> > One thing to note about Ryan's suggestion is that you will probably want to
> > "walk" the target table's snapshot history to find the most recent snapshot
> > that has a last snapshot id property in it, as the target table could have
> > other commits occurring, such as compaction / data file rewrite, that will
> > also create snapshots.
> >
> > Here are a couple incomplete code examples of the incremental read +
> > process pattern:
> >
> > incremental read using table property
> > <https://gist.github.com/ajreid21/e49e19e0fa3389cee4e6b9a0264e4ea2>
> >
> > incremental read using snapshot property in commit
> > <https://gist.github.com/ajreid21/ebfe8e7c0712112b50af33a8cdcb711b>
> > (this one also shows how to set snapshot property using dataframe api and sql)
> >
> > Cheers,
> > Alex
> >
> > On Fri, Sep 22, 2023 at 3:36 PM Ryan Blue <bl...@tabular.io> wrote:
> >
> > > Nick,
> > >
> > > We store the latest snapshot ID that has been processed in the change log
> > > table in the snapshot summary metadata of the downstream table. Then when
> > > we go to run an update, we pick up the starting point from the last
> > > snapshot in history and use that as the start-snapshot-id in the
> > > DataFrame read.
> > >
> > > You can store the snapshot ID in the snapshot summary using
> > > CommitMetadata.withCommitProperties.
> > >
> > > For the second issue, you should be able to use MERGE by registering the
> > > incoming DataFrame as a temporary table. Then it can be referenced by SQL:
> > > df.createOrReplaceTempView("name").
> > >
> > > Ryan
> > >
> > > On Fri, Sep 22, 2023 at 2:48 PM Samarth Jain <sa...@gmail.com> wrote:
> > >
> > > > > What is the recommendation for storing the latest snapshot ID that is
> > > > > successfully merged into *table*? Ideally this is committed in the same
> > > > > transaction as the MERGE so that reprocessing is minimized. Does Iceberg
> > > > > support storing this as table metadata? I do not see any related
> > > > > information in the Iceberg Table Spec.
> > > >
> > > > Tagging seems like a good option for this:
> > > > https://iceberg.apache.org/docs/latest/branching/. So essentially, run
> > > > the MERGE and then tag it with a known name that users can then use for
> > > > reading data as of a snapshot that has CDC data merged, as opposed to
> > > > always reading the latest snapshot.
> > > >
> > > > MERGE isn't supported in the dataframe API, yet. See
> > > > https://github.com/apache/iceberg/issues/3665
> > > >
> > > > On Fri, Sep 22, 2023 at 2:00 PM Nick Del Nano <ni...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I am exploring implementing the Hybrid CDC Pattern explained at 29:26
> > > > > <https://youtu.be/GM7EvRc7_is?si=mIQ5g2k1uEIMX5DT&t=1766> in Ryan
> > > > > Blue's talk CDC patterns in Apache Iceberg
> > > > > <https://trino.io/blog/2023/06/30/trino-fest-2023-apacheiceberg.html>.
> > > > >
> > > > > The use case is:
> > > > >
> > > > > 1. Stream CDC logs to an append only Iceberg table named
> > > > > *table_changelog* using Flink
> > > > > 2. Periodically MERGE the CDC logs from *table_changelog* to *table*
> > > > >    1. The rate of merge depends on the table's requirements. For some
> > > > >    it may be frequent (hourly), for some it may be infrequent (daily).
> > > > >
> > > > > I am considering how to implement (2) using Iceberg's incremental read
> > > > > <https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read>
> > > > > and would appreciate guidance on the following topics:
> > > > >
> > > > > 1. What is the recommendation for storing the latest snapshot ID
> > > > > that is successfully merged into *table*? Ideally this is committed
> > > > > in the same transaction as the MERGE so that reprocessing is minimized.
> > > > > Does Iceberg support storing this as table metadata? I do not see any
> > > > > related information in the Iceberg Table Spec.
> > > > > 2. Use the dataframe API or Spark SQL for the incremental read and
> > > > > MERGE? From the docs, the incremental read examples are using dataframes,
> > > > > and the MERGE uses Spark SQL
> > > > > <https://iceberg.apache.org/docs/latest/spark-writes/#merge-into>.
> > > > > Does either API support both use cases?
> > > > >
> > > > > Thanks,
> > > > > Nick
> > >
> > > --
> > > Ryan Blue
> > > Tabular