It was at my previous company, so no Flink was involved for me. We were thinking about adding a background compaction service for our Iceberg tables (on Iceberg 0.14) and were concerned about causing issues with concurrent writes.
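A minimal sketch of the kind of background compaction job in question, assuming the documented Spark rewrite_data_files procedure (the catalog and table names are placeholders):

CALL my_catalog.system.rewrite_data_files(
  table => 'db.events',
  strategy => 'binpack',
  options => map('min-input-files', '5')
);

Because the rewrite commits through the usual optimistic-concurrency path, it can still conflict with concurrent row-level writes, which was exactly the concern.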
Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote (on Fri, May 5, 2023 at 22:04):

> Peter, are you talking about the Flink compaction issue discussed here [1] or something else?
>
> [1] - https://github.com/apache/iceberg/pull/5760#issuecomment-1528722651
>
> - Anton
>
> On Apr 27, 2023, at 9:56 PM, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
> I was thinking about compaction conflicts with concurrent deletes a year ago.
>
> If we have explicit rowids, then we can run compactions and concurrent deletes, and we do not have to roll back one of them on commit. This would be possible since we can identify the deleted rows in the compacted files as well.
>
> That said, this would introduce a spec change and complications in query planning, as we would need to read more delete files for a given data file. At the time, that was the reason I did not push this idea.
>
> Thanks, Peter
>
> On Thu, Apr 27, 2023, 18:27 Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:
>
>> +1 on adding more documentation about this behavior to the docs.
>>
>> We debated implicit vs explicit row ID columns while designing lazy row-level operations. It is not uncommon to use explicit row IDs (e.g. Hive ACID), but we decided to avoid that if possible. Iceberg file_name + pos is a cheap way to uniquely identify a row within a snapshot that can be used for external indexes or to encode row-level mutations. Since we did not plan to ensure uniqueness, it seemed enough to rely on that + support unenforced identifier columns.
>>
>> I agree we lose file_name + pos across snapshots if the row gets changed, but I am not sure there is a strong use case that can’t be solved using file_name + pos and/or identifier columns with some reasonable expectations. I can be convinced otherwise, though.
>>
>> - Anton
>>
>> On Apr 26, 2023, at 1:38 PM, Jack Ye <yezhao...@gmail.com> wrote:
>>
>> To catch up from what we discussed in the community sync, I had the misunderstanding that this will throw an exception even when we are only requesting deletes and inserts. It looks like users who want to consume that kind of change will not be affected, and what Anton proposes about using the WAP branch also works for me, so we have a consensus that throwing exceptions is the right way to go.
>>
>> I was thinking that it might be worth considering adding an internal ROWID column to the Iceberg spec to make sure updates are always valid. Traditional database systems like Oracle, MySQL, etc. all have that column baked in, and it opens the door for various optimizations, especially for indexing purposes. In those database systems, the ROWID is a virtual column describing the physical storage location of the row, similar to Iceberg's (file_path, row_position) combo. However, because Iceberg tables are versioned, rows are constantly rewritten into new files, so the ROWID is not stable. If we want to support a stable ROWID, we probably need to write a UUID for inserted rows directly in the data files. If we want to do that, it will be a lot of effort, and it's debatable what exact benefit it would bring, so it is probably something to consider in the long term.
>>
>> Meanwhile, users can engineer a unique ROWID column when creating the Iceberg table and use that as a primary key if they want to track exact update changes for a system with potential duplicate primary key entries.
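As a rough illustration of that engineered-ROWID pattern, a minimal sketch in Spark SQL, assuming the Iceberg Spark SQL extensions (for SET IDENTIFIER FIELDS) and Spark's uuid() function; the table and column names are made up:

CREATE TABLE my_catalog.db.customer (
  row_uid STRING NOT NULL,
  id      INT,
  name    STRING
) USING iceberg;

ALTER TABLE my_catalog.db.customer SET IDENTIFIER FIELDS row_uid;

-- every writer generates a globally unique row_uid at insert time
INSERT INTO my_catalog.db.customer
SELECT uuid(), id, name FROM staging_customer;

Unlike file_path + pos, a ROWID engineered this way survives rewrites because it lives in the data itself.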
>> We could add more documentation in the Spark CDC section <https://iceberg.apache.org/docs/latest/spark-procedures/#change-data-capture> to describe this specific engineering pattern. Or maybe a dedicated page for primary keys and CDC should be added to describe the use case in a more general way.
>>
>> -Jack
>>
>> On Wed, Apr 26, 2023 at 8:58 AM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:
>>
>>> If I understand correctly, duplicate IDs are only a problem if we want to compute pre- and post-update images. If we are not trying to rebuild updates, it should not matter, which is good as most use cases can be solved with only DELETE and INSERT.
>>>
>>> My initial thought was to do our best and come up with a reasonable changelog in case rows are not unique and update images are requested. However, I am no longer sure it is a good idea, as there are cases when the generated changelog may not represent what actually happened.
>>>
>>> Consider a MERGE command like this:
>>>
>>> MERGE INTO target t USING source s
>>> ON t.id = s.id
>>> WHEN MATCHED AND extra_col = 'a' THEN
>>>   UPDATE SET update_clause_1
>>> WHEN MATCHED AND extra_col = 'b' THEN
>>>   UPDATE SET update_clause_2
>>>
>>> It is a valid MERGE and all matching rows must be updated. However, reconstructing correct UPDATEs is impossible in this case.
>>>
>>> The spec does not enforce uniqueness, and generating a wrong changelog can lead to problems in downstream jobs. Detecting such cases is extremely hard, not to mention the pain of rolling back entire pipelines.
>>>
>>> Jack’s use case of deduplicating rows after writes is valid, but shouldn’t it be implemented using WAP or branches to ensure the main branch is clean? That way, downstream consumers always see only correct data.
>>>
>>> In my view, it is reasonable to throw an exception when update images are requested but IDs are not unique. Unless there is a good way to resolve the problem above?
>>>
>>> - Anton
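For reference, a rough sketch of the write-audit-publish flow Anton refers to, assuming the write.wap.enabled table property, the spark.wap.id session setting, and the cherrypick_snapshot procedure; the names and the snapshot ID are placeholders:

ALTER TABLE my_catalog.db.customer SET TBLPROPERTIES ('write.wap.enabled' = 'true');

-- stage the writes of this session instead of publishing them to the main branch
SET spark.wap.id = dedupe_audit_001;

-- ... run the ingestion and dedupe steps against the staged snapshot ...

-- once the data is verified clean, publish the staged snapshot (placeholder snapshot ID)
CALL my_catalog.system.cherrypick_snapshot('db.customer', 1234567890123456789);

A branch-based variant (writing to an audit branch and then advancing main) would follow the same idea.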
>>> On Apr 24, 2023, at 3:00 PM, Yufei Gu <flyrain...@gmail.com> wrote:
>>>
>>>> Two rows are the “same”—that is, the rows represent the same entity—if the identifier fields are equal. However, uniqueness of rows by this identifier is not guaranteed or required by Iceberg and it is the responsibility of processing engines or data providers to enforce.
>>>
>>> Based on the above table spec, it is the responsibility of individual compute engines to determine if and how to enforce uniqueness. The three modes you mentioned (unenforced/semi-enforced/enforced) are associated with specific engines. To my knowledge, Spark does not enforce uniqueness, while Flink offers options for both unenforced and enforced modes. Changelog generation is independent of the writer types of the different engines.
>>>
>>> The current changelog generation tool covers the enforced use case, of course. However, it seems reasonable to provide users with an option to prevent job failures in the case of non-unique records. This flexibility would allow for smoother operations and a potentially better overall user experience. Thanks for the suggestion.
>>>
>>> I'm also curious how the semi-enforced dedupe process works. Would the result be like the following two rows after the dedupe? Can you share any docs or implementations for the dedupe process?
>>>
>>> (1, 'a', DELETE)
>>> (1, 'd', INSERT)
>>>
>>> Best,
>>>
>>> Yufei
>>>
>>> On Sun, Apr 23, 2023 at 12:00 AM Jack Ye <yezhao...@gmail.com> wrote:
>>>
>>>> When the row identifier is defined in a table, I think the table can be viewed in one of three modes:
>>>> 1. *unenforced mode*: the table operates as if it has no primary key.
>>>> 2. *semi-enforced mode*: some/most writers try to enforce the primary key, but it is not guaranteed that there are completely no duplicates. It is expected that duplicated rows will be removed as a part of table maintenance.
>>>> 3. *enforced mode*: the table's primary key must be enforced by the writers. Having duplicated rows is unexpected and is considered an illegal state.
>>>>
>>>> In your example, it seems like:
>>>> 1. under unenforced mode, the changelog produced is technically correct although it is ambiguous.
>>>> 2. under semi-enforced mode, users could accept this ambiguous result temporarily and expect the inconsistency to be resolved soon.
>>>> 3. under enforced mode, it makes sense to throw an exception to notify the table owner that the table has entered an illegal state.
>>>>
>>>> To illustrate the semi-enforced case, a table maintenance process can continuously do an aggregation and delete the duplicated rows based on some merge-key definition. Suppose in your example the 'name' column is the merge key and the larger value in a comparison wins (just for the sake of discussion, probably not a very intuitive example; usually it's something like a timestamp column). After the dedupe process runs, the changelog including the new transactions would always be:
>>>>
>>>> (1, 'a', DELETE)
>>>> (1, 'b', DELETE)
>>>> (1, 'c', INSERT)
>>>> (1, 'd', INSERT)
>>>> (1, 'c', DELETE)
>>>>
>>>> a.k.a.
>>>>
>>>> (1, 'a', DELETE)
>>>> (1, 'b', DELETE)
>>>> (1, 'd', INSERT)
>>>>
>>>> and there is only a single record of (1, 'd') in the end, regardless of which SQL in the original change was actually run.
>>>>
>>>> So going back to the original question, when the user does not expect the table primary key to always be strictly enforced, I think it still has value for users to have a solution, even if it might be ambiguous and might not be the unique and correct solution. That solution might already be good enough, or might eventually correct itself. If we follow this logic, throwing an exception could be based on a config, just like in CDC we have upsert mode as a specific mode to turn on. Otherwise people developing a change data feed based on this might be blocked by such an error until the table is repaired and the duplicate rows are removed.
>>>>
>>>> Any thoughts?
>>>>
>>>> Best,
>>>> Jack Ye
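A rough sketch of the merge-key dedupe Jack describes above, in Spark SQL against a hypothetical customer table; it assumes a Spark/Iceberg combination that supports subqueries in DELETE and simply keeps the row with the largest 'name' per id:

-- remove every duplicate that loses the merge-key comparison ("larger name wins")
DELETE FROM my_catalog.db.customer t
WHERE EXISTS (
  SELECT 1
  FROM my_catalog.db.customer s
  WHERE s.id = t.id
    AND s.name > t.name
);

Exact duplicates (same id and name) would need an extra tie-breaker, but for this discussion the point is only that the maintenance job converges the table to one row per id.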
>>>> On Thu, Apr 20, 2023, 11:59 PM Yufei Gu <flyrain...@gmail.com> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I am reaching out to request your insights on addressing the ambiguous behavior of generating changelogs in Iceberg.
>>>>>
>>>>> To provide some context, Iceberg does not enforce row uniqueness during write operations even when configured with identifier fields (a.k.a. the primary key in other database systems). That means it is possible to have multiple rows with the same identifier field values. For example, let's consider a table "customer" with columns "id" (int) and "name" (string), and the identifier field set to "id". It is still possible to write multiple rows with the same "id" values, as shown below:
>>>>>
>>>>> (1, 'A')
>>>>> (1, 'B')
>>>>> (2, 'X')
>>>>> (2, 'Y')
>>>>>
>>>>> The CreateChangelogViewProcedure <https://github.com/apache/iceberg/blob/master/docs/spark-procedures.md#change-data-capture> can reconstruct updates based on identifier fields. It works effectively when there is only one row per identifier value. However, handling multiple rows with the same identifier values can be challenging. For example, a `MERGE INTO` or `UPDATE` command can result in the following changes:
>>>>>
>>>>> (1, 'a', DELETE)
>>>>> (1, 'b', DELETE)
>>>>> (1, 'c', INSERT)
>>>>> (1, 'd', INSERT)
>>>>>
>>>>> Unfortunately, it is impossible to determine whether "c" or "d" updated "a". For example, both of the following commands are valid even though there is an identifier column id:
>>>>>
>>>>> UPDATE customer SET name = 'c' WHERE name = 'a';
>>>>> UPDATE customer SET name = 'd' WHERE name = 'b';
>>>>>
>>>>> or
>>>>>
>>>>> UPDATE customer SET name = 'd' WHERE name = 'a';
>>>>> UPDATE customer SET name = 'c' WHERE name = 'b';
>>>>>
>>>>> Due to this uncertainty, we have allowed the procedure to throw an exception in such cases. A relevant pull request can be found [here <https://github.com/apache/iceberg/pull/7388>]. I would appreciate any thoughts or suggestions.
>>>>>
>>>>> Best,
>>>>>
>>>>> Yufei
>>>>>
>>>>> `This is not a contribution`
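For reference, this is roughly how the procedure discussed above is invoked per the Spark procedures docs; the catalog name and snapshot IDs below are placeholders:

CALL my_catalog.system.create_changelog_view(
  table => 'db.customer',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
  identifier_columns => array('id'),
  compute_updates => true
);

-- the view defaults to `<table>_changes`; reconstructed updates appear as UPDATE_BEFORE/UPDATE_AFTER rows
SELECT * FROM customer_changes ORDER BY _change_ordinal;

It is this update-reconstruction step that throws when multiple rows share the same identifier values, which is the behavior debated in this thread.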