[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17892678#comment-17892678 ]
Hongshun Wang commented on FLINK-36584:
---------------------------------------

# Issue-1: Yes, you can see the discussion of the snapshot mode design in [https://github.com/apache/flink-cdc/issues/2867]: "If we only include snapshot splits, we cannot get a consistent snapshot at any point in time, unless no write operations are allowed while the snapshot splits are being read. For example, suppose we have three snapshot splits, and some update operations are applied between reading the first split and the third split. The third split will observe these operations, but the first split will not. {*}Make an extreme assumption: after the first split finishes, we stop the job and restart it from the savepoint tomorrow. When the job finishes, the data in the first split is today's version, while the data in the third split is tomorrow's version.{*}"
# Issue-2: Because reading the backfill log requires a replication slot, and that slot is removed once the snapshot split finishes. You can also skip the backfill log with the option `scan.incremental.snapshot.backfill.skip`.

> PostgresIncrementalSource is not exiting the flink execution when
> StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when
> StartupOptions is snapshot.
>
> The Postgres CDC module uses HybridSplitAssigner for batch scans and, at
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128],
> still tries to create a stream split after the snapshot splits are completed.
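The consistency argument quoted in Issue-1 above can be illustrated with a small, self-contained simulation (plain Java, no Flink involved; the table contents, key ranges, and split boundaries are invented purely for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrates why reading chunked snapshot splits without a backfill
// phase cannot yield a consistent snapshot: updates applied between
// split reads are visible only to the splits that are read later.
public class SplitConsistencyDemo {

    // Reads split 1 (keys 1..3), applies concurrent updates, then reads
    // split 3 (keys 7..9); returns the combined "snapshot".
    static Map<Integer, String> buildSnapshot() {
        Map<Integer, String> table = new HashMap<>();
        for (int k = 1; k <= 9; k++) table.put(k, "today");

        Map<Integer, String> snapshot = new HashMap<>();
        for (int k = 1; k <= 3; k++) snapshot.put(k, table.get(k)); // split 1

        // Updates applied between the two split reads, e.g. the job was
        // stopped after split 1 and restarted from a savepoint tomorrow.
        table.put(1, "tomorrow");
        table.put(7, "tomorrow");

        for (int k = 7; k <= 9; k++) snapshot.put(k, table.get(k)); // split 3
        return snapshot;
    }

    public static void main(String[] args) {
        Map<Integer, String> snapshot = buildSnapshot();
        // Mixed versions: key 1 holds today's value, key 7 tomorrow's.
        System.out.println("key 1 -> " + snapshot.get(1)); // today
        System.out.println("key 7 -> " + snapshot.get(7)); // tomorrow
    }
}
```

With a backfill read (or with all writes blocked during the snapshot phase), both keys would resolve to the same version.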
>
> Issue-2. When source parallelism is > 1 (e.g. 2), PostgresIncrementalSource
> creates multiple replication slots.
>
> postgres logs:
> {code:java}
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0') {code}
>
> Setup:
>
> flink-cdc version: 3.2.0
> flink version: 1.19
>
> Steps to reproduce the issue:
>
> 1.
> main code:
> {code:java}
> DebeziumDeserializationSchema<String> deserializer =
>     new JsonDebeziumDeserializationSchema();
>
> JdbcIncrementalSource<String> postgresIncrementalSource =
>     PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>         .startupOptions(StartupOptions.snapshot())
>         .hostname("localhost")
>         .port(5432)
>         .database("test")
>         .schemaList("public")
>         .username("postgres")
>         .password("postgres")
>         .slotName("flinkpostgres")
>         .decodingPluginName("pgoutput")
>         .deserializer(deserializer)
>         // .splitSize(2)
>         .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000");
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
> env.fromSource(
>         postgresIncrementalSource,
>         WatermarkStrategy.noWatermarks(),
>         "PostgresParallelSource")
>     .setParallelism(2)
>     .print();
>
> env.execute("Output Postgres Snapshot"); {code}
> 2. Create two tables and records in postgres.
> 3. Run the step#1 code.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
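As a closing note on the comment's suggested workaround: a minimal sketch of skipping the backfill log, assuming the Postgres source builder exposes the `scan.incremental.snapshot.backfill.skip` option via a `skipSnapshotBackfill(boolean)` method (the method name is an assumption here; verify it against your flink-cdc version). The connection parameters are the ones from the report:

```java
// Sketch only: same source as in the report, but with the backfill
// phase skipped so each reader does not need an extra replication slot.
// skipSnapshotBackfill(true) is assumed to map to the option
// `scan.incremental.snapshot.backfill.skip` mentioned in the comment.
JdbcIncrementalSource<String> source =
    PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
        .startupOptions(StartupOptions.snapshot())
        .hostname("localhost")
        .port(5432)
        .database("test")
        .schemaList("public")
        .username("postgres")
        .password("postgres")
        .slotName("flinkpostgres")
        .decodingPluginName("pgoutput")
        .skipSnapshotBackfill(true) // assumption: see note above
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```

Note that skipping the backfill trades away exactly-once consistency within the snapshot phase, per the discussion quoted in Issue-1.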