Bodong Liu created FLINK-33785:
----------------------------------

Summary: TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when primary keys were set
Key: FLINK-33785
URL: https://issues.apache.org/jira/browse/FLINK-33785
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.1
Environment: Flink: 1.17.1
JDBC connector: 3.1.1
PostgreSQL: 16.1
Reporter: Bodong Liu
Attachments: image-2023-12-08-22-24-20-295.png, image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png, image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png, image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png

h1. Issue Description

When using the JDBC connector to DELETE records in the database, I found that it does not delete them correctly.

h1. Reproduction steps

The steps are as follows:

* Create a table with 5 fields and a composite primary key (id, name). DDL in Postgres:
{code:sql}
create table public.fake (
    id bigint not null default nextval('fake_id_seq'::regclass),
    name character varying(128) not null,
    age integer,
    location character varying(256),
    birthday timestamp without time zone default CURRENT_TIMESTAMP,
    primary key (id, name)
);
{code}
!image-2023-12-08-22-24-26-493.png!
* Insert some data into the table:
{code:sql}
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
{code}
!image-2023-12-08-22-24-58-986.png!
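* Run the Flink code:
{code:java}
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Collections;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.postgresql.Driver;

// The wrapper class name is arbitrary.
public class JdbcDeleteReproduction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        // java.sql.Types codes, in the same order as fieldNames
        final int[] fieldTypes = {
            Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
        };
        // Composite primary key of public.fake
        final String[] primaryKeys = {"id", "name"};

        InternalJdbcConnectionOptions internalJdbcConnectionOptions =
                InternalJdbcConnectionOptions.builder()
                        .setClassLoader(Thread.currentThread().getContextClassLoader())
                        .setDriverName(Driver.class.getName())
                        .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                        .setUsername("postgres")
                        .setPassword("postgres")
                        .setTableName("fake")
                        .setParallelism(1)
                        .setConnectionCheckTimeoutSeconds(10)
                        .setDialect(new PostgresDialect())
                        .build();

        // Key fields are set, so the builder returns a TableJdbcUpsertOutputFormat.
        JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
                JdbcOutputFormat.builder()
                        .setFieldNames(fieldNames)
                        .setKeyFields(primaryKeys)
                        .setFieldTypes(fieldTypes)
                        .setOptions(internalJdbcConnectionOptions)
                        .setFlushIntervalMills(1000)
                        .setFlushMaxSize(10)
                        .setMaxRetryTimes(3)
                        .build();

        GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
                new GenericJdbcSinkFunction<>(jdbcOutputFormat);

        Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
        // Row to delete: id = 1, name = 'Jack' (location is NULL)
        Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
        // f0 = false marks the record as a delete for the upsert output format
        Tuple2<Boolean, Row> element = Tuple2.of(false, row);

        env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
        env.execute();
    }
}
{code}
After the job finishes successfully, the record with id=1 and name='Jack' is still in the table: it was not deleted.

h1. Cause Analysis

In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is set in the JdbcDmlOptions, the method returns an 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.
!image-2023-12-08-22-28-44-948.png!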
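The dispatch described above can be modeled in a few lines of plain Java. This is a simplified, hypothetical sketch, not the connector's source: `OutputFormatDispatchSketch`, `FormatKind` and `chooseFormat` are illustrative names, and treating an empty key array as "not set" is an assumption.

{code:java}
import java.util.Optional;

// Hypothetical, simplified model of the dispatch in JdbcOutputFormat.Builder#build.
// Names are illustrative only; this is not connector API.
public class OutputFormatDispatchSketch {

    // Illustrative stand-ins for the two kinds of output format the builder can return.
    enum FormatKind {
        PLAIN_JDBC_OUTPUT_FORMAT,
        TABLE_JDBC_UPSERT_OUTPUT_FORMAT
    }

    // Key fields present and non-empty select the upsert format.
    // Treating an empty array as "not set" is an assumption of this sketch.
    static FormatKind chooseFormat(Optional<String[]> keyFields) {
        if (keyFields.isPresent() && keyFields.get().length > 0) {
            return FormatKind.TABLE_JDBC_UPSERT_OUTPUT_FORMAT;
        }
        return FormatKind.PLAIN_JDBC_OUTPUT_FORMAT;
    }

    public static void main(String[] args) {
        // Keys set, as in the reproduction: prints TABLE_JDBC_UPSERT_OUTPUT_FORMAT
        System.out.println(chooseFormat(Optional.of(new String[] {"id", "name"})));
        // No keys: prints PLAIN_JDBC_OUTPUT_FORMAT
        System.out.println(chooseFormat(Optional.empty()));
    }
}
{code}

The point of the model is that setting primary keys alone is enough to route every record, including deletes, through the upsert format.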
In 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor', however, the method uses all of the fieldNames instead of the keyFields to build the DELETE SQL statement, so every column ends up in the WHERE clause. For the row above, 'location' is NULL, and in SQL a comparison such as location = NULL is never true, so the DELETE statement matches no rows and nothing is deleted.
!image-2023-12-08-22-38-08-559.png!

h1. How to fix

* Build the delete executor from the real keyFields, falling back to fieldNames only when no key fields are set.
!image-2023-12-08-22-42-06-566.png!
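A minimal sketch of the proposed fix, assuming a plain "?" placeholder DELETE template. `DeleteStatementSketch`, `deleteSql` and `deleteConditionFields` are hypothetical names, and the real dialect's getDeleteStatement output (identifier quoting, named parameters) differs. The sketch contrasts the WHERE clause built from all field names with the one built from the key fields, using Optional.orElse for the fallback.

{code:java}
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical sketch, not connector code: shows how the DELETE statement's
// WHERE clause depends on which column list is passed in.
public class DeleteStatementSketch {

    // Builds "DELETE FROM t WHERE c1 = ? AND c2 = ?" for the given condition columns.
    // No identifier quoting, unlike a real SQL dialect.
    static String deleteSql(String table, String[] conditionFields) {
        String where = Arrays.stream(conditionFields)
                .map(f -> f + " = ?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + table + " WHERE " + where;
    }

    // Proposed behaviour: prefer the key fields, fall back to all field names.
    static String[] deleteConditionFields(Optional<String[]> keyFields, String[] fieldNames) {
        return keyFields.orElse(fieldNames);
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        Optional<String[]> keyFields = Optional.of(new String[] {"id", "name"});

        // Current behaviour, prints:
        // DELETE FROM fake WHERE id = ? AND name = ? AND age = ? AND location = ? AND birthday = ?
        System.out.println(deleteSql("fake", fieldNames));
        // Proposed behaviour, prints:
        // DELETE FROM fake WHERE id = ? AND name = ?
        System.out.println(deleteSql("fake", deleteConditionFields(keyFields, fieldNames)));
    }
}
{code}

With the first statement, the location parameter is bound to NULL for Jack's row, so the row cannot match; the second statement compares only the primary key columns.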