Bodong Liu created FLINK-33785:
----------------------------------

             Summary: TableJdbcUpsertOutputFormat could not deal with DELETE record correctly when primary keys were set
                 Key: FLINK-33785
                 URL: https://issues.apache.org/jira/browse/FLINK-33785
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: jdbc-3.1.1
         Environment: Flink 1.17.1, JDBC connector 3.1.1, PostgreSQL 16.1
            Reporter: Bodong Liu
         Attachments: image-2023-12-08-22-24-20-295.png, 
image-2023-12-08-22-24-26-493.png, image-2023-12-08-22-24-58-986.png, 
image-2023-12-08-22-28-44-948.png, image-2023-12-08-22-38-08-559.png, 
image-2023-12-08-22-40-35-530.png, image-2023-12-08-22-42-06-566.png

h1. Issue Description

When using the JDBC connector to delete records from a database, I found that the DELETE records are not applied: the matching rows stay in the table.
h1. Reproduction steps

The steps are as follows:
 * Create a table with five columns and a composite primary key (id, name). DDL in PostgreSQL:

{code:sql}
-- The default for id uses this sequence, so it must exist before the table.
create sequence if not exists public.fake_id_seq;

create table public.fake
(
    id       bigint                 not null default nextval('fake_id_seq'::regclass),
    name     character varying(128) not null,
    age      integer,
    location character varying(256),
    birthday timestamp without time zone     default CURRENT_TIMESTAMP,
    primary key (id, name)
);{code}
!image-2023-12-08-22-24-26-493.png!

 
 * Insert some data into the table:

{code:sql}
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (1, 'Jack', 10, null, '2023-12-08 21:35:46.000000');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (2, 'Jerry', 18, 'Fake Location', '2023-12-08 13:36:17.088295');
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (3, 'John', 20, null, null);
INSERT INTO public.fake (id, name, age, location, birthday) VALUES (4, 'Marry', null, null, '2023-12-08 13:37:09.721785');
{code}
!image-2023-12-08-22-24-58-986.png!
 * Run the following Flink job:

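{code:java}
// Package paths are for flink-connector-jdbc 3.1.x; adjust them if your version differs.
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Collections;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.postgresql.Driver;

public class JdbcDeleteReproduction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Columns of public.fake with their java.sql.Types; (id, name) is the primary key.
        final String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        final int[] fieldTypes = {
                Types.BIGINT, Types.VARCHAR, Types.INTEGER, Types.VARCHAR, Types.TIMESTAMP
        };
        final String[] primaryKeys = {"id", "name"};
        InternalJdbcConnectionOptions internalJdbcConnectionOptions =
                InternalJdbcConnectionOptions.builder()
                        .setClassLoader(Thread.currentThread().getContextClassLoader())
                        .setDriverName(Driver.class.getName())
                        .setDBUrl("jdbc:postgresql://localhost:5432/postgres")
                        .setUsername("postgres")
                        .setPassword("postgres")
                        .setTableName("fake")
                        .setParallelism(1)
                        .setConnectionCheckTimeoutSeconds(10)
                        .setDialect(new PostgresDialect())
                        .build();
        // Setting key fields makes the builder return a TableJdbcUpsertOutputFormat.
        JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> jdbcOutputFormat =
                JdbcOutputFormat.builder()
                        .setFieldNames(fieldNames)
                        .setKeyFields(primaryKeys)
                        .setFieldTypes(fieldTypes)
                        .setOptions(internalJdbcConnectionOptions)
                        .setFlushIntervalMills(1000)
                        .setFlushMaxSize(10)
                        .setMaxRetryTimes(3)
                        .build();

        GenericJdbcSinkFunction<Tuple2<Boolean, Row>> jdbcSinkFunction =
                new GenericJdbcSinkFunction<>(jdbcOutputFormat);

        Timestamp timestamp = Timestamp.valueOf("2023-12-08 21:35:46.000000");
        // Row to delete: matches the existing record (1, 'Jack', 10, null, 2023-12-08 21:35:46).
        Row row = Row.ofKind(RowKind.DELETE, 1L, "Jack", 10, null, timestamp);
        // f0 = false marks the element as a delete message.
        Tuple2<Boolean, Row> element = Tuple2.of(false, row);
        env.fromCollection(Collections.singleton(element)).addSink(jdbcSinkFunction);
        env.execute();
    }
}
{code}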
After the job finishes successfully, the record with id=1 and name='Jack' is still present in the table: it was not deleted.
h1. Cause Analysis

In the build method of JdbcOutputFormat.Builder, if the 'keyFields' option is set in the JdbcDmlOptions, the method returns an 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat'.

!image-2023-12-08-22-28-44-948.png!

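However, 'org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat#createDeleteExecutor' builds the DELETE statement from all fieldNames instead of the keyFields. Every column therefore becomes an equality condition in the WHERE clause, so a row is deleted only if every column value matches. In the reproduction above the row has location = null, and in SQL the condition location = NULL is never true, so the statement deletes nothing.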

!image-2023-12-08-22-38-08-559.png!
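To make the difference concrete, the following minimal sketch builds a DELETE statement with one "column = :column" predicate per condition field, joined with AND. It assumes the dialect generates the WHERE clause in that shape; the class and method names are hypothetical and this is not the connector's code.

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical sketch: one "col = :col" predicate per condition field, joined with AND.
final class DeleteStatementSketch {

    static String deleteSql(String table, String[] conditionFields) {
        String where = Arrays.stream(conditionFields)
                .map(f -> f + " = :" + f)
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + table + " WHERE " + where;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        String[] keyFields = {"id", "name"};

        // Built from every field (the reported behaviour). For the row being deleted,
        // :location is bound to NULL and "location = NULL" is never true, so nothing matches.
        System.out.println(deleteSql("fake", fieldNames));

        // Built from the primary-key fields only: matches the row by (id, name).
        System.out.println(deleteSql("fake", keyFields));
    }
}
{code}

Restricting the condition to the primary key is enough to identify the row uniquely, and it avoids comparisons against nullable columns.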
h1. How to fix
 * Build the delete executor from the actual keyFields, falling back to fieldNames only when no key fields are set.

!image-2023-12-08-22-42-06-566.png!
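The selection step of the proposed fix can be sketched as follows, assuming the key fields are exposed as an Optional<String[]> (JdbcDmlOptions#getKeyFields appears to do this; verify against your connector version). The class and method names are hypothetical; the selected columns would be passed to the DELETE-statement builder in place of fieldNames.

{code:java}
import java.util.Optional;

// Hypothetical sketch of the proposed fix, not the connector's actual code.
final class DeleteConditionFields {

    /**
     * Returns the columns for the DELETE statement's WHERE clause:
     * the key fields when present and non-empty, otherwise all field names.
     */
    static String[] conditionFields(Optional<String[]> keyFields, String[] fieldNames) {
        return keyFields.filter(keys -> keys.length > 0).orElse(fieldNames);
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "age", "location", "birthday"};
        Optional<String[]> keyFields = Optional.of(new String[] {"id", "name"});

        // prints "id, name": only the primary-key columns end up in the WHERE clause
        System.out.println(String.join(", ", conditionFields(keyFields, fieldNames)));
        // prints "id, name, age, location, birthday": fallback when no keys are set
        System.out.println(String.join(", ", conditionFields(Optional.empty(), fieldNames)));
    }
}
{code}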



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
