Just a note that the functionality to compute net changes was added by
Yufei only in Iceberg 1.4.0, in #7326
<https://github.com/apache/iceberg/pull/7326>.

On Thu, Aug 22, 2024 at 12:48 PM Wing Yew Poon <wyp...@cloudera.com> wrote:

> Peter,
>
> The Spark procedure is implemented by CreateChangelogViewProcedure.java
> <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java>.
> This was already added by Yufei in Iceberg 1.2.0.
> ChangelogIterator
> <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java>
> is a base class that contains static methods such as the removeNetCarryovers
> I mentioned; RemoveNetCarryoverIterator
> <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java>
> is a subclass that computes the net changes.
> These are Spark-specific, as they work with iterators of
> org.apache.spark.sql.Row.
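>
> For reference, a rough sketch of how removeNetCarryovers can be applied to a
> changelog DataFrame (a sketch only, not the procedure's actual code: 'spark'
> is an existing SparkSession, the rows are assumed to already be partitioned
> and sorted by the identifier columns, and the exact signatures should be
> checked against the Iceberg/Spark versions in use):
>
>   import org.apache.iceberg.spark.ChangelogIterator;
>   import org.apache.spark.api.java.function.MapPartitionsFunction;
>   import org.apache.spark.sql.Dataset;
>   import org.apache.spark.sql.Encoders;
>   import org.apache.spark.sql.Row;
>   import org.apache.spark.sql.types.StructType;
>
>   // Read the changelog (changes) metadata table of an Iceberg table.
>   Dataset<Row> changes = spark.read().format("iceberg").load("db.tbl.changes");
>   StructType schema = changes.schema();
>
>   // Collapse rows that are deleted and re-inserted unchanged within the
>   // requested snapshot range, keeping only the net effect per row identity.
>   Dataset<Row> netChanges =
>       changes.mapPartitions(
>           (MapPartitionsFunction<Row, Row>) rows ->
>               ChangelogIterator.removeNetCarryovers(rows, schema),
>           Encoders.row(schema)); // Encoders.row is Spark 3.5+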
>
> BaseIncrementalChangelogScan
> <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java>
> is a common building block that can be used by engines other than Spark; it
> powers the Spark ChangelogRowReader. In the engines, whether Spark or Flink
> or some other, choices can be made (or made available) about which records to
> show. However, as a building block, I think it needs to generate all the
> changes for each snapshot in the requested range. If you have ideas for
> expanding the API of BaseIncrementalChangelogScan so that refinements of
> what records to emit can be pushed down to it, I'd be interested in hearing
> them. (They will be beyond the scope of my current PR, I think.)
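>
> As a minimal sketch of how an engine consumes this building block (the
> snapshot IDs are placeholders; the engine is responsible for turning each
> task into INSERTED/DELETED rows, as the Spark ChangelogRowReader does):
>
>   import org.apache.iceberg.ChangelogScanTask;
>   import org.apache.iceberg.IncrementalChangelogScan;
>   import org.apache.iceberg.Table;
>   import org.apache.iceberg.io.CloseableIterable;
>
>   // 'table' is an already-loaded org.apache.iceberg.Table.
>   IncrementalChangelogScan scan =
>       table.newIncrementalChangelogScan()
>           .fromSnapshotInclusive(startSnapshotId)
>           .toSnapshot(endSnapshotId);
>
>   // One ChangelogScanTask per changed content file per snapshot in the range.
>   try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
>     for (ChangelogScanTask task : tasks) {
>       System.out.println(
>           task.operation() + " ordinal=" + task.changeOrdinal()
>               + " snapshot=" + task.commitSnapshotId());
>     }
>   }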
>
> - Wing Yew
>
>
> On Thu, Aug 22, 2024 at 11:51 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> That's good info. I didn't know that we already have the Spark procedure
>> at hand.
>> How does Spark calculate the `changelog_view`? Do we already have an
>> implementation at hand somewhere? Could it be reused?
>>
>> Anyway, if we want to reuse the new changelog scan for the changelog_view
>> as well, then I agree that we need to provide a solution for (b). I think
>> that (a)/net_changes is also important, as streaming readers of the table
>> are often not interested in the intermediate states, just in the final
>> changes. And (a) could result in far fewer records, which means better
>> performance and lower resource usage.
>>
>> Steve Zhang <hongyue_zh...@apple.com.invalid> wrote on Thu, Aug 22, 2024,
>> at 19:47:
>>
>>> Yeah, agree on this. I think that for the changelog scan, which converts
>>> per-snapshot scans into tasks, option (b) with the complete history is the
>>> right way, while there should be an option to configure whether net/squashed
>>> changes are desired.
>>>
>>> Also, in Spark's create_changelog_view, the net_changes and compute_updates
>>> options cannot be used together.
>>>
>>> Thanks,
>>> Steve Zhang
>>>
>>>
>>>
>>> On Aug 22, 2024, at 8:50 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>> >  It should emit changes for each snapshot in the requested range.
>>>
>>> Wing Yew has a good point here. +1
>>>
>>> On Thu, Aug 22, 2024 at 8:46 AM Wing Yew Poon
>>> <wyp...@cloudera.com.invalid> wrote:
>>>
>>>> First, thank you all for your responses to my question.
>>>>
>>>> For Peter's question, I believe that (b) is the correct behavior. It is
>>>> also the current behavior when using copy-on-write (deletes and updates are
>>>> still supported but not using delete files). A changelog scan is an
>>>> incremental scan over multiple snapshots. It should emit changes for each
>>>> snapshot in the requested range. Spark provides additional functionality on
>>>> top of the changelog scan, to produce net changes for the requested range.
>>>> See
>>>> https://iceberg.apache.org/docs/latest/spark-procedures/#create_changelog_view.
>>>> Basically, the create_changelog_view procedure uses a changelog scan
>>>> (reading the changelog table, i.e., <table>.changes) to get a DataFrame,
>>>> which is saved to a temporary Spark view that can then be queried; if
>>>> net_changes is true, only the net changes are produced for this temporary
>>>> view. This functionality uses ChangelogIterator.removeNetCarryovers (which
>>>> is in Spark).
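>>>>
>>>> For example, something like the following (a sketch; the catalog and table
>>>> names are placeholders, and the default name of the generated view should
>>>> be confirmed against the docs page above):
>>>>
>>>>   // 'spark' is an existing SparkSession with an Iceberg catalog named 'cat'.
>>>>   spark.sql(
>>>>       "CALL cat.system.create_changelog_view("
>>>>           + "table => 'db.tbl', "
>>>>           + "net_changes => true)");
>>>>
>>>>   // By default the procedure registers a temp view named '<table>_changes'.
>>>>   spark.sql("SELECT * FROM tbl_changes ORDER BY _change_ordinal").show();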
>>>>
>>>>
>>>> On Thu, Aug 22, 2024 at 7:51 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> Peter, good question. In this case, (b) is the complete change
>>>>> history. (a) is the squashed version.
>>>>>
>>>>> I would probably check how other changelog systems deal with this
>>>>> scenario.
>>>>>
>>>>> On Thu, Aug 22, 2024 at 3:49 AM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Technically different, but somewhat similar question:
>>>>>>
>>>>>> What is the expected behaviour when the `IncrementalScan` is created not
>>>>>> for a single snapshot, but for multiple snapshots?
>>>>>> S1 added PK1-V1
>>>>>> S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
>>>>>> S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)
>>>>>>
>>>>>> Let's say we have
>>>>>> *IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3)*.
>>>>>> Do we need to return:
>>>>>> (a)
>>>>>> - PK1,V1c,INSERTED
>>>>>>
>>>>>> Or is it OK to return:
>>>>>> (b)
>>>>>> - PK1,V1,INSERTED
>>>>>> - PK1,V1,DELETED
>>>>>> - PK1,V1b,INSERTED
>>>>>> - PK1,V1b,DELETED
>>>>>> - PK1,V1c,INSERTED
>>>>>>
>>>>>> I think (a) is the correct behaviour.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> Steven Wu <stevenz...@gmail.com> wrote on Wed, Aug 21, 2024, at 22:27:
>>>>>>
>>>>>>> Agree with everyone that option (a) is the correct behavior.
>>>>>>>
>>>>>>> On Wed, Aug 21, 2024 at 11:57 AM Steve Zhang
>>>>>>> <hongyue_zh...@apple.com.invalid> wrote:
>>>>>>>
>>>>>>>> I agree that option (a) is what the user expects for row-level changes.
>>>>>>>>
>>>>>>>> I feel the added deletes in a given snapshot provide the PK of the
>>>>>>>> DELETED entry, while the existing deletes are read together with the
>>>>>>>> data files to find the DELETED value (V1b) and the rest of the columns.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Steve Zhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Aug 20, 2024, at 6:06 PM, Wing Yew Poon
>>>>>>>> <wyp...@cloudera.com.INVALID> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have a PR open to add changelog support for the case where delete
>>>>>>>> files are present (https://github.com/apache/iceberg/pull/10935).
>>>>>>>> I have a question about what the changelog should emit in the following
>>>>>>>> scenario:
>>>>>>>>
>>>>>>>> The table has a schema with a primary key/identifier column PK and
>>>>>>>> additional column V.
>>>>>>>> In snapshot 1, we write a data file DF1 with rows
>>>>>>>> PK1, V1
>>>>>>>> PK2, V2
>>>>>>>> etc.
>>>>>>>> In snapshot 2, we write an equality delete file ED1 with PK=PK1,
>>>>>>>> and new data file DF2 with rows
>>>>>>>> PK1, V1b
>>>>>>>> (possibly other rows)
>>>>>>>> In snapshot 3, we write an equality delete file ED2 with PK=PK1,
>>>>>>>> and new data file DF3 with rows
>>>>>>>> PK1, V1c
>>>>>>>> (possibly other rows)
>>>>>>>>
>>>>>>>> Thus, in snapshot 2 and snapshot 3, we update the row identified by
>>>>>>>> PK1 with new values by using an equality delete and writing new data 
>>>>>>>> for
>>>>>>>> the row.
>>>>>>>> These are the files present in snapshot 3:
>>>>>>>> DF1 (sequence number 1)
>>>>>>>> DF2 (sequence number 2)
>>>>>>>> DF3 (sequence number 3)
>>>>>>>> ED1 (sequence number 2)
>>>>>>>> ED2 (sequence number 3)
>>>>>>>>
>>>>>>>> The question I have is what should the changelog emit for snapshot
>>>>>>>> 3?
>>>>>>>> For snapshot 1, the changelog should emit a row for each row in DF1
>>>>>>>> as INSERTED.
>>>>>>>> For snapshot 2, it should emit a row for PK1, V1 as DELETED; and a
>>>>>>>> row for PK1, V1b as INSERTED.
>>>>>>>> For snapshot 3, I see two possibilities:
>>>>>>>> (a)
>>>>>>>> PK1,V1b,DELETED
>>>>>>>> PK1,V1c,INSERTED
>>>>>>>>
>>>>>>>> (b)
>>>>>>>> PK1,V1,DELETED
>>>>>>>> PK1,V1b,DELETED
>>>>>>>> PK1,V1c,INSERTED
>>>>>>>>
>>>>>>>> The interpretation for (b) is that both ED1 and ED2 apply to DF1,
>>>>>>>> with ED1 being an existing delete file and ED2 being an added delete 
>>>>>>>> file
>>>>>>>> for it. We discount ED1 and apply ED2 and get a DELETED row for PK1,V1.
>>>>>>>> ED2 also applies to DF2, from which we get a DELETED row for PK1,
>>>>>>>> V1b.
>>>>>>>>
>>>>>>>> The interpretation for (a) is that ED1 is an existing delete file for
>>>>>>>> DF1, and in snapshot 3 the row PK1,V1 already does not exist before the
>>>>>>>> snapshot. Thus we do not emit a row for it. (We can think of it as ED1
>>>>>>>> being already applied to DF1, and we only consider any additional rows
>>>>>>>> that get deleted when ED2 is applied.)
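>>>>>>>>
>>>>>>>> To make (a) concrete, a tiny self-contained illustration (plain PK sets
>>>>>>>> standing in for the equality delete files; this is not the scan code):
>>>>>>>>
>>>>>>>>   import java.util.List;
>>>>>>>>   import java.util.Set;
>>>>>>>>
>>>>>>>>   // Scanning snapshot 3 against DF1: ED1 (seq 2) is an existing delete,
>>>>>>>>   // ED2 (seq 3) is the delete added in this snapshot.
>>>>>>>>   Set<String> existingDeletes = Set.of("PK1"); // ED1
>>>>>>>>   Set<String> addedDeletes = Set.of("PK1");    // ED2
>>>>>>>>   List<String[]> df1 =
>>>>>>>>       List.of(new String[] {"PK1", "V1"}, new String[] {"PK2", "V2"});
>>>>>>>>
>>>>>>>>   for (String[] row : df1) {
>>>>>>>>     boolean alreadyGone = existingDeletes.contains(row[0]); // gone before snapshot 3
>>>>>>>>     boolean deletedNow = addedDeletes.contains(row[0]);     // removed by ED2
>>>>>>>>     if (!alreadyGone && deletedNow) {
>>>>>>>>       // Option (a): emit DELETED only for rows that still existed entering snapshot 3.
>>>>>>>>       System.out.println(row[0] + "," + row[1] + ",DELETED");
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>   // For DF1 this prints nothing (PK1,V1 was already gone); the same rule
>>>>>>>>   // applied to DF2 yields PK1,V1b,DELETED, matching option (a).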
>>>>>>>>
>>>>>>>> I lean towards (a), as I think it is more reflective of net changes.
>>>>>>>> I am interested to hear what folks think.
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Wing Yew
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>
