wenli xiao created FLINK-35277:
----------------------------------

             Summary: Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
                 Key: FLINK-35277
                 URL: https://issues.apache.org/jira/browse/FLINK-35277
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 3.1.0
         Environment: Flink 1.18.0
DB2 11.5.x
            Reporter: wenli xiao
         Attachments: image-2024-04-30-22-19-17-350.png


1. Background

While using Flink CDC 3.1 to load data from DB2 into Apache Doris, I set up DB2 with the Docker image {{ruanhang/db2-cdc-demo:v1}}. After configuring DB2 asynchronous CDC, I captured tables with {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. However, adding the eleventh table failed with the following error:

[23505][-803] One or more values in the INSERT statement, UPDATE statement, or foreign key update caused by a DELETE statement are not valid because the primary key, unique constraint or unique index identified by "2" constrains table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14

!image-2024-04-30-22-19-17-350.png!

2. The error indicates that the insert into {{ASNCDC.IBMSNAP_PRUNCNTL}} produced a duplicate value for one of the table's unique indexes. Here is the schema of {{ASNCDC.IBMSNAP_PRUNCNTL}}:

create table IBMSNAP_PRUNCNTL
(
    TARGET_SERVER     CHARACTER(18) not null,
    TARGET_OWNER      VARCHAR(128)  not null,
    TARGET_TABLE      VARCHAR(128)  not null,
    SYNCHTIME         TIMESTAMP(6),
    SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
    SOURCE_OWNER      VARCHAR(128)  not null,
    SOURCE_TABLE      VARCHAR(128)  not null,
    SOURCE_VIEW_QUAL  SMALLINT      not null,
    APPLY_QUAL        CHARACTER(18) not null,
    SET_NAME          CHARACTER(18) not null,
    CNTL_SERVER       CHARACTER(18) not null,
    TARGET_STRUCTURE  SMALLINT      not null,
    CNTL_ALIAS        CHARACTER(8),
    PHYS_CHANGE_OWNER VARCHAR(128),
    PHYS_CHANGE_TABLE VARCHAR(128),
    MAP_ID            VARCHAR(10)   not null
);

create unique index IBMSNAP_PRUNCNTLX
    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, APPLY_QUAL, SET_NAME, TARGET_SERVER, TARGET_TABLE, TARGET_OWNER);

create unique index IBMSNAP_PRUNCNTLX1
    on IBMSNAP_PRUNCNTL (MAP_ID);

create index IBMSNAP_PRUNCNTLX2
    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);

create index IBMSNAP_PRUNCNTLX3
    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);

The issue stems from the {{MAP_ID}} generation logic in the {{ASNCDC.ADDTABLE}} procedure defined in the {{asncdcaddremove.sql}} script. The original insert statement is as follows:

-- Original insert statement
SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
              'TARGET_SERVER, ' ||
              'TARGET_OWNER, ' ||
              'TARGET_TABLE, ' ||
              'SYNCHTIME, ' ||
              'SYNCHPOINT, ' ||
              'SOURCE_OWNER, ' ||
              'SOURCE_TABLE, ' ||
              'SOURCE_VIEW_QUAL, ' ||
              'APPLY_QUAL, ' ||
              'SET_NAME, ' ||
              'CNTL_SERVER , ' ||
              'TARGET_STRUCTURE , ' ||
              'CNTL_ALIAS , ' ||
              'PHYS_CHANGE_OWNER , ' ||
              'PHYS_CHANGE_TABLE , ' ||
              'MAP_ID ' ||
              ') VALUES ( ' ||
              '''KAFKA'', ' ||
              '''' || tableschema || ''', ' ||
              '''' || tablename || ''', ' ||
              'NULL, ' ||
              'NULL, ' ||
              '''' || tableschema || ''', ' ||
              '''' || tablename || ''', ' ||
              '0, ' ||
              '''KAFKAQUAL'', ' ||
              '''SET001'', ' ||
              ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
              '8, ' ||
              ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
              '''ASNCDC'', ' ||
              '''CDC_' || tableschema || '_' || tablename || ''', ' ||
              ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
              ' )';

EXECUTE IMMEDIATE stmtSQL;

The {{max(MAP_ID)}} in the ELSE branch is incorrect: {{MAP_ID}} is a {{VARCHAR(10)}} column, so {{max(MAP_ID)}} compares strings rather than numbers. Once ten tables have been added, the string maximum of {{'1'}} through {{'10'}} is {{'9'}}, so the expression generates {{'10'}} again and the insert for the eleventh table violates the unique index {{IBMSNAP_PRUNCNTLX1}} on {{MAP_ID}}. The expression should instead be {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}, i.e. cast each {{MAP_ID}} to an integer before taking the maximum. This is what prevents the addition of the eleventh table.
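As a minimal sketch of the fix, assuming the rest of the {{ADDTABLE}} procedure stays unchanged and only the {{MAP_ID}} subselect is rewritten, the corrected expression would look like this:

-- Sketch of the corrected MAP_ID subselect (only the ELSE branch differs from the script):
-- cast every MAP_ID to INT before taking MAX, so the maximum is computed
-- numerically ('10' > '9') and the next MAP_ID never collides with an existing row.
SELECT CASE
           WHEN max(CAST(MAP_ID AS INT)) IS NULL
               THEN CAST(1 AS VARCHAR(10))
           ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))
       END AS MYINT
FROM ASNCDC.IBMSNAP_PRUNCNTL;

In the dynamic SQL string built by the procedure, this corresponds to replacing {{CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10))}} with {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}.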
For more details about {{asncdcaddremove.sql}}, please refer to: [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)