[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893337#comment-17893337 ]
Anil Dasari commented on FLINK-36584:
-------------------------------------

[~loserwang1024] Thanks for sharing the details.

On issue 2: I have enabled skip snapshot backfill via {{PostgresSourceBuilder.PostgresIncrementalSource#skipSnapshotBackfill(true)}} (builder configuration sketched at the end of this comment), and the task-index replication slots have been created. However, I've noticed that {{PostgresScanFetchTask#maybeCreateSlotForBackFillReadTask}} bypasses the task-based replication slot when skipSnapshotBackfill is set to true. I'm currently debugging the code to determine where 'backfillslottest_0' is used in the process.

When skipSnapshotBackfill is set to true:
{code:java}
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:26.240 UTC [212] LOG: exported logical decoding snapshot: "00000006-0000013E-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-28 04:57:27.291 UTC [214] ERROR: replication slot "backfillslottest_0" does not exist
flink-postgres-1 | 2024-10-28 04:57:27.291 UTC [214] STATEMENT: select pg_drop_replication_slot('backfillslottest_0')
flink-postgres-1 | 2024-10-28 04:57:27.734 UTC [216] LOG: starting logical decoding for slot "backfillslottest"
flink-postgres-1 | 2024-10-28 04:57:27.734 UTC [216] DETAIL: Streaming transactions committing after 0/16771C8, reading WAL from 0/1677190.
flink-postgres-1 | 2024-10-28 04:57:27.735 UTC [216] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:27.735 UTC [216] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:27.846 UTC [218] LOG: starting logical decoding for slot "backfillslottest"
flink-postgres-1 | 2024-10-28 04:57:27.846 UTC [218] DETAIL: Streaming transactions committing after 0/16771C8, reading WAL from 0/1677190.
flink-postgres-1 | 2024-10-28 04:57:27.847 UTC [218] LOG: logical decoding found consistent point at 0/1677190
flink-postgres-1 | 2024-10-28 04:57:27.847 UTC [218] DETAIL: There are no running transactions.
{code}
When skipSnapshotBackfill is set to false:
{code:java}
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:55.409 UTC [199] LOG: exported logical decoding snapshot: "00000007-0000001A-1" with 0 transaction IDs
flink-postgres-1 | 2024-10-28 04:56:55.614 UTC [199] LOG: starting logical decoding for slot "nobackfillslot_0"
flink-postgres-1 | 2024-10-28 04:56:55.614 UTC [199] DETAIL: Streaming transactions committing after 0/16770C8, reading WAL from 0/1677090.
flink-postgres-1 | 2024-10-28 04:56:55.615 UTC [199] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:55.615 UTC [199] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:56.238 UTC [201] LOG: starting logical decoding for slot "nobackfillslot_0"
flink-postgres-1 | 2024-10-28 04:56:56.238 UTC [201] DETAIL: Streaming transactions committing after 0/16770C8, reading WAL from 0/1677090.
flink-postgres-1 | 2024-10-28 04:56:56.239 UTC [201] LOG: logical decoding found consistent point at 0/1677090
flink-postgres-1 | 2024-10-28 04:56:56.239 UTC [201] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:56:56.295 UTC [203] ERROR: replication slot "nobackfillslot_0" does not exist
flink-postgres-1 | 2024-10-28 04:56:56.295 UTC [203] STATEMENT: select pg_drop_replication_slot('nobackfillslot_0')
flink-postgres-1 | 2024-10-28 04:56:56.391 UTC [204] ERROR: replication slot "nobackfillslot_0" does not exist
flink-postgres-1 | 2024-10-28 04:56:56.391 UTC [204] STATEMENT: select pg_drop_replication_slot('nobackfillslot_0')
flink-postgres-1 | 2024-10-28 04:56:56.765 UTC [206] LOG: starting logical decoding for slot "nobackfillslot"
flink-postgres-1 | 2024-10-28 04:56:56.765 UTC [206] DETAIL: Streaming transactions committing after 0/1677060, reading WAL from 0/1677028.
flink-postgres-1 | 2024-10-28 04:56:56.766 UTC [206] LOG: logical decoding found consistent point at 0/1677028
flink-postgres-1 | 2024-10-28 04:56:56.766 UTC [206] DETAIL: There are no running transactions.
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] LOG: starting logical decoding for slot "nobackfillslot"
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] DETAIL: Streaming transactions committing after 0/1677060, reading WAL from 0/1677028.
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] LOG: logical decoding found consistent point at 0/1677028
flink-postgres-1 | 2024-10-28 04:57:04.548 UTC [209] DETAIL: There are no running transactions.
{code}
Follow-up question on issue 1: the stream split created [here|https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L140] (even when the startup mode is set to snapshot only and new table scans are disabled) is different from the backfill task created [here|https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java#L123]. Could you please clarify? Is the backfill task submitted at the end of each snapshot task, rather than at the end of the snapshot phase (i.e. on completion of all snapshot tasks)?
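For reference, this is roughly how the source is configured in my skip-backfill test. It is only a minimal sketch: the import paths assume the Flink CDC 3.x {{org.apache.flink.cdc.*}} packages, the connection settings mirror the reproduction code quoted below, and the slot name matches the "backfillslottest" slot visible in the logs above.
{code:java}
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

// Minimal sketch, not the full job: only the backfill-related option differs
// from the reproduction code in the issue description below.
JdbcIncrementalSource<String> source =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .startupOptions(StartupOptions.snapshot())
                .hostname("localhost")
                .port(5432)
                .database("test")
                .schemaList("public")
                .username("postgres")
                .password("postgres")
                .slotName("backfillslottest")   // base slot name seen in the logs above
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .skipSnapshotBackfill(true)     // toggled between true and false for the two log captures
                .build();
{code}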
> PostgresIncrementalSource is not exiting the Flink execution when StartupOptions is snapshot and creates multiple replication slots
> ------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when StartupOptions is snapshot.
>
> The Postgres CDC module uses HybridSplitAssigner for the batch scan and is still trying to create a stream split when the snapshot splits are completed ([https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]).
>
> Issue-2. When source parallelism is > 1 (i.e. 2), PostgresIncrementalSource creates multiple replication slots.
>
> Postgres logs:
> {code:java}
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> {code}
> Setup:
>
> flink-cdc version: 3.2.0
> flink version: 1.19
>
> Steps to reproduce the issue:
>
> 1. Main code:
> {code:java}
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
>
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
>                 // .splitSize(2)
>                 .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000");
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
>
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
>
> env.execute("Output Postgres Snapshot");
> {code}
> 2. Create two tables and some records in Postgres (see the JDBC sketch below).
> 3. Run the code from step 1.
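> For step 2, any two tables with a few rows in the "public" schema will do. A minimal JDBC sketch (the table names are placeholders; it assumes the PostgreSQL JDBC driver is on the classpath and reuses the connection settings from the code above):
> {code:java}
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.Statement;
>
> public class SeedTestTables {
>     public static void main(String[] args) throws Exception {
>         // Connection settings match the reproduction code above.
>         try (Connection conn = DriverManager.getConnection(
>                         "jdbc:postgresql://localhost:5432/test", "postgres", "postgres");
>                 Statement stmt = conn.createStatement()) {
>             // Two placeholder tables with a couple of rows each.
>             stmt.execute("CREATE TABLE IF NOT EXISTS t1 (id INT PRIMARY KEY, name TEXT)");
>             stmt.execute("CREATE TABLE IF NOT EXISTS t2 (id INT PRIMARY KEY, name TEXT)");
>             stmt.execute("INSERT INTO t1 VALUES (1, 'a'), (2, 'b') ON CONFLICT DO NOTHING");
>             stmt.execute("INSERT INTO t2 VALUES (1, 'x'), (2, 'y') ON CONFLICT DO NOTHING");
>         }
>     }
> }
> {code}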