[ https://issues.apache.org/jira/browse/FLINK-36584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893673#comment-17893673 ]

Anil Dasari commented on FLINK-36584:
-------------------------------------

> Yes, when skipping backfill, the snapshot split only reads the current state 
> and does not read the log, and the stream split will begin from the beginning 
> offset. Thus, it can only provide at-least-once semantics. 

Is there a reason for creating a stream split when the startup mode is snapshot 
only? I have yet to verify whether the stream split is actually executed when 
the startup mode is snapshot only. 
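For illustration, here is a minimal sketch of the assignment decision being questioned. The types and method below are hypothetical simplifications, not the actual flink-cdc API: the idea is that once all snapshot splits finish, the assigner could skip creating a stream split entirely when the startup mode is snapshot-only, letting the source signal end-of-input.

```java
// Hypothetical simplification of the HybridSplitAssigner decision point;
// the real flink-cdc classes and signatures differ.
enum StartupMode { INITIAL, SNAPSHOT_ONLY }

class SplitDecision {
    /**
     * Returns true when a stream (WAL-reading) split should still be
     * created after all snapshot splits have finished.
     */
    static boolean shouldCreateStreamSplit(StartupMode mode, boolean snapshotFinished) {
        // In snapshot-only mode the job should finish once the snapshot
        // phase completes, so no stream split is needed.
        return snapshotFinished && mode != StartupMode.SNAPSHOT_ONLY;
    }
}
```

Under this reading, the bug is that the check on the startup mode is missing, so a stream split is created unconditionally once the snapshot phase completes.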

> PostgresIncrementalSource is not exiting the flink execution when 
> StartupOptions is snapshot and create multiple replication slots
> ----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36584
>                 URL: https://issues.apache.org/jira/browse/FLINK-36584
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>            Reporter: Anil Dasari
>            Priority: Major
>         Attachments: image-2024-10-29-10-00-44-321.png
>
>
> Issue-1. PostgresIncrementalSource does not exit the Flink execution when 
> StartupOptions is snapshot.
>  
> The Postgres CDC module uses HybridSplitAssigner for the batch scan, which 
> still tries to create a streaming split after the snapshot splits are 
> completed: 
> [https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
>  
> Issue-2. When the source parallelism is > 1 (e.g. 2), PostgresIncrementalSource 
> creates multiple replication slots.
>  
> postgres logs: 
> {code:java}
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  logical 
> decoding found consistent point at 0/1690B28
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  exported 
> logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  logical 
> decoding found consistent point at 0/1690BF8
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  exported 
> logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] LOG:  logical 
> decoding found consistent point at 0/1690C30
> flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.267 UTC [52] LOG:  exported 
> logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
> flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] LOG:  starting 
> logical decoding for slot "flinkpostgres_0"
> flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] DETAIL:  Streaming 
> transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
> flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] LOG:  logical 
> decoding found consistent point at 0/1690BF8
> flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] ERROR:  replication 
> slot "flinkpostgres_1" does not exist
> flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_1')
> flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] LOG:  starting 
> logical decoding for slot "flinkpostgres_0"
> flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] DETAIL:  Streaming 
> transactions committing after 0/1690C30, reading WAL from 0/1690C30.
> flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] LOG:  logical 
> decoding found consistent point at 0/1690C30
> flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] DETAIL:  There are no 
> running transactions.
> flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] ERROR:  replication 
> slot "flinkpostgres_0" does not exist
> flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_0')
> flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] ERROR:  replication 
> slot "flinkpostgres_0" does not exist
> flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] STATEMENT:  select 
> pg_drop_replication_slot('flinkpostgres_0') {code}
> Setup:
>  
> flink-cdc version: 3.2.0
> Flink version: 1.19
> Steps to reproduce the issue:
>  
> 1. main code: 
> {code:java}
> DebeziumDeserializationSchema<String> deserializer =
>         new JsonDebeziumDeserializationSchema();
>
> JdbcIncrementalSource<String> postgresIncrementalSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .startupOptions(StartupOptions.snapshot())
>                 .hostname("localhost")
>                 .port(5432)
>                 .database("test")
>                 .schemaList("public")
>                 .username("postgres")
>                 .password("postgres")
>                 .slotName("flinkpostgres")
>                 .decodingPluginName("pgoutput")
>                 .deserializer(deserializer)
> //                .splitSize(2)
>                 .build();
>
> Configuration config = new Configuration();
> config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
> config.setString("heartbeat.interval", "6000000"); // 100 minutes
> config.setString("heartbeat.timeout", "18000000");
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(config);
> env.enableCheckpointing(300000);
> env.fromSource(
>                 postgresIncrementalSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "PostgresParallelSource")
>         .setParallelism(2)
>         .print();
> env.execute("Output Postgres Snapshot"); {code}
> 2. Create two tables and records in postgres
> 3. Run step#1 code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
