[ https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-28569:
--------------------------------
    Fix Version/s: 1.16.0

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28569
>                 URL: https://issues.apache.org/jira/browse/FLINK-28569
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.5, 1.15.2
>            Reporter: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0
>
>
> Currently, SinkUpsertMaterializer always updates a row by comparing the
> complete row. This may produce wrong results if the input has an upsertKey
> and also contains non-deterministic column values. Consider the following case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
>     tEnv.createTemporaryFunction(
>             "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
>     // CDC source whose primary key (and thus upsertKey) is user_id.
>     String cdcDdl =
>             "CREATE TABLE users (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " primary key (user_id) not enforced\n"
>                     + ") WITH (\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'changelog-mode' = 'I,UA,UB,D'\n"
>                     + ")";
>     // Sink keyed on a different column (email) than the source.
>     String sinkTableDdl =
>             "CREATE TABLE sink (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " PRIMARY KEY(email) NOT ENFORCED\n"
>                     + ") WITH(\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'sink-insert-only' = 'false'\n"
>                     + ")";
>     tEnv.executeSql(cdcDdl);
>     tEnv.executeSql(sinkTableDdl);
>     util.verifyJsonPlan(
>             "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> For the original CDC source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> the above query cannot correctly delete the previously inserted row, because
> the non-deterministic column value 'ndFunc(user_name)' differs between the
> insert and the delete, so a full-row comparison finds no match.
> This can be solved by letting SinkUpsertMaterializer be aware of the input
> upsertKey and update by it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
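
The difference between matching on the complete row and matching on the upsertKey can be illustrated with a small standalone sketch. This is not Flink's SinkUpsertMaterializer code: the class `UpsertKeyRetractSketch`, its methods, and the values `Tom#17` / `Tom#42` (standing in for two evaluations of `ndFunc(user_name)`) are hypothetical names chosen for illustration, and the state is modeled as a plain in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only; it does not reproduce Flink's operator or state API.
class UpsertKeyRetractSketch {

    // Full-row matching: removes the first stored row equal to `row` in every column.
    static boolean retractByFullRow(List<List<String>> state, List<String> row) {
        return state.remove(row);
    }

    // Upsert-key matching: removes the first stored row whose key columns equal
    // the key columns of `row`, ignoring all other columns.
    static boolean retractByUpsertKey(
            List<List<String>> state, List<String> row, int[] keyIdx) {
        List<String> key = project(row, keyIdx);
        for (int i = 0; i < state.size(); i++) {
            if (project(state.get(i), keyIdx).equals(key)) {
                state.remove(i);
                return true;
            }
        }
        return false;
    }

    // Extracts the columns at the given positions.
    static List<String> project(List<String> row, int[] idx) {
        List<String> out = new ArrayList<>();
        for (int i : idx) {
            out.add(row.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // The +I and -D records carry different results of the non-deterministic function.
        List<String> inserted = Arrays.asList("user1", "Tom#17", "t...@gmail.com", "10.02");
        List<String> deleted = Arrays.asList("user1", "Tom#42", "t...@gmail.com", "10.02");

        List<List<String>> state = new ArrayList<>();
        state.add(inserted);

        boolean byRow = retractByFullRow(state, deleted);
        System.out.println("full-row retract removed: " + byRow + ", rows left: " + state.size());

        // Column 0 (user_id) plays the role of the input upsertKey.
        boolean byKey = retractByUpsertKey(state, deleted, new int[] {0});
        System.out.println("upsert-key retract removed: " + byKey + ", rows left: " + state.size());
    }
}
```

In this sketch the full-row comparison leaves the stale row in state, while matching on `user_id` alone removes it, which is the behavior the issue asks for when the input upsertKey is not empty.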