[ https://issues.apache.org/jira/browse/FLINK-26595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17506318#comment-17506318 ]
wuguihu commented on FLINK-26595:
----------------------------------

[~jark] I checked the [Postgres documentation|https://www.postgresql.org/docs/9.5/sql-insert.html], and Postgres supports this. In the example from the official documentation, the UPDATE SET clause does not contain the primary key column:

{code:sql}
INSERT INTO distributors (did, dname)
    VALUES (5, 'Gizmo Transglobal'), (6, 'Associated Computing, Inc')
    ON CONFLICT (did) DO UPDATE SET dname = EXCLUDED.dname;
{code}

!image-20220315001550269.png!

> Improve the PostgresDialect method for getting upsert statements.
> -----------------------------------------------------------------
>
>                 Key: FLINK-26595
>                 URL: https://issues.apache.org/jira/browse/FLINK-26595
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.13.1
>            Reporter: wuguihu
>            Assignee: wuguihu
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-20220311125613545.png, image-20220311130744606.png, image-20220311141815540.png, image-20220315001550269.png
>
>
> I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time, but I ran into an error.
> The error message is as follows:
> {quote}CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported
> Call getNextException to see other errors in the batch.
> {quote}
> This exception is caused by the getUpsertStatement method of PostgresDialect, which generates an incorrect upsert statement.
> The unique key columns should be left out of the UPDATE SET clause.
>
> I ran the following experiment to test my modification.
> I also rebuilt and repackaged flink-connector-jdbc. With the modified flink-connector-jdbc, my program no longer reports errors.
> {code:sql}
> -- 1. Create a table in MatrixDB.
> CREATE TABLE user_1 (
>     id int,
>     name VARCHAR(255) NOT NULL DEFAULT 'flink',
>     address VARCHAR(1024),
>     phone_number VARCHAR(512),
>     email VARCHAR(255),
>     UNIQUE(id)
> );
> -- 2. Insert a record.
> INSERT INTO user_1(id, name, address, phone_number, email)
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com')
> ON CONFLICT (id)
> DO UPDATE SET
>     id=EXCLUDED.id,
>     name=EXCLUDED.name,
>     address=EXCLUDED.address,
>     phone_number=EXCLUDED.phone_number,
>     email=EXCLUDED.email;
> -- 3. Executing the above INSERT statement results in the following error:
> -- ERROR: modification of distribution columns in OnConflictUpdate is not supported
> -- 4. If the statement is changed as follows, it executes successfully.
> INSERT INTO user_1(id, name, address, phone_number, email)
> VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_...@foo.com')
> ON CONFLICT (id)
> DO UPDATE SET
>     name=EXCLUDED.name,
>     address=EXCLUDED.address,
>     phone_number=EXCLUDED.phone_number,
>     email=EXCLUDED.email;
> {code}
>
>
> The PostgresDialect class currently builds upsert statements as follows:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
> public Optional<String> getUpsertStatement(
>         String tableName, String[] fieldNames, String[] uniqueKeyFields) {
>     String uniqueColumns =
>             Arrays.stream(uniqueKeyFields)
>                     .map(this::quoteIdentifier)
>                     .collect(Collectors.joining(", "));
>     String updateClause =
>             Arrays.stream(fieldNames)
>                     .map(f -> quoteIdentifier(f) + "=EXCLUDED."
>                             + quoteIdentifier(f))
>                     .collect(Collectors.joining(", "));
>     return Optional.of(
>             getInsertIntoStatement(tableName, fieldNames)
>                     + " ON CONFLICT ("
>                     + uniqueColumns
>                     + ")"
>                     + " DO UPDATE SET "
>                     + updateClause);
> }
> {code}
>
>
> To fix this problem, make the following changes to PostgresDialect:
> {code:java}
> // package org.apache.flink.connector.jdbc.dialect.psql
> public Optional<String> getUpsertStatement(
>         String tableName, String[] fieldNames, String[] uniqueKeyFields) {
>     String uniqueColumns =
>             Arrays.stream(uniqueKeyFields)
>                     .map(this::quoteIdentifier)
>                     .collect(Collectors.joining(", "));
>     // The unique key columns are the conflict target, so leave them out of the UPDATE SET clause.
>     List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
>     String updateClause =
>             Arrays.stream(fieldNames)
>                     .filter(f -> !uniqueKeyList.contains(f))
>                     .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
>                     .collect(Collectors.joining(", "));
>     return Optional.of(
>             getInsertIntoStatement(tableName, fieldNames)
>                     + " ON CONFLICT ("
>                     + uniqueColumns
>                     + ")"
>                     + " DO UPDATE SET "
>                     + updateClause);
> }
> {code}
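Below is a minimal standalone sketch of the filtered upsert generation, intended for illustration only. It is not Flink's PostgresDialect. The class `UpsertSketch` and the helper `insertInto` are hypothetical, and `insertInto` uses `?` placeholders as an assumed stand-in for `getInsertIntoStatement`. Identifiers are left unquoted to match the SQL in the error log. The sketch also covers an edge case that the proposed patch leaves open. If every column is part of the unique key, the filtered clause is empty, and ` DO UPDATE SET ` with nothing after it is invalid SQL. For that case the sketch assumes `ON CONFLICT (...) DO NOTHING` is an acceptable fallback.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical standalone sketch; not Flink's actual PostgresDialect. */
public class UpsertSketch {

    // Assumption: identifiers are emitted unquoted, as in the SQL from the error log.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Hypothetical stand-in for getInsertIntoStatement, using "?" placeholders.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    public static String upsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // Key columns are the conflict target, so they are never assigned in the UPDATE.
        // A HashSet gives constant-time membership checks while filtering.
        Set<String> keys = new HashSet<>(Arrays.asList(uniqueKeyFields));
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !keys.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // Assumption: when every column is a key column there is nothing to update,
        // and an empty "DO UPDATE SET" would be invalid SQL, so fall back to DO NOTHING.
        String action =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")" + action;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(upsert("user_1", fields, new String[] {"id"}));
        System.out.println(upsert("user_1", new String[] {"id"}, new String[] {"id"}));
    }
}
```

For `user_1`, the first call yields the same shape as the statement in step 4 of the experiment (no `id=EXCLUDED.id`), with `?` in place of the literal values. The second call shows the `DO NOTHING` variant for a table whose only column is the key.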