Hi Nick,

Ryan's suggestions are what I'd recommend as well. Another option would be
to store the snapshot-id as a table property in the target table (though,
you introduce the possibility of being out of sync since this will be 2
separate commits, in case your job fails in between or the 2nd commit
fails).

One thing to note about Ryan's suggestion is that you will probably want to
"walk" the target table's snapshot history to find the most recent snapshot
that has a last snapshot id property in it as the target table could have
other commits occurring such as compaction / data file rewrite that will
also create snapshots.

Here are a couple incomplete code examples of the incremental read +
process pattern:

incremental read using table property
<https://gist.github.com/ajreid21/e49e19e0fa3389cee4e6b9a0264e4ea2>

incremental read using snapshot property in commit
<https://gist.github.com/ajreid21/ebfe8e7c0712112b50af33a8cdcb711b>
(this one also shows how to set snapshot property using dataframe api and
sql)

Cheers,
Alex


On Fri, Sep 22, 2023 at 3:36 PM Ryan Blue <b...@tabular.io> wrote:

> Nick,
>
> We store the latest snapshot ID that has been processed in the change log
> table in the snapshot summary metadata of the downstream table. Then when
> we go to run an update, we pick up the starting point from the last
> snapshot in history and use that as the start-snapshot-id in the
> DataFrame read.
>
> You can store the snapshot ID in the snapshot summary using
> CommitMetadata.withCommitProperties.
>
> For the second issue, you should be able to use MERGE by registering the
> incoming DataFrame as a temporary table. Then it can be referenced by SQL.
> df.createOrReplaceTempView("name").
>
> Ryan
>
> On Fri, Sep 22, 2023 at 2:48 PM Samarth Jain <samarth.j...@gmail.com>
> wrote:
>
>> > What is the recommendation for storing the latest snapshot ID that is
>> successfully merged into *table*? Ideally this is committed in the same
>> transaction as the MERGE so that reprocessing is minimized. Does Iceberg
>> support storing this as table metadata? I do not see any related
>> information in the Iceberg Table Spec.
>>
>> Tagging seems like a good option for this.
>> https://iceberg.apache.org/docs/latest/branching/. So essentially, run
>> the MERGE and then tag it with a known name that users can then use for
>> reading data as of a snapshot that has CDC data merged as opposed to always
>> reading the latest snapshot.
>>
>> MERGE isn't supported in the dataframe API, yet. See
>> https://github.com/apache/iceberg/issues/3665
>>
>>
>>
>> On Fri, Sep 22, 2023 at 2:00 PM Nick Del Nano <nickdeln...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am exploring implementing the Hybrid CDC Pattern explained at 29:26
>>> <https://youtu.be/GM7EvRc7_is?si=mIQ5g2k1uEIMX5DT&t=1766> in Ryan
>>> Blue's talk CDC patterns in Apache Iceberg
>>> <https://trino.io/blog/2023/06/30/trino-fest-2023-apacheiceberg.html>.
>>>
>>> The use case is:
>>>
>>>    1. Stream CDC logs to an append only Iceberg table named
>>>    *table_changelog* using Flink
>>>    2. Periodically MERGE the CDC logs from *table_changelog* to *table*
>>>       1. The rate of merge depends on the table's requirements. For
>>>       some it may be frequently (hourly), for some it may be infrequent 
>>> (daily).
>>>
>>> I am considering how to implement (2) using Iceberg's incremental read
>>> <https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read> and
>>> would appreciate guidance on the following topics:
>>>
>>>    1. What is the recommendation for storing the latest snapshot ID
>>>    that is successfully merged into *table*? Ideally this is committed
>>>    in the same transaction as the MERGE so that reprocessing is minimized.
>>>    Does Iceberg support storing this as table metadata? I do not see any
>>>    related information in the Iceberg Table Spec.
>>>    2. Use the dataframe API or Spark SQL for the incremental read and
>>>    MERGE? From the docs, the incremental read examples are using dataframes,
>>>    and the MERGE uses Spark SQL
>>>    <https://iceberg.apache.org/docs/latest/spark-writes/#merge-into>.
>>>    Does either API support both use cases?
>>>
>>> Thanks,
>>> Nick
>>>
>>
>
> --
> Ryan Blue
> Tabular
>

Reply via email to