Just a note that the functionality to compute net changes was added by Yufei only in Iceberg 1.4.0, in #7326 <https://github.com/apache/iceberg/pull/7326>.
On Thu, Aug 22, 2024 at 12:48 PM Wing Yew Poon <wyp...@cloudera.com> wrote:

Peter,

The Spark procedure is implemented by CreateChangelogViewProcedure.java <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java>. This was already added by Yufei in Iceberg 1.2.0. ChangelogIterator <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java> is a base class that contains static methods such as the removeNetCarryovers I mentioned; RemoveNetCarryoverIterator <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/RemoveNetCarryoverIterator.java> is a subclass that computes the net changes. These are Spark-specific, as they work with iterators of org.apache.spark.sql.Row.

BaseIncrementalChangelogScan <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java> is a common building block that can be used by engines other than Spark; it powers the Spark ChangelogRowReader. In the engines, whether Spark or Flink or some other, choices can be made, or made available, about which records to show. However, as a building block, I think it needs to generate all the changes for each snapshot in the requested range. If you have ideas for expanding the API of BaseIncrementalChangelogScan so that refinements of which records to emit can be pushed down to it, I'd be interested in hearing them. (They will be beyond the scope of my current PR, I think.)

- Wing Yew
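For readers who have not used it, driving the core changelog scan from an engine looks roughly like the following sketch. The table and snapshot ids are placeholders, and (as discussed in this thread) snapshots containing delete files are not handled by the core scan until the open PR lands.

```java
import java.io.IOException;

import org.apache.iceberg.ChangelogScanTask;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ChangelogScanSketch {
  // Plan changelog tasks for the snapshot range [fromSnapshotId, toSnapshotId].
  static void printChanges(Table table, long fromSnapshotId, long toSnapshotId) throws IOException {
    IncrementalChangelogScan scan =
        table.newIncrementalChangelogScan()
            .fromSnapshotInclusive(fromSnapshotId)
            .toSnapshot(toSnapshotId);

    // Each task describes one set of changes: the ordinal of the snapshot within the
    // requested range, the snapshot that committed the change, and the operation type.
    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
      for (ChangelogScanTask task : tasks) {
        System.out.printf(
            "ordinal=%d snapshot=%d op=%s%n",
            task.changeOrdinal(), task.commitSnapshotId(), task.operation());
      }
    }
  }
}
```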
On Thu, Aug 22, 2024 at 11:51 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

That's good info. I didn't know that we already have the Spark procedure at hand.
How does Spark calculate the `changelog_view`? Do we already have an implementation at hand somewhere? Could it be reused?

Anyway, if we want to reuse the new changelog scan for the changelog_view as well, then I agree that we need to provide a solution for (b). I think that (a)/net_changes is also important, as streaming readers of the table are often not interested in the intermediate states, just in the final changes. And (a) could result in far fewer records, which means better performance and lower resource usage.

Steve Zhang <hongyue_zh...@apple.com.invalid> wrote (on Thu, Aug 22, 2024, 19:47):

Yeah, agree on this. I think for the changelog scan to convert per-snapshot scans to tasks, option (b) with the complete history is the right way, while there should be an option to configure whether net/squashed changes are desired.

Also, in Spark's create_changelog_view, net changes and compute updates cannot be used together.

Thanks,
Steve Zhang

On Aug 22, 2024, at 8:50 AM, Steven Wu <stevenz...@gmail.com> wrote:

> It should emit changes for each snapshot in the requested range.

Wing Yew has a good point here. +1

On Thu, Aug 22, 2024 at 8:46 AM Wing Yew Poon <wyp...@cloudera.com.invalid> wrote:

First, thank you all for your responses to my question.

For Peter's question, I believe that (b) is the correct behavior. It is also the current behavior when using copy-on-write (deletes and updates are still supported but not using delete files). A changelog scan is an incremental scan over multiple snapshots. It should emit changes for each snapshot in the requested range. Spark provides additional functionality on top of the changelog scan, to produce net changes for the requested range. See https://iceberg.apache.org/docs/latest/spark-procedures/#create_changelog_view. Basically, the create_changelog_view procedure uses a changelog scan (reading the changelog table, i.e., <table>.changes) to get a DataFrame, which is saved to a temporary Spark view that can then be queried; if net_changes is true, only the net changes are produced for this temporary view. This functionality uses ChangelogIterator.removeNetCarryovers (which is in Spark).
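For reference, a call to that procedure looks roughly like the sketch below. The catalog, table, and snapshot ids are placeholders, and the option names should be checked against the Spark procedures page linked above.

```java
import org.apache.spark.sql.SparkSession;

public class CreateChangelogViewSketch {
  static void showNetChanges(SparkSession spark) {
    // Create a temporary view over the changelog of db.tbl for a snapshot range.
    // With net_changes => true, intermediate (carry-over) states within the range
    // are squashed; note that net_changes cannot be combined with compute_updates.
    spark.sql(
        "CALL my_catalog.system.create_changelog_view("
            + "  table => 'db.tbl',"
            + "  options => map('start-snapshot-id', '1', 'end-snapshot-id', '3'),"
            + "  net_changes => true)");

    // The view (named after the table by default) exposes the row data plus
    // _change_type, _change_ordinal, and _commit_snapshot_id columns.
    spark.sql("SELECT * FROM tbl_changes ORDER BY _change_ordinal").show();
  }
}
```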
On Thu, Aug 22, 2024 at 7:51 AM Steven Wu <stevenz...@gmail.com> wrote:

Peter, good question. In this case, (b) is the complete change history. (a) is the squashed version.

I would probably check how other changelog systems deal with this scenario.

On Thu, Aug 22, 2024 at 3:49 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

Technically different, but somewhat similar question:

What is the expected behaviour when the `IncrementalScan` is created not for a single snapshot, but for multiple snapshots?
S1 added PK1-V1
S2 updated PK1-V1 to PK1-V1b (removed PK1-V1 and added PK1-V1b)
S3 updated PK1-V1b to PK1-V1c (removed PK1-V1b and added PK1-V1c)

Let's say we have `IncrementalScan.fromSnapshotInclusive(S1).toSnapshot(S3)`.
Do we need to return:
(a)
- PK1,V1c,INSERTED

Or is it ok to return:
(b)
- PK1,V1,INSERTED
- PK1,V1,DELETED
- PK1,V1b,INSERTED
- PK1,V1b,DELETED
- PK1,V1c,INSERTED

I think (a) is the correct behaviour.

Thanks,
Peter

Steven Wu <stevenz...@gmail.com> wrote (on Wed, Aug 21, 2024, 22:27):

Agree with everyone that option (a) is the correct behavior.

On Wed, Aug 21, 2024 at 11:57 AM Steve Zhang <hongyue_zh...@apple.com.invalid> wrote:

I agree that option (a) is what the user expects for row-level changes.

I feel the added deletes in a given snapshot provide the PK of the DELETED entry, while existing deletes are read together with the data files to find the DELETED value (V1b) and the rest of the columns.

Thanks,
Steve Zhang

On Aug 20, 2024, at 6:06 PM, Wing Yew Poon <wyp...@cloudera.com.INVALID> wrote:

Hi,

I have a PR open to add changelog support for the case where delete files are present (https://github.com/apache/iceberg/pull/10935). I have a question about what the changelog should emit in the following scenario:

The table has a schema with a primary key/identifier column PK and an additional column V.
In snapshot 1, we write a data file DF1 with rows
PK1, V1
PK2, V2
etc.
In snapshot 2, we write an equality delete file ED1 with PK=PK1, and a new data file DF2 with rows
PK1, V1b
(possibly other rows)
In snapshot 3, we write an equality delete file ED2 with PK=PK1, and a new data file DF3 with rows
PK1, V1c
(possibly other rows)

Thus, in snapshots 2 and 3, we update the row identified by PK1 with new values by using an equality delete and writing new data for the row.
These are the files present in snapshot 3:
DF1 (sequence number 1)
DF2 (sequence number 2)
DF3 (sequence number 3)
ED1 (sequence number 2)
ED2 (sequence number 3)

The question I have is: what should the changelog emit for snapshot 3?
For snapshot 1, the changelog should emit a row for each row in DF1 as INSERTED.
For snapshot 2, it should emit a row for PK1, V1 as DELETED, and a row for PK1, V1b as INSERTED.
For snapshot 3, I see two possibilities:
(a)
PK1,V1b,DELETED
PK1,V1c,INSERTED

(b)
PK1,V1,DELETED
PK1,V1b,DELETED
PK1,V1c,INSERTED

The interpretation for (b) is that both ED1 and ED2 apply to DF1, with ED1 being an existing delete file and ED2 being an added delete file for it. We discount ED1, apply ED2, and get a DELETED row for PK1,V1. ED2 also applies to DF2, from which we get a DELETED row for PK1,V1b.

The interpretation for (a) is that ED1 is an existing delete file for DF1, and in snapshot 3 the row PK1,V1 already does not exist before the snapshot, so we do not emit a row for it. (We can think of it as: ED1 is already applied to DF1, and we only consider any additional rows that get deleted when ED2 is applied.)

I lean towards (a), as I think it is more reflective of net changes.
I am interested to hear what folks think.

Thank you,
Wing Yew
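The difference between (a) and (b) comes down to which equality deletes are treated as added versus existing relative to the snapshot being scanned. A rough sketch of that rule, using the sequence numbers above; the types here are illustrative only (not Iceberg internals), and the equality predicate is ignored since every delete in this scenario targets PK1.

```java
import java.util.List;

public class DeleteClassificationSketch {
  // Minimal stand-in for a data or delete file: just a name and a data sequence number.
  record ContentFile(String name, long sequenceNumber) {}

  // An equality delete applies to data files with a strictly lower sequence number.
  static boolean applies(ContentFile delete, ContentFile data) {
    return delete.sequenceNumber() > data.sequenceNumber();
  }

  // Interpretation (a): emit a DELETED row for a data file only if a delete added in the
  // scanned snapshot applies to it AND no delete that already existed before the snapshot
  // had removed the row.
  static boolean emitDeleted(ContentFile data, List<ContentFile> addedDeletes, List<ContentFile> existingDeletes) {
    boolean newlyDeleted = addedDeletes.stream().anyMatch(d -> applies(d, data));
    boolean alreadyDeleted = existingDeletes.stream().anyMatch(d -> applies(d, data));
    return newlyDeleted && !alreadyDeleted;
  }

  public static void main(String[] args) {
    ContentFile df1 = new ContentFile("DF1", 1);
    ContentFile df2 = new ContentFile("DF2", 2);
    ContentFile ed1 = new ContentFile("ED1", 2); // existing delete when scanning snapshot 3
    ContentFile ed2 = new ContentFile("ED2", 3); // added delete in snapshot 3

    // PK1,V1 (in DF1) was already removed by ED1 before snapshot 3 -> not emitted (false).
    System.out.println(emitDeleted(df1, List.of(ed2), List.of(ed1)));
    // PK1,V1b (in DF2) is removed only by ED2 in snapshot 3 -> emitted as DELETED (true).
    System.out.println(emitDeleted(df2, List.of(ed2), List.of(ed1)));
  }
}
```

Under interpretation (b), the existingDeletes check would simply be dropped, which is why PK1,V1 shows up as a third DELETED row there.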