I was thinking about compaction conflicts with concurrent deletes a
year ago.

If we have explicit rowids, then we can run compactions, and concurrent
deletes, and we do not have to roll back one of them on commits. This would
be possible, since we can identify the deleted rows in the compacted files
as well.
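
A minimal sketch of the idea, using a toy in-memory model rather than any
Iceberg API. The `Row` class, the `row_id` field, and the file names are
assumptions made up for illustration. It shows that a delete keyed by an
explicit row ID still matches after compaction, while a (file, position)
delete points at a file that no longer exists.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    row_id: int  # hypothetical explicit row ID written with the row
    value: str


def compact(files):
    """Rewrite all data files into one; row IDs travel with the rows."""
    merged = [row for name in sorted(files) for row in files[name]]
    return {"compacted.parquet": merged}


def live_rows(files, deleted_row_ids):
    """Return the rows that are not covered by an ID-based delete."""
    return [
        row
        for name in sorted(files)
        for row in files[name]
        if row.row_id not in deleted_row_ids
    ]


before = {
    "a.parquet": [Row(1, "x"), Row(2, "y")],
    "b.parquet": [Row(3, "z")],
}

# A concurrent delete of the same row, expressed in two ways.
deleted_row_ids = {2}                # keyed by explicit row ID
position_delete = ("a.parquet", 1)   # keyed by (file name, position)

after = compact(before)

# The ID-based delete can still be applied to the compacted file.
surviving = live_rows(after, deleted_row_ids)

# The position-based delete refers to a file that was rewritten away.
position_delete_still_valid = position_delete[0] in after
```

In this toy model neither commit has to be rolled back when deletes are keyed
by row ID, which is the property described above.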

That said, this would require a spec change and would complicate query
planning, as we would need to read more delete files for a given data file.
That is why I did not push this idea at the time.

Thanks, Peter

On Thu, Apr 27, 2023, 18:27 Anton Okolnychyi <aokolnyc...@apple.com.invalid>
wrote:

> +1 on adding more documentation regarding this behavior to the docs.
>
> We debated implicit vs explicit row ID columns while designing lazy
> row-level operations. It is not uncommon to use explicit row IDs (e.g. Hive
> ACID) but we decided to avoid that if possible. Iceberg file_name + pos is
> a cheap way to uniquely identify a row within a snapshot that can be used
> for external indexes or to encode row-level mutations. Since we did not
> plan to ensure uniqueness, it seemed enough to rely on that + support
> unenforced identifier columns.
>
> I agree we lose file_name + pos across snapshots if the row gets changed
> but I am not sure there is a strong use case that can’t be solved using
> file_name + pos and/or identity columns with some reasonable expectations.
> I can be convinced otherwise, though.
>
> - Anton
>
> On Apr 26, 2023, at 1:38 PM, Jack Ye <yezhao...@gmail.com> wrote:
>
> To catch up on what we discussed in the community sync, I had
> misunderstood that this would throw an exception even when we are only
> requesting deletes and inserts. It looks like users who want to consume
> only those kinds of changes will not be affected, and what Anton proposes
> about using the WAP branch also works for me, so we have a consensus that
> throwing an exception is the right way to go.
>
> I was thinking that it might be worth considering adding an internal ROWID
> column to the Iceberg spec to make sure updates are always valid.
> Traditional database systems like Oracle, MySQL, etc. all have that column
> baked in and it opens the door for various optimizations, especially for
> indexing purposes. In those database systems, the ROWID is a virtual column
> describing the physical storage location of the row, similar to Iceberg's
> (file_path, row_position) combo. However, because Iceberg tables are
> versioned, rows are constantly rewritten into new files, so the ROWID is not
> stable. If we want to support a stable ROWID, we probably need to write a
> UUID for inserted rows directly in the data files. If we want to do that,
> it will be a lot of effort, and it's debatable what exact benefit it would
> bring, so it is probably something to consider in the long term.
>
> Meanwhile, users can engineer a unique ROWID column when creating the
> Iceberg table and use that as a primary key if they want to track exact
> update changes for a system with potential duplicate primary key entries.
> We could add more documentation in the Spark CDC section
> <https://iceberg.apache.org/docs/latest/spark-procedures/#change-data-capture>
> to describe this specific engineering pattern. Or maybe a dedicated page
> for primary key and CDC should be added to describe the use case in a more
> general way.
>
> -Jack
>
>
>
>
> On Wed, Apr 26, 2023 at 8:58 AM Anton Okolnychyi <
> aokolnyc...@apple.com.invalid> wrote:
>
>> If I understand correctly, duplicate IDs are only a problem if we want to
>> compute pre and post update images. If we are not trying to rebuild
>> updates, it should not matter, which is good as most use cases can be
>> solved with only DELETE and INSERT.
>>
>> My initial thought was to do our best and come up with a reasonable
>> changelog in case rows are not unique and update images were requested.
>> However, I am no longer sure it is a good idea as there are cases when the
>> generated changelog may not represent what actually happened.
>>
>> Consider a MERGE command like this.
>>
>> MERGE INTO target t USING source s
>> ON t.id = s.id
>> WHEN MATCHED AND extra_col = 'a' THEN
>>   UPDATE SET update_clause_1
>> WHEN MATCHED AND extra_col = 'b' THEN
>>   UPDATE SET update_clause_2
>>
>> It is a valid MERGE and all matching rows must be updated. However,
>> reconstructing correct UPDATEs is impossible in this case.
>>
>> The spec does not enforce uniqueness and generating a wrong changelog can
>> lead to problems in downstream jobs. Detecting such cases is extremely
>> hard, not to mention the pain of rolling back entire pipelines.
>>
>> Jack’s use case of deduplicating rows after writes is valid but shouldn’t
>> it be implemented using WAP or branches to ensure the main branch is clean?
>> That way, downstream consumers always see only correct data.
>>
>> In my view, it is reasonable to throw an exception when update images are
>> requested but IDs are not unique. Unless there is a good way to resolve the
>> problem above?
>>
>> - Anton
>>
>>
>> On Apr 24, 2023, at 3:00 PM, Yufei Gu <flyrain...@gmail.com> wrote:
>>
>> Two rows are the “same”—that is, the rows represent the same entity—if
>>> the identifier fields are equal. However, uniqueness of rows by this
>>> identifier is not guaranteed or required by Iceberg and it is the
>>> responsibility of processing engines or data providers to enforce.
>>
>> Based on the above table spec, it is the responsibility of individual
>> compute engines to determine if and how to enforce uniqueness. The three
>> modes you mentioned (unenforced/semi-enforced/enforced) are associated
>> with specific engines. To my knowledge, Spark does not enforce uniqueness,
>> while Flink offers options for both unenforced and enforced modes.
>> Changelog generation is independent of writer types of different engines.
>>
>> The current changelog generation tool covers the enforced use case of
>> course. However, it seems reasonable to provide users with an option to
>> prevent job failures in case of non-unique records. This flexibility would
>> allow for smoother operations and potentially better overall user
>> experience. Thanks for the suggestion.
>>
>> I'm also curious how the semi-enforced dedupe process works. Would the
>> result be like the following two rows after the dedupe? Can you share any docs
>> or implementations for the dedupe process?
>> (1, 'a', DELETE)
>> (1, 'd', INSERT)
>>
>> Best,
>>
>> Yufei
>>
>>
>> On Sun, Apr 23, 2023 at 12:00 AM Jack Ye <yezhao...@gmail.com> wrote:
>>
>>> When the row identifier is defined in a table, I think the table can be
>>> viewed in one of the 3 modes:
>>> 1. *unenforced mode*: the table operates as if it has no primary key.
>>> 2. *semi-enforced mode*: some/most writers try to enforce the primary
>>> key, but there is no guarantee that the table is free of duplicates. It
>>> is expected that duplicated rows will be removed as a part of table
>>> maintenance.
>>> 3. *enforced mode*: the table's primary key must be enforced by the
>>> writers. Having duplicated rows is unexpected and is considered an illegal
>>> state.
>>>
>>> In your example, it seems like:
>>> 1. under unenforced mode, the changelog produced is technically correct
>>> although it is ambiguous.
>>> 2. under semi-enforced mode, users could accept this ambiguous result
>>> temporarily and expect the inconsistency to be resolved soon.
>>> 3. under enforced mode, it makes sense to throw an exception to notify
>>> the table owner that the table has entered an illegal state.
>>>
>>> To illustrate the semi-enforced case, a table maintenance process can
>>> continuously do an aggregation and delete the duplicated rows based on some
>>> merge-key definition. Suppose in your example the 'name' column is the
>>> merge key and larger value in comparison wins (just for the sake of
>>> discussion, probably not a very intuitive example, usually it's something
>>> like a timestamp column), after the dedupe process runs, the changelog
>>> including the new transactions would always be:
>>>
>>> (1, 'a', DELETE)
>>> (1, 'b', DELETE)
>>> (1, 'c', INSERT)
>>> (1, 'd', INSERT)
>>> (1, 'c', DELETE)
>>>
>>> a.k.a.
>>>
>>> (1, 'a', DELETE)
>>> (1, 'b', DELETE)
>>> (1, 'd', INSERT)
>>>
>>> and there is only a single record of (1, 'd') in the end regardless of
>>> which SQL in the original change was actually run.
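
A minimal sketch of how the five-row changelog above collapses into the
three-row one. It is a toy illustration, not how any Iceberg procedure is
implemented. The `net_changes` helper is a made-up name, and the rule it
applies (an INSERT followed by a DELETE of the same row within the range
cancels out) is an assumption about what "a.k.a." means here.

```python
from collections import Counter


def net_changes(changelog):
    """Cancel INSERT/DELETE pairs of the same row within one changelog range."""
    inserted = Counter()
    deleted = Counter()
    for row_id, name, op in changelog:
        key = (row_id, name)
        if op == "INSERT":
            inserted[key] += 1
        elif op == "DELETE":
            if inserted[key] > 0:
                # The row was inserted earlier in this range: drop both changes.
                inserted[key] -= 1
            else:
                deleted[key] += 1
        else:
            raise ValueError(f"unknown change type: {op}")
    # Counter.elements() skips keys whose count dropped to zero.
    deletes = [(i, n, "DELETE") for i, n in sorted(deleted.elements())]
    inserts = [(i, n, "INSERT") for i, n in sorted(inserted.elements())]
    return deletes + inserts


changelog = [
    (1, "a", "DELETE"),
    (1, "b", "DELETE"),
    (1, "c", "INSERT"),
    (1, "d", "INSERT"),
    (1, "c", "DELETE"),  # written by the dedupe job
]

net = net_changes(changelog)
```
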
>>>
>>> So going back to the original question, when the user does not expect
>>> the table primary key to always be strictly enforced, I think it still has
>>> value for users to have a solution, even if it might be ambiguous and might
>>> not be the unique and correct solution. That solution might already be good
>>> enough, or might eventually correct itself. If we follow this logic,
>>> throwing an exception could be based on a config, just like in CDC we have
>>> upsert mode as a specific mode to turn on. Otherwise people developing a
>>> change data feed based on this might have to be blocked by such an error until
>>> the table is repaired and the duplicate rows are removed.
>>>
>>> Any thoughts?
>>>
>>> Best,
>>> Jack Ye
>>>
>>>
>>>
>>> On Thu, Apr 20, 2023, 11:59 PM Yufei Gu <flyrain...@gmail.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I am reaching out to request your insights on addressing the ambiguous
>>>> behavior of generating changelogs in Iceberg.
>>>>
>>>> To provide some context, Iceberg does not enforce row uniqueness even
>>>> when configured with identifier fields (a.k.a. primary keys in other
>>>> database systems) during write operations. That means that it is possible to
>>>> have multiple rows with the same identifier field values. For example,
>>>> let's consider a table "customer" with columns "id" (int) and "name"
>>>> (string), and the identifier field set as "id." It is still possible to
>>>> write multiple rows with the same "id" values, as shown below:
>>>>
>>>> (1, 'A')
>>>> (1, 'B')
>>>> (2, 'X')
>>>> (2, 'Y')
>>>>
>>>> The CreateChangelogViewProcedure
>>>> <https://github.com/apache/iceberg/blob/master/docs/spark-procedures.md#change-data-capture>
>>>> can reconstruct updates based on identifier fields. It works effectively
>>>> when there is only one row per identifier value. However, handling multiple
>>>> rows with the same identifier values can be challenging. For example, a
>>>> `MERGE INTO` or `UPDATE` command can result in the following changes:
>>>>
>>>> (1, 'a', DELETE)
>>>> (1, 'b', DELETE)
>>>> (1, 'c', INSERT)
>>>> (1, 'd', INSERT)
>>>>
>>>> Unfortunately, it is impossible to determine whether "c" or "d" updated
>>>> "a". For example, both of the following commands are valid even though
>>>> there is an identifier column id.
>>>> UPDATE table SET data = 'c' WHERE data = 'a';
>>>> UPDATE table SET data = 'd' WHERE data = 'b';
>>>> Or
>>>> UPDATE table SET data = 'd' WHERE data = 'a';
>>>> UPDATE table SET data = 'c' WHERE data = 'b';
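
A minimal sketch of why the two pairs of statements cannot be told apart
from the changelog alone. This is a toy model in plain Python, not the
procedure's code. The `apply_updates` helper is a made-up name, and the
assumption is that each UPDATE is recorded as a DELETE of the old row plus
an INSERT of the new one.

```python
def apply_updates(rows, updates):
    """Apply `SET data = new WHERE data = old` updates and record row changes."""
    rows = list(rows)
    changelog = []
    for old, new in updates:
        for i, (row_id, data) in enumerate(rows):
            if data == old:
                # An update shows up as a delete of the old row image
                # followed by an insert of the new row image.
                changelog.append((row_id, old, "DELETE"))
                changelog.append((row_id, new, "INSERT"))
                rows[i] = (row_id, new)
    return rows, changelog


table = [(1, "a"), (1, "b")]

# First pair of statements: a -> c, b -> d.
_, log_1 = apply_updates(table, [("a", "c"), ("b", "d")])

# Second pair of statements: a -> d, b -> c.
_, log_2 = apply_updates(table, [("a", "d"), ("b", "c")])

# Both histories produce the same set of row-level changes, so the
# original pairing of old and new values cannot be recovered from them.
same_changes = sorted(log_1) == sorted(log_2)
```
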
>>>>
>>>> Due to this uncertainty, we have allowed the procedure to throw an
>>>> exception in such cases. A relevant pull request can be found [here
>>>> <https://github.com/apache/iceberg/pull/7388>]. I would appreciate any
>>>> thoughts or suggestions.
>>>>
>>>> Best,
>>>>
>>>> Yufei
>>>>
>>>> `This is not a contribution`
>>>>
>>>
>>
>
