[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893670#comment-17893670 ]
Hongshun Wang commented on FLINK-36584:
---------------------------------------

Yes. When backfill is skipped, each snapshot split reads only the current state of the table and does not read the log, and the stream split then begins from the beginning offset. Thus, it can only provide at-least-once semantics.

> Backfill task is submitted at the end of the snapshot task and not at the end of snapshot.

Yes. For parallel reading, each backfill task is executed together with its own snapshot task, not after all snapshot tasks have completed, so each backfill task may read the log over a different time window. Thus, the stream split still needs an extra skip operation so that changes already read by a backfill task are not emitted again.

> PostgresIncrementalSource is not exiting the flink execution when StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>         Attachments: image-2024-10-29-10-00-44-321.png
>
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when StartupOptions is snapshot.
>
> The Postgres CDC module uses HybridSplitAssigner for the batch scan, and
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
> still tries to create a stream split after the snapshot splits are completed.
>
> Issue-2. When source parallelism is > 1 (e.g. 2), PostgresIncrementalSource creates multiple replication slots.
>
> postgres logs:
> {code:java}
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> {code}
>
> Setup:
>
> flink-cdc version: 3.2.0
> flink version: 1.19
>
> Steps to reproduce the issue:
>
> 1. main code:
> {code:java}
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
>                 // .splitSize(2)
>                 .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000"); // 300 minutes
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
> env.execute("Output Postgres Snapshot");
> {code}
> 2. Create two tables with records in Postgres.
> 3. Run the code from step 1.
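The skip operation in the stream split, and why skipping backfill weakens the guarantee to at-least-once, can be pictured with a small model. This is a hypothetical sketch, not Flink CDC code: every class, record and method name is invented, keys and log offsets are simplified to `long`, and each finished snapshot split is assumed to record the offset taken before its snapshot query (`lowWatermark`) and the offset its own backfill task read up to (`highWatermark`). When backfill is skipped, the split is assumed to cover the log only up to its low watermark. The stream split starts at the smallest covered offset and emits an event only if it is newer than what the split owning that key already covered.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink CDC code) of how a stream split can skip change events
 * that a per-split backfill task has already read. Keys and offsets are simplified to longs.
 */
public class StreamSplitSkipSketch {

    /** A finished snapshot split covering keys in [startKey, endKey). */
    public record FinishedSplit(long startKey, long endKey, long lowWatermark, long highWatermark) {
        boolean contains(long key) {
            return key >= startKey && key < endKey;
        }

        /** Last log offset assumed to be reflected in this split's output. */
        long coveredUpTo(boolean backfillSkipped) {
            // Without backfill, the split only knows the offset taken before its snapshot query.
            return backfillSkipped ? lowWatermark : highWatermark;
        }
    }

    /** A change event read from the log. */
    public record ChangeEvent(long offset, long key) {}

    /** The stream split starts at the smallest offset covered by any finished split. */
    public static long startOffset(List<FinishedSplit> splits, boolean backfillSkipped) {
        return splits.stream()
                .mapToLong(s -> s.coveredUpTo(backfillSkipped))
                .min()
                .orElse(0L);
    }

    /** Emits an event only if it is newer than what the split owning its key already covered. */
    public static boolean shouldEmit(
            ChangeEvent event, List<FinishedSplit> splits, boolean backfillSkipped) {
        for (FinishedSplit split : splits) {
            if (split.contains(event.key())) {
                return event.offset() > split.coveredUpTo(backfillSkipped);
            }
        }
        return true; // key outside every finished split: nothing to skip
    }

    /** Reads the log after the start offset and applies the per-split skip. */
    public static List<ChangeEvent> emitted(
            List<ChangeEvent> log, List<FinishedSplit> splits, boolean backfillSkipped) {
        long start = startOffset(splits, backfillSkipped);
        List<ChangeEvent> out = new ArrayList<>();
        for (ChangeEvent e : log) {
            if (e.offset() > start && shouldEmit(e, splits, backfillSkipped)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Run against the same log twice, the backfill variant drops events that fall inside each split's own backfill window, while the skip-backfill variant replays them even though the snapshot rows may already reflect those changes; those replays are the duplicates behind at-least-once semantics.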
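Issue-1 describes the end state a snapshot-only job is expected to reach. A toy assigner illustrates the intended decision; its names are invented and do not match HybridSplitAssigner's real methods. Snapshot splits are handed out first; once all of them have finished, a snapshot-only assigner reports that no more splits exist instead of creating a stream split, while a snapshot-plus-stream assigner creates exactly one stream split. In Flink's Source API, a bounded enumerator can then tell readers to finish via `SplitEnumeratorContext#signalNoMoreSplits`; how Flink CDC wires this up is not modelled here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Toy assigner (hypothetical names) contrasting snapshot-only and snapshot+stream modes. */
public class SnapshotOnlyAssignerSketch {

    private final Deque<String> pendingSnapshotSplits;
    private final boolean snapshotOnly;
    private int unfinishedSnapshotSplits;
    private boolean streamSplitAssigned;

    public SnapshotOnlyAssignerSketch(List<String> snapshotSplits, boolean snapshotOnly) {
        this.pendingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
        this.unfinishedSnapshotSplits = snapshotSplits.size();
        this.snapshotOnly = snapshotOnly;
    }

    /** Called when a reader reports that one snapshot split has finished. */
    public void onSnapshotSplitFinished() {
        unfinishedSnapshotSplits--;
    }

    /** Returns the next split, or empty when nothing can be assigned right now. */
    public Optional<String> nextSplit() {
        if (!pendingSnapshotSplits.isEmpty()) {
            return Optional.of(pendingSnapshotSplits.poll());
        }
        if (unfinishedSnapshotSplits > 0 || snapshotOnly || streamSplitAssigned) {
            // Wait for running snapshot splits; in snapshot-only mode never create a stream split.
            return Optional.empty();
        }
        streamSplitAssigned = true;
        return Optional.of("stream-split");
    }

    /** A snapshot-only (bounded) source is done once every snapshot split has finished. */
    public boolean noMoreSplits() {
        return snapshotOnly && pendingSnapshotSplits.isEmpty() && unfinishedSnapshotSplits == 0;
    }
}
```

Keeping the snapshot-only check inside the assigner means the job can end as soon as the last snapshot split is reported finished, rather than waiting on a stream split that will never be read.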
-- This message was sent by Atlassian Jira (v8.20.10#820010)