[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893043#comment-17893043 ]
Hongshun Wang commented on FLINK-36584:
---------------------------------------

[~dasarianil], the design is like [https://netflixtechblog.com/dblog-a-generic-change-data-capture-framework-69351fb9099b].
 # First, we split the table by a key (often the primary key) into multiple chunks, which are named snapshot splits in Flink CDC.
 # Then we assign these snapshot splits to readers in parallel. For each snapshot split whose chunk key is between key_start and key_end:
 ## mark the current log position, which is named low_watermark.
 ## read the chunk data by JDBC (select * from xxx where chunk_key > key_start and chunk_key <= key_end) into memory as an image.
 ## mark the current log position, which is named high_watermark.
 ## apply the log between low_watermark and high_watermark (also restricted to chunk_key between key_start and key_end) to the chunk image, and then emit all the chunk data downstream. {*}Now the data is exactly an image at the point of high_watermark{*}.
 ## When a snapshot split is finished, the enumerator records its high_watermark.
 # When the stream split is read, it looks up the chunk's high_watermark based on the chunk key and only reads data after that *high_watermark*.

??*streamsplit here is the records between lower LSN (pre split data snapshot) and higher LSN (post split data snapshot) when not the same*??

Yes, the stream split starts from the lowest high_watermark of all the snapshot splits, because no chunk needs the log before that point. However, the log after that lowest LSN may not have been read yet for some chunks, while it may already have been read for others, which is why each record is checked against its own chunk's high_watermark.
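To make steps 2 and 3 above more concrete, here is a rough in-memory sketch in Java. It is only a simplified model under stated assumptions, not the Flink CDC implementation: the names ChunkWatermarkSketch, LogEvent, FinishedSplit, mergeLogIntoChunk and readStreamSplit are made up for illustration, LSNs are plain longs, rows are key/value pairs, the log is assumed to be ordered by LSN, and the exact inclusiveness of the watermark bounds is my assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model; none of these names come from Flink CDC.
class ChunkWatermarkSketch {

    /** One change-log record: log position, row key, new value (null means delete). */
    static final class LogEvent {
        final long lsn;
        final int key;
        final String value;

        LogEvent(long lsn, int key, String value) {
            this.lsn = lsn;
            this.key = key;
            this.value = value;
        }
    }

    /** A finished snapshot split covering chunk keys in (keyStart, keyEnd]. */
    static final class FinishedSplit {
        final int keyStart;
        final int keyEnd;
        final long highWatermark;

        FinishedSplit(int keyStart, int keyEnd, long highWatermark) {
            this.keyStart = keyStart;
            this.keyEnd = keyEnd;
            this.highWatermark = highWatermark;
        }

        boolean contains(int key) {
            return key > keyStart && key <= keyEnd;
        }
    }

    /**
     * Step 2: apply log records in (lowWatermark, highWatermark] that fall in the
     * chunk's key range to the image read by JDBC. Applying a record the JDBC read
     * already saw is harmless, because upserts and deletes by key are idempotent.
     */
    static Map<Integer, String> mergeLogIntoChunk(
            Map<Integer, String> jdbcImage, List<LogEvent> log,
            int keyStart, int keyEnd, long lowWatermark, long highWatermark) {
        Map<Integer, String> image = new TreeMap<>(jdbcImage);
        for (LogEvent e : log) {
            boolean inWindow = e.lsn > lowWatermark && e.lsn <= highWatermark;
            boolean inChunk = e.key > keyStart && e.key <= keyEnd;
            if (!inWindow || !inChunk) {
                continue;
            }
            if (e.value == null) {
                image.remove(e.key);
            } else {
                image.put(e.key, e.value);
            }
        }
        return image;
    }

    /**
     * Step 3: start at the lowest high watermark, then emit a record only if it is
     * after the high watermark of the chunk that owns its key. Keys outside every
     * split are dropped in this sketch.
     */
    static List<LogEvent> readStreamSplit(List<LogEvent> log, List<FinishedSplit> splits) {
        long start = Long.MAX_VALUE;
        for (FinishedSplit s : splits) {
            start = Math.min(start, s.highWatermark);
        }
        List<LogEvent> emitted = new ArrayList<>();
        for (LogEvent e : log) {
            if (e.lsn <= start) {
                continue; // no chunk needs the log before the lowest high watermark
            }
            for (FinishedSplit s : splits) {
                if (s.contains(e.key) && e.lsn > s.highWatermark) {
                    emitted.add(e);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<LogEvent> log = List.of(
                new LogEvent(11, 1, "a2"), new LogEvent(14, 2, null),
                new LogEvent(18, 3, "c"), new LogEvent(22, 8, "y"));
        // Chunk A covers keys (0, 5] and was snapshotted between LSN 10 and LSN 15.
        System.out.println(mergeLogIntoChunk(Map.of(1, "a", 2, "b"), log, 0, 5, 10, 15));
        List<FinishedSplit> splits =
                List.of(new FinishedSplit(0, 5, 15), new FinishedSplit(5, 10, 25));
        for (LogEvent e : readStreamSplit(log, splits)) {
            System.out.println("emit lsn=" + e.lsn + " key=" + e.key);
        }
    }
}
```

In the example in main, the record at LSN 22 is after the stream split's start position (15) but is still skipped, because chunk (5, 10] already absorbed it before its own high_watermark of 25. The record at LSN 18 belongs to chunk (0, 5], whose high_watermark is 15, so it is emitted.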
> PostgresIncrementalSource is not exiting the flink execution when StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>
> Issue-1. PostgresIncrementalSource is not exiting the Flink execution when StartupOptions is snapshot.
>
> Postgres cdc module is using HybridSplitAssigner for batch scan and is
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
> still trying to create streaming split when snapshot splits are completed.
>
> Issue-2. When source parallelism is > 1 i.e 2, PostgresIncrementalSource is creating multiple replication slots.
>
> postgres logs:
> {code:java}
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: logical decoding found consistent point at 0/1690B28
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:57.649 UTC [48] LOG: exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.226 UTC [51] LOG: exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:58.266 UTC [52] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.267 UTC [52] LOG: exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:58.612 UTC [51] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] LOG: logical decoding found consistent point at 0/1690BF8
> flink-postgres-1 | 2024-10-22 18:56:58.614 UTC [51] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] ERROR: replication slot "flinkpostgres_1" does not exist
> flink-postgres-1 | 2024-10-22 18:56:58.753 UTC [56] STATEMENT: select pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] LOG: starting logical decoding for slot "flinkpostgres_0"
> flink-postgres-1 | 2024-10-22 18:56:59.347 UTC [57] DETAIL: Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] LOG: logical decoding found consistent point at 0/1690C30
> flink-postgres-1 | 2024-10-22 18:56:59.348 UTC [57] DETAIL: There are no running transactions.
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.423 UTC [59] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] ERROR: replication slot "flinkpostgres_0" does not exist
> flink-postgres-1 | 2024-10-22 18:56:59.673 UTC [60] STATEMENT: select pg_drop_replication_slot('flinkpostgres_0')
> {code}
> Setup:
>
> flink-cdc version : 3.2.0
> flink version: 1.19
>
> Steps to reproduce the issue:
>
> 1. main code:
> {code:java}
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
>                 // .splitSize(2)
>                 .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000");
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
> env.execute("Output Postgres Snapshot");
> {code}
> 2. Create two tables and records in postgres
> 3. Run step#1 code.

--
This message was sent by Atlassian Jira (v8.20.10#820010)