Anil Dasari created FLINK-36584:
-----------------------------------
Summary: PostgresIncrementalSource is not exiting the flink execution when StartupOptions is snapshot and create multiple replication slots
Key: FLINK-36584
URL: https://issues.apache.org/jira/browse/FLINK-36584
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: 3.0.0
Reporter: Anil Dasari
Issue-1. PostgresIncrementalSource does not exit the Flink execution when StartupOptions is snapshot. The Postgres CDC module uses HybridSplitAssigner for the batch scan, and HybridSplitAssigner still tries to create a stream split after the snapshot splits are completed: [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]

Issue-2. When the source parallelism is greater than 1 (for example, 2), PostgresIncrementalSource creates multiple replication slots.

Postgres logs:
{code:java}
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
{code}

Setup:
flink-cdc version: 3.2.0
flink version: 1.19

Steps to reproduce the issue:
1. Main code:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostgresSnapshotJob { // hypothetical wrapper class name
    public static void main(String[] args) throws Exception {
        DebeziumDeserializationSchema<String> deserializer =
                new JsonDebeziumDeserializationSchema();

        // Snapshot-only startup: the job is expected to finish once all tables are read.
        JdbcIncrementalSource<String> postgresIncrementalSource =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .startupOptions(StartupOptions.snapshot())
                        .hostname("localhost")
                        .port(5432)
                        .database("test")
                        .schemaList("public")
                        .username("postgres")
                        .password("postgres")
                        .slotName("flinkpostgres")
                        .decodingPluginName("pgoutput")
                        .deserializer(deserializer)
                        // .splitSize(2)
                        .build();

        Configuration config = new Configuration();
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        config.setString("heartbeat.interval", "6000000"); // 100 minutes
        config.setString("heartbeat.timeout", "18000000"); // 300 minutes

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(300000); // every 5 minutes

        env.fromSource(
                        postgresIncrementalSource,
                        WatermarkStrategy.noWatermarks(),
                        "PostgresParallelSource")
                .setParallelism(2) // parallelism > 1 reproduces Issue-2
                .print();

        env.execute("Output Postgres Snapshot");
    }
}
{code}
2. Create two tables in Postgres and insert records into them.
3. Run the code from step 1.
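The behavior expected in Issue-1 can be illustrated with a toy model. The Java sketch below is hypothetical: ToyHybridAssigner, its Mode enum, and the split names are invented for illustration and are not the Flink CDC API. It only models what the report expects: hand out the snapshot splits first, then create a single stream split only when the startup mode continues into streaming (Mode.INITIAL, mirroring StartupOptions.initial()), and otherwise report that no splits remain so a snapshot-only source (Mode.SNAPSHOT, mirroring StartupOptions.snapshot()) can finish. For brevity it treats a snapshot split as completed as soon as it is handed out.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical toy model, not the Flink CDC HybridSplitAssigner API.
public class ToyHybridAssigner {

    // Hypothetical modes mirroring StartupOptions.initial() and StartupOptions.snapshot().
    public enum Mode { INITIAL, SNAPSHOT }

    private final Mode mode;
    private final Deque<String> pendingSnapshotSplits;
    private boolean streamSplitAssigned = false;

    public ToyHybridAssigner(Mode mode, List<String> snapshotSplits) {
        this.mode = mode;
        this.pendingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
    }

    // Returns the next split to assign, or empty when nothing is left.
    // Simplification: a snapshot split counts as completed once handed out.
    public Optional<String> getNext() {
        if (!pendingSnapshotSplits.isEmpty()) {
            return Optional.of(pendingSnapshotSplits.poll());
        }
        // Snapshot phase is done: only a mode that continues into streaming
        // gets the single stream split; snapshot-only mode stops here.
        if (mode == Mode.INITIAL && !streamSplitAssigned) {
            streamSplitAssigned = true;
            return Optional.of("stream-split");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ToyHybridAssigner assigner =
                new ToyHybridAssigner(Mode.SNAPSHOT, List.of("t1-chunk0", "t2-chunk0"));
        Optional<String> next;
        while ((next = assigner.getNext()).isPresent()) {
            System.out.println("assigned " + next.get());
        }
        System.out.println("no more splits to assign");
    }
}
{code}

With Mode.SNAPSHOT, main prints the two snapshot splits and then "no more splits to assign"; with Mode.INITIAL the same loop would print "assigned stream-split" before that final line.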