Anil Dasari created FLINK-36584:
-----------------------------------

             Summary: PostgresIncrementalSource is not exiting the Flink execution when StartupOptions is snapshot and creates multiple replication slots
                 Key: FLINK-36584
                 URL: https://issues.apache.org/jira/browse/FLINK-36584
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 3.0.0
            Reporter: Anil Dasari


Issue-1. PostgresIncrementalSource is not exiting the Flink execution when 
StartupOptions is snapshot.
 
The Postgres CDC module uses HybridSplitAssigner for the batch scan. After the 
snapshot splits are completed, the assigner still tries to create a streaming 
split, see 
[https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java#L128]
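 
A minimal sketch of the behavior expected in Issue-1, under stated assumptions: the class, enum, and split names below are hypothetical and are not the flink-cdc HybridSplitAssigner API. It models an assigner that stops handing out work once the snapshot splits are exhausted when the startup mode is snapshot-only. An actual fix might look different, for example by emitting a bounded streaming split instead of none.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified model of a split assigner; not flink-cdc code.
class SnapshotAwareAssignerSketch {
    enum StartupMode { INITIAL, SNAPSHOT }

    private final Queue<String> remainingSnapshotSplits;
    private final StartupMode mode;
    private boolean streamSplitAssigned = false;

    SnapshotAwareAssignerSketch(List<String> snapshotSplits, StartupMode mode) {
        this.remainingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
        this.mode = mode;
    }

    /** Returns the next split, or empty when nothing is left to assign. */
    Optional<String> getNext() {
        String snapshotSplit = remainingSnapshotSplits.poll();
        if (snapshotSplit != null) {
            return Optional.of(snapshotSplit);
        }
        // Snapshot phase is finished. Only a job that should keep
        // running (INITIAL) gets a single streaming split afterwards.
        if (mode == StartupMode.INITIAL && !streamSplitAssigned) {
            streamSplitAssigned = true;
            return Optional.of("stream-split");
        }
        return Optional.empty();
    }

    /** Collects every split the assigner hands out until it is exhausted. */
    static List<String> drain(SnapshotAwareAssignerSketch assigner) {
        List<String> assigned = new ArrayList<>();
        Optional<String> next;
        while ((next = assigner.getNext()).isPresent()) {
            assigned.add(next.get());
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> splits = List.of("table_a:0", "table_b:0");
        // prints [table_a:0, table_b:0]
        System.out.println(drain(new SnapshotAwareAssignerSketch(splits, StartupMode.SNAPSHOT)));
        // prints [table_a:0, table_b:0, stream-split]
        System.out.println(drain(new SnapshotAwareAssignerSketch(splits, StartupMode.INITIAL)));
    }
}
```

In this model the snapshot-only assigner runs dry after the last snapshot split, which is the condition under which a bounded job could finish.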
 
Issue-2. When source parallelism is greater than 1 (e.g. 2), 
PostgresIncrementalSource creates multiple replication slots.
 
postgres logs: 
{code:java}
flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  logical decoding found consistent point at 0/1690B28
flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] DETAIL:  There are no running transactions.
flink-postgres-1     | 2024-10-22 18:56:57.649 UTC [48] LOG:  exported logical decoding snapshot: "00000006-00000003-1" with 0 transaction IDs
flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  logical decoding found consistent point at 0/1690BF8
flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] DETAIL:  There are no running transactions.
flink-postgres-1     | 2024-10-22 18:56:58.226 UTC [51] LOG:  exported logical decoding snapshot: "00000008-00000003-1" with 0 transaction IDs
flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] LOG:  logical decoding found consistent point at 0/1690C30
flink-postgres-1     | 2024-10-22 18:56:58.266 UTC [52] DETAIL:  There are no running transactions.
flink-postgres-1     | 2024-10-22 18:56:58.267 UTC [52] LOG:  exported logical decoding snapshot: "00000009-00000003-1" with 0 transaction IDs
flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] LOG:  starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1     | 2024-10-22 18:56:58.612 UTC [51] DETAIL:  Streaming transactions committing after 0/1690C30, reading WAL from 0/1690BF8.
flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] LOG:  logical decoding found consistent point at 0/1690BF8
flink-postgres-1     | 2024-10-22 18:56:58.614 UTC [51] DETAIL:  There are no running transactions.
flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] ERROR:  replication slot "flinkpostgres_1" does not exist
flink-postgres-1     | 2024-10-22 18:56:58.753 UTC [56] STATEMENT:  select pg_drop_replication_slot('flinkpostgres_1')
flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] LOG:  starting logical decoding for slot "flinkpostgres_0"
flink-postgres-1     | 2024-10-22 18:56:59.347 UTC [57] DETAIL:  Streaming transactions committing after 0/1690C30, reading WAL from 0/1690C30.
flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] LOG:  logical decoding found consistent point at 0/1690C30
flink-postgres-1     | 2024-10-22 18:56:59.348 UTC [57] DETAIL:  There are no running transactions.
flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] ERROR:  replication slot "flinkpostgres_0" does not exist
flink-postgres-1     | 2024-10-22 18:56:59.423 UTC [59] STATEMENT:  select pg_drop_replication_slot('flinkpostgres_0')
flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] ERROR:  replication slot "flinkpostgres_0" does not exist
flink-postgres-1     | 2024-10-22 18:56:59.673 UTC [60] STATEMENT:  select pg_drop_replication_slot('flinkpostgres_0')
{code}
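 
To make the slot names in a log like the one above easier to see, here is a small sketch that extracts the distinct replication slot names from Postgres log lines. The class name and the regular expression are my own assumptions for illustration; the sample lines are copied from the log above with the email line wrapping removed.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the log; not part of flink-cdc.
class ReplicationSlotLogScan {
    // Matches: slot "name"   and   pg_drop_replication_slot('name')
    private static final Pattern SLOT_NAME =
            Pattern.compile("(?:slot \"|pg_drop_replication_slot\\(')([A-Za-z0-9_]+)");

    /** Returns the distinct slot names mentioned in the given log lines, sorted. */
    static Set<String> slotNames(List<String> logLines) {
        Set<String> names = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = SLOT_NAME.matcher(line);
            while (m.find()) {
                names.add(m.group(1));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2024-10-22 18:56:58.612 UTC [51] LOG:  starting logical decoding for slot \"flinkpostgres_0\"",
                "2024-10-22 18:56:58.753 UTC [56] ERROR:  replication slot \"flinkpostgres_1\" does not exist",
                "2024-10-22 18:56:58.753 UTC [56] STATEMENT:  select pg_drop_replication_slot('flinkpostgres_1')",
                "2024-10-22 18:56:59.423 UTC [59] STATEMENT:  select pg_drop_replication_slot('flinkpostgres_0')");
        // prints [flinkpostgres_0, flinkpostgres_1]
        System.out.println(slotNames(lines));
    }
}
```

With the configured slot name "flinkpostgres" and parallelism 2, the log references two suffixed names, flinkpostgres_0 and flinkpostgres_1.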
Setup:
 
flink-cdc version: 3.2.0
flink version: 1.19
 
Steps to reproduce the issue:
 
1. main code: 
{code:java}
DebeziumDeserializationSchema<String> deserializer =
        new JsonDebeziumDeserializationSchema();

JdbcIncrementalSource<String> postgresIncrementalSource =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .startupOptions(StartupOptions.snapshot())
                .hostname("localhost")
                .port(5432)
                .database("test")
                .schemaList("public")
                .username("postgres")
                .password("postgres")
                .slotName("flinkpostgres")
                .decodingPluginName("pgoutput")
                .deserializer(deserializer)
//                .splitSize(2)
                .build();

Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
config.setString("heartbeat.interval", "6000000"); // 100 minutes
config.setString("heartbeat.timeout", "18000000");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(300000);

env.fromSource(
                postgresIncrementalSource,
                WatermarkStrategy.noWatermarks(),
                "PostgresParallelSource")
        .setParallelism(2)
        .print();

env.execute("Output Postgres Snapshot");
{code}
2. Create two tables in Postgres and insert records into them.
3. Run the code from step 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
