[ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28569:
--------------------------------
    Fix Version/s: 1.16.0

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty, otherwise wrong results may be produced
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28569
>                 URL: https://issues.apache.org/jira/browse/FLINK-28569
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.5, 1.15.2
>            Reporter: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
>
> Currently, SinkUpsertMaterializer always updates a row by comparing the 
> complete row. This can produce wrong results if the input has an upsertKey 
> and also non-deterministic column values. Consider the following case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
>     tEnv.createTemporaryFunction(
>             "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
>     // CDC source: primary key is user_id
>     String cdcDdl =
>             "CREATE TABLE users (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " primary key (user_id) not enforced\n"
>                     + ") WITH (\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'changelog-mode' = 'I,UA,UB,D'\n"
>                     + ")";
>     // Sink: primary key is email, which differs from the source key
>     String sinkTableDdl =
>             "CREATE TABLE sink (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " PRIMARY KEY(email) NOT ENFORCED\n"
>                     + ") WITH(\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'sink-insert-only' = 'false'\n"
>                     + ")";
>     tEnv.executeSql(cdcDdl);
>     tEnv.executeSql(sinkTableDdl);
>     // ndFunc(user_name) makes the second output column non-deterministic
>     util.verifyJsonPlan(
>             "insert into sink select user_id, ndFunc(user_name), email, balance "
>                     + "from users");
> }
> {code}
> For the original CDC source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> the above query cannot correctly delete the previously inserted row, because 
> the non-deterministic column value 'ndFunc(user_name)' may differ between 
> the insert and the delete, so a complete-row comparison finds no match.
> This can be solved by making SinkUpsertMaterializer aware of the input 
> upsertKey and updating by it.
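
The difference between the two matching strategies can be illustrated with a small standalone sketch. This is not Flink's SinkUpsertMaterializer code: the class name, the method names, the list-based state, the "Tom-1"/"Tom-2" values standing in for two evaluations of ndFunc(user_name), and the placeholder email address are all hypothetical. It also assumes the input upsertKey is user_id (column 0), derived from the source table's primary key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of a sink materializer; not Flink's actual implementation. */
class UpsertKeyMaterializerSketch {

    /** Removes the first stored row that equals the deleted row in every column. */
    static boolean deleteByFullRow(List<String[]> state, String[] deleted) {
        for (int i = 0; i < state.size(); i++) {
            if (Arrays.equals(state.get(i), deleted)) {
                state.remove(i);
                return true;
            }
        }
        return false;
    }

    /** Removes the first stored row that matches the deleted row on the upsert-key columns only. */
    static boolean deleteByUpsertKey(List<String[]> state, String[] deleted, int[] upsertKey) {
        for (int i = 0; i < state.size(); i++) {
            boolean sameKey = true;
            for (int col : upsertKey) {
                sameKey &= state.get(i)[col].equals(deleted[col]);
            }
            if (sameKey) {
                state.remove(i);
                return true;
            }
        }
        return false;
    }

    /** Replays +I followed by -D for one user and returns how many rows remain in state. */
    static int rowsLeftAfterInsertThenDelete(boolean useUpsertKey) {
        List<String[]> state = new ArrayList<>();
        // The non-deterministic function is evaluated once per change record,
        // so the +I and -D records can carry different user_name values (assumed here).
        String[] inserted = {"user1", "Tom-1", "tom@example.com", "10.02"};
        String[] deleted = {"user1", "Tom-2", "tom@example.com", "10.02"};

        state.add(inserted);
        if (useUpsertKey) {
            // Assumption: the input upsertKey is user_id, i.e. column 0.
            deleteByUpsertKey(state, deleted, new int[] {0});
        } else {
            deleteByFullRow(state, deleted);
        }
        return state.size();
    }

    public static void main(String[] args) {
        System.out.println("complete-row comparison leaves "
                + rowsLeftAfterInsertThenDelete(false) + " row(s)");
        System.out.println("upsert-key comparison leaves "
                + rowsLeftAfterInsertThenDelete(true) + " row(s)");
    }
}
```

In this sketch the complete-row comparison leaves one stale row behind, while matching on the upsert key removes it and leaves none.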



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
