If I understand correctly, duplicate IDs are only a problem if we want to 
compute pre and post update images. If we are not trying to rebuild updates, it 
should not matter, which is good as most use cases can be solved with only 
DELETE and INSERT. 

My initial thought was to do our best and come up with a reasonable changelog 
in case rows are not unique and update images were requested. However, I am no 
longer sure it is a good idea as there are cases when the generated changelog 
may not represent what actually happened.

Consider a MERGE command like this.

MERGE INTO target t USING source s
ON t.id = s.id
WHEN MATCHED AND extra_col = ‘a’ THEN
  UPDATE SET update_clause_1
WHEN MATCHED AND extra_col = ‘b’ THEN
  UPDATE SET update_clause_2

It is a valid MERGE and all matching rows must be updated. However, 
reconstructing correct UPDATEs is impossible in this case.

The spec does not enforce uniqueness and generating a wrong changelog can lead 
to problems in downstream jobs. Detecting such cases is extremely hard, not to 
mention the pain to rollback entire pipelines.

Jack’s use case of deduplicating rows after writes is valid but shouldn’t it be 
implemented using WAP or branches to ensure the main branch is clean? That way, 
downstream consumers always see only correct data.

In my view, it is reasonable to throw an exception when update images are 
requested but IDs are not unique. Unless there is a good way to resolve the 
problem above?

- Anton
 

> On Apr 24, 2023, at 3:00 PM, Yufei Gu <flyrain...@gmail.com> wrote:
> 
> Two rows are the “same”—that is, the rows represent the same entity—if the 
> identifier fields are equal. However, uniqueness of rows by this identifier 
> is not guaranteed or required by Iceberg and it is the responsibility of 
> processing engines or data providers to enforce.
> Based on the above table spec, it is the responsibility of individual compute 
> engines to determine if and how to enforce uniqueness. The three modes you 
> mentioned(unenforced mode/semi-enforced/enforced) are associated with 
> specific engines. To my knowledge, Spark does not enforce uniqueness, while 
> Flink offers options for both unenforced and enforced modes. Changelog 
> generation is independent of writer types of different engines.
> 
> The current changelog generation tool covers the enforced use case of course. 
> However, it seems reasonable to provide users with an option to prevent job 
> failures in case of non-unique records. This flexibility would allow for 
> smoother operations and potentially better overall user experience.Thanks for 
> the suggestion.
> 
> I'm also curious how the semi-enforced dedupe process works. Would the result 
> be like following two rows after the dedupe? Can you share any docs or 
> implementations for the dedupe process?
> (1, 'a', DELETE)
> (1, 'd', INSERT)
> 
> Best,
> 
> Yufei
> 
> 
> On Sun, Apr 23, 2023 at 12:00 AM Jack Ye <yezhao...@gmail.com 
> <mailto:yezhao...@gmail.com>> wrote:
> When the row identifier is defined in a table, I think the table can be 
> viewed in one of the 3 modes:
> 1. unenforced mode: the table operates as if it has no primary key.
> 2. semi-enforced mode: some/most writers try to enforce the primary key, but 
> it is not guaranteed that there are completely no duplicates. It is expected 
> that duplicated rows will be removed as a part of table maintenance
> 2. enforced mode: the table's primary key must be enforced by the writers. 
> Having duplicated rows is unexpected and is considered an illegal state.
> 
> In your example, it seems like:
> 1. under unenforced mode, the changelog produced is technically correct 
> although it is ambiguous.
> 2. under semi-enforced mode, users could accept this ambiguous result 
> temporarily and expect the inconsistency to be resolved soon.
> 3. under enforced mode, it makes sense to throw an exception to notify the 
> table owner that the table has entered an illegal state.
> 
> To illustrate the semi-enforced case, a table maintenance process can 
> continuously do an aggregation and delete the duplicated rows based on some 
> merge-key definition. Suppose in your example the 'name' column is the merge 
> key and larger value in comparison wins (just for the sake of discussion, 
> probably not a very intuitive example, usually it's something like a 
> timestamp column), after the dedupe process runs, the changelog including the 
> new transactions would always be:
> 
> (1, 'a', DELETE)
> (1, 'b', DELETE)
> (1, 'c', INSERT)
> (1, 'd', INSERT)
> (1, 'c', DELETE)
> 
> a.k.a.
> 
> (1, 'a', DELETE)
> (1, 'b', DELETE)
> (1, 'd', INSERT)
> 
> and there is only a single record of (1, 'd') in the end regardless of which 
> SQL in the original change was actually run.
> 
> So going back to the original question, when the user does not expect the 
> table primary key to always be strictly enforced, I think it still has value 
> for users to have a solution, even if it might be ambiguous and might not be 
> the unique and correct solution. That solution might already be good enough, 
> or might eventually correct itself. If we follow this logic, throwing an 
> exception could be based on a config, just like in CDC we have upsert mode as 
> a specific mode to turn on. Otherwise people developing a change data feed 
> based on this might have to be blocked by such error until the table is 
> repaired and the duplicate rows are removed.
> 
> Any thoughts?
> 
> Best,
> Jack Ye
> 
> 
> 
> On Thu, Apr 20, 2023, 11:59 PM Yufei Gu <flyrain...@gmail.com 
> <mailto:flyrain...@gmail.com>> wrote:
> Hi folks, 
> 
> I am reaching out to request your insights on addressing the ambiguous 
> behavior of generating changelogs in Iceberg.
> 
> To provide some context, Iceberg does not enforce row uniqueness even when 
> configured with identifier fields (a.k.a primary key in the other database 
> system) during write operations. That means that it is possible to have 
> multiple rows with the same identifier fields values. For example, let's 
> consider a table "customer" with columns "id" (int) and "name" (string), and 
> the identifier field set as "id." It is still possible to write multiple rows 
> with the same "id" values, as shown below:
> 
> (1, 'A')
> (1, 'B')
> (2, 'X')
> (2, 'Y')
> 
> The CreateChangelogViewProcedure 
> <https://github.com/apache/iceberg/blob/master/docs/spark-procedures.md#change-data-capture>
>  can reconstruct updates based on identifier fields. It works effectively 
> when there is only one row per identifier value. However, handling multiple 
> rows with the same identifier values can be challenging. For example, a 
> `Merge into` or `Update` command can result the following changes:
> 
> (1, 'a', DELETE)
> (1, 'b', DELETE)
> (1, 'c', INSERT)
> (1, 'd', INSERT)
> 
> Unfortunately, it is impossible to determine whether "c" or "d" updated "a". 
> For example, both of the following commands are valid even though there is an 
> identifier column id.
> UPDATE table SET data = 'c' WHERE data = 'a';
> UPDATE table SET data = 'd' WHERE data = 'b';
> Or 
> UPDATE table SET data = 'd' WHERE data = 'a';
> UPDATE table SET data = 'c' WHERE data = 'b';
> 
> Due to this uncertainty, we have allowed the procedure to throw an exception 
> in such cases. A relevant pull request can be found [here 
> <https://github.com/apache/iceberg/pull/7388>]. I would appreciate any 
> thoughts or suggestions.
> 
> Best,
> 
> Yufei
> 
> `This is not a contribution`

Reply via email to