Peter, are you talking about the Flink compaction issue discussed here [1], or something else?
[1] - https://github.com/apache/iceberg/pull/5760#issuecomment-1528722651

- Anton

On Apr 27, 2023, at 9:56 PM, Péter Váry <peter.vary.apa...@gmail.com> wrote:

I was thinking about compaction conflicts with concurrent deletes a year ago.

If we have explicit row IDs, then we can run compactions and concurrent deletes without having to roll one of them back on commit. This would be possible because we could identify the deleted rows in the compacted files as well.

That said, this would introduce a spec change and complications in query planning, as we would need to read more delete files for a given data file. That was the reason I did not push the idea at the time.

Thanks,
Peter

On Thu, Apr 27, 2023, 18:27 Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:

+1 on adding more documentation regarding this behavior to the docs.

We debated implicit vs explicit row ID columns while designing lazy row-level operations. It is not uncommon to use explicit row IDs (e.g. Hive ACID), but we decided to avoid that if possible. Iceberg's file_name + pos is a cheap way to uniquely identify a row within a snapshot that can be used for external indexes or for encoding row-level mutations. Since we did not plan to ensure uniqueness, it seemed enough to rely on that plus support for unenforced identifier columns.

I agree we lose file_name + pos across snapshots if the row gets changed, but I am not sure there is a strong use case that cannot be solved using file_name + pos and/or identifier columns with some reasonable expectations. I can be convinced otherwise, though.

- Anton

On Apr 26, 2023, at 1:38 PM, Jack Ye <yezhao...@gmail.com> wrote:

To catch up from what we discussed in the community sync: I had the misunderstanding that this would throw an exception even when we are only requesting deletes and inserts. It looks like users who want to consume that kind of change will not be affected, and what Anton proposes about using the WAP branch also works for me, so we have a consensus that throwing an exception is the right way to go.

I was thinking it might be worth considering adding an internal ROWID column to the Iceberg spec to make sure updates are always valid. Traditional database systems like Oracle, MySQL, etc. all have that column baked in, and it opens the door for various optimizations, especially for indexing purposes. In those systems, the ROWID is a virtual column describing the physical storage location of the row, similar to Iceberg's (file_path, row_position) combo. However, because Iceberg tables are versioned, rows are constantly rewritten into new files, so such a ROWID is not stable. If we want to support a stable ROWID, we would probably need to write a UUID for inserted rows directly into the data files. That would be a lot of effort, and it is debatable what exact benefit it would bring, so it is probably something to consider in the long term.

Meanwhile, users can engineer a unique ROWID column when creating an Iceberg table and use it as the primary key if they want to track exact update changes for a system with potentially duplicate primary key entries.
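A minimal sketch of that engineering pattern, assuming Spark SQL with the Iceberg SQL extensions enabled; the catalog, table, and column names below are made up for illustration:

-- Hypothetical table with a surrogate, writer-generated row id that serves as
-- the identifier field instead of the (possibly duplicated) business key.
CREATE TABLE demo.db.customer (
  row_uid STRING NOT NULL,
  id      INT,
  name    STRING
) USING iceberg;

ALTER TABLE demo.db.customer SET IDENTIFIER FIELDS row_uid;

-- Writers are responsible for populating the surrogate key, e.g. with uuid().
INSERT INTO demo.db.customer
SELECT uuid(), id, name FROM demo.db.customer_staging;

With a key like this as the identifier, update reconstruction stays unambiguous even when the business key (id) has duplicates.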
We could add more documentation in the Spark CDC section <https://iceberg.apache.org/docs/latest/spark-procedures/#change-data-capture> to describe this specific engineering pattern. Or maybe a dedicated page for primary keys and CDC should be added to describe the use case in a more general way.

-Jack

On Wed, Apr 26, 2023 at 8:58 AM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:

If I understand correctly, duplicate IDs are only a problem if we want to compute pre- and post-update images. If we are not trying to rebuild updates, it should not matter, which is good as most use cases can be solved with only DELETE and INSERT.

My initial thought was to do our best and come up with a reasonable changelog in case rows are not unique and update images were requested. However, I am no longer sure it is a good idea, as there are cases when the generated changelog may not represent what actually happened.

Consider a MERGE command like this:

MERGE INTO target t USING source s
ON t.id = s.id
WHEN MATCHED AND extra_col = 'a' THEN
  UPDATE SET update_clause_1
WHEN MATCHED AND extra_col = 'b' THEN
  UPDATE SET update_clause_2

It is a valid MERGE and all matching rows must be updated. However, reconstructing correct UPDATEs is impossible in this case.

The spec does not enforce uniqueness, and generating a wrong changelog can lead to problems in downstream jobs. Detecting such cases is extremely hard, not to mention the pain of rolling back entire pipelines.

Jack's use case of deduplicating rows after writes is valid, but shouldn't it be implemented using WAP or branches to ensure the main branch is clean? That way, downstream consumers always see only correct data.

In my view, it is reasonable to throw an exception when update images are requested but IDs are not unique. Unless there is a good way to resolve the problem above?

- Anton
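To make the staging idea Anton mentions more concrete, here is a rough write-audit-publish sketch, assuming Spark SQL with an Iceberg catalog; the table name, WAP id, and snapshot id are placeholders:

-- Stage writes instead of publishing them straight to the current table state.
ALTER TABLE db.customer SET TBLPROPERTIES ('write.wap.enabled' = 'true');
SET spark.wap.id = dedupe_batch_1;

-- Writes in this session now create staged snapshots that normal reads of
-- db.customer do not see yet.
INSERT INTO db.customer SELECT id, name FROM db.customer_staging;

-- After auditing/deduplicating the staged work, publish it by looking up the
-- staged snapshot id (e.g. in the db.customer.snapshots metadata table, where
-- summary['wap.id'] matches the id set above) and cherry-picking it.
-- 1234567890 stands in for the real staged snapshot id.
CALL spark_catalog.system.cherrypick_snapshot('db.customer', 1234567890);

A branch-based variant would instead write to a staging branch and only publish to main once the data is verified clean.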
On Apr 24, 2023, at 3:00 PM, Yufei Gu <flyrain...@gmail.com> wrote:

> Two rows are the “same”—that is, the rows represent the same entity—if the identifier fields are equal. However, uniqueness of rows by this identifier is not guaranteed or required by Iceberg and it is the responsibility of processing engines or data providers to enforce.

Based on the above table spec, it is the responsibility of individual compute engines to determine whether and how to enforce uniqueness. The three modes you mentioned (unenforced/semi-enforced/enforced) are associated with specific engines. To my knowledge, Spark does not enforce uniqueness, while Flink offers both unenforced and enforced modes. Changelog generation is independent of the writer types used by the different engines.

The current changelog generation tool covers the enforced use case, of course. However, it seems reasonable to provide users with an option to prevent job failures in the case of non-unique records. This flexibility would allow for smoother operations and potentially a better overall user experience. Thanks for the suggestion.

I'm also curious how the semi-enforced dedupe process works. Would the result be the following two rows after the dedupe? Can you share any docs or implementations for the dedupe process?

(1, 'a', DELETE)
(1, 'd', INSERT)

Best,
Yufei

On Sun, Apr 23, 2023 at 12:00 AM Jack Ye <yezhao...@gmail.com> wrote:

When a row identifier is defined on a table, I think the table can be viewed in one of three modes:
1. Unenforced mode: the table operates as if it has no primary key.
2. Semi-enforced mode: some or most writers try to enforce the primary key, but it is not guaranteed that there are no duplicates at all. Duplicated rows are expected to be removed as part of table maintenance.
3. Enforced mode: the table's primary key must be enforced by the writers. Having duplicated rows is unexpected and is considered an illegal state.

In your example, it seems like:
1. Under unenforced mode, the changelog produced is technically correct, although it is ambiguous.
2. Under semi-enforced mode, users could accept this ambiguous result temporarily and expect the inconsistency to be resolved soon.
3. Under enforced mode, it makes sense to throw an exception to notify the table owner that the table has entered an illegal state.

To illustrate the semi-enforced case, a table maintenance process can continuously run an aggregation and delete the duplicated rows based on some merge-key definition. Suppose in your example the 'name' column is the merge key and the larger value wins the comparison (just for the sake of discussion, probably not a very intuitive example; usually it is something like a timestamp column). After the dedupe process runs, the changelog including the new transactions would always be:

(1, 'a', DELETE)
(1, 'b', DELETE)
(1, 'c', INSERT)
(1, 'd', INSERT)
(1, 'c', DELETE)

a.k.a.

(1, 'a', DELETE)
(1, 'b', DELETE)
(1, 'd', INSERT)

and there is only a single record of (1, 'd') in the end, regardless of which SQL in the original change was actually run.

So going back to the original question: when the user does not expect the table primary key to always be strictly enforced, I think it still has value for users to have a solution, even if it might be ambiguous and might not be the unique, correct solution. That solution might already be good enough, or might eventually correct itself. If we follow this logic, throwing an exception could be gated behind a config, just like in CDC we have upsert mode as a specific mode to turn on. Otherwise, people developing a change data feed based on this might be blocked by such an error until the table is repaired and the duplicate rows are removed.

Any thoughts?

Best,
Jack Ye
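As a rough illustration of that semi-enforced maintenance pass, a sketch in Spark SQL under the same assumptions as Jack's example (table db.customer, identifier column id, merge key name, larger value wins); the names are illustrative only:

-- Find identifier values that currently have duplicates, along with the copy
-- that should survive according to the merge key ("largest name wins").
SELECT id, max(name) AS surviving_name, count(*) AS duplicate_copies
FROM db.customer
GROUP BY id
HAVING count(*) > 1;

-- A follow-up step would then drop every losing copy, for example with a
-- delete keyed on the winners (assuming the engine allows the target table
-- to be referenced in the subquery; otherwise materialize the winners first):
-- DELETE FROM db.customer t
-- WHERE EXISTS (SELECT 1 FROM db.customer s
--               WHERE s.id = t.id AND s.name > t.name);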
On Thu, Apr 20, 2023, 11:59 PM Yufei Gu <flyrain...@gmail.com> wrote:

Hi folks,

I am reaching out to request your insights on addressing the ambiguous behavior of generating changelogs in Iceberg.

To provide some context, Iceberg does not enforce row uniqueness during write operations, even when identifier fields (a.k.a. the primary key in other database systems) are configured. That means it is possible to have multiple rows with the same identifier field values. For example, let's consider a table "customer" with columns "id" (int) and "name" (string), and the identifier field set to "id". It is still possible to write multiple rows with the same "id" values, as shown below:

(1, 'A')
(1, 'B')
(2, 'X')
(2, 'Y')

The CreateChangelogViewProcedure <https://github.com/apache/iceberg/blob/master/docs/spark-procedures.md#change-data-capture> can reconstruct updates based on identifier fields. It works effectively when there is only one row per identifier value. However, handling multiple rows with the same identifier values can be challenging. For example, a MERGE INTO or UPDATE command can result in the following changes:

(1, 'a', DELETE)
(1, 'b', DELETE)
(1, 'c', INSERT)
(1, 'd', INSERT)

Unfortunately, it is impossible to determine whether 'c' or 'd' updated 'a'. For example, both of the following command sequences are valid even though there is an identifier column id:

UPDATE customer SET name = 'c' WHERE name = 'a';
UPDATE customer SET name = 'd' WHERE name = 'b';

or

UPDATE customer SET name = 'd' WHERE name = 'a';
UPDATE customer SET name = 'c' WHERE name = 'b';

Due to this uncertainty, we have allowed the procedure to throw an exception in such cases. A relevant pull request can be found here <https://github.com/apache/iceberg/pull/7388>. I would appreciate any thoughts or suggestions.

Best,

Yufei

`This is not a contribution`
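For reference, a minimal sketch of invoking the procedure Yufei links above; the argument names follow the linked Spark procedures page, while the catalog name, table name, and snapshot ids are placeholders:

-- Build a changelog view for db.customer between two snapshots and ask the
-- procedure to reconstruct update pairs using the identifier column `id`.
CALL spark_catalog.system.create_changelog_view(
  table => 'db.customer',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
  compute_updates => true,
  identifier_columns => array('id')
);

-- The view name defaults to `<table name>_changes`; reconstructed updates
-- appear as UPDATE_BEFORE/UPDATE_AFTER rows in the _change_type column.
SELECT * FROM customer_changes ORDER BY _change_ordinal, id;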