loserwang1024 commented on code in PR #3950:
URL: https://github.com/apache/flink-cdc/pull/3950#discussion_r2059684280


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -305,6 +305,93 @@ void testStartupFromLatestOffset(boolean 
parallelismSnapshot) throws Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true})
+    public void testStartupFromCommittedOffset(boolean parallelismSnapshot) 
throws Exception {
+        setup(true);
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'first','first description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'second','second description',0.2);");
+        }
+
+        // newly create slot's confirmed lsn is latest. We will test whether 
committed mode starts
+        // from here.
+        String slotName = getSlotName();
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // TODO: Remove it after adding publication to an existing 
replication slot.
+            statement.execute("CREATE PUBLICATION dbz_publication FOR TABLE 
inventory.products");
+            statement.execute(
+                    String.format(
+                            "select 
pg_create_logical_replication_slot('%s','pgoutput');",
+                            slotName));
+        }
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'thirth','thirth description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'forth','forth description',0.2);");
+        }
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                + " PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'true',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "
+                                + " 'slot.name' = '%s',"
+                                + " 'scan.lsn-commit.checkpoints-num-delay' = 
'0',"
+                                + " 'scan.startup.mode' = 'committed-offset'"
+                                + ")",
+                        POSTGRES_CONTAINER.getHost(),
+                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGRES_CONTAINER.getUsername(),
+                        POSTGRES_CONTAINER.getPassword(),
+                        POSTGRES_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "products",
+                        slotName);

Review Comment:
   ```suggestion
                            publicName,
                           slotName);
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -305,6 +305,93 @@ void testStartupFromLatestOffset(boolean 
parallelismSnapshot) throws Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true})
+    public void testStartupFromCommittedOffset(boolean parallelismSnapshot) 
throws Exception {
+        setup(true);
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'first','first description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'second','second description',0.2);");
+        }
+
+        // newly create slot's confirmed lsn is latest. We will test whether 
committed mode starts
+        // from here.
+        String slotName = getSlotName();
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // TODO: Remove it after adding publication to an existing 
replication slot.
+            statement.execute("CREATE PUBLICATION dbz_publication FOR TABLE 
inventory.products");

Review Comment:
   ```suggestion
                 String slotName = getSlotName();
           String publicName = "dbz_publication_" + new Random().nextInt(1000);
           try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); 
Statement statement = connection.createStatement()) {
               // TODO: Remove it after adding publication to an existing 
replication slot.
               statement.execute(String.format("CREATE PUBLICATION %s FOR TABLE 
inventory.products;", publicName));
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -305,6 +305,93 @@ void testStartupFromLatestOffset(boolean 
parallelismSnapshot) throws Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true})
+    public void testStartupFromCommittedOffset(boolean parallelismSnapshot) 
throws Exception {
+        setup(true);
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'first','first description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'second','second description',0.2);");
+        }
+
+        // newly create slot's confirmed lsn is latest. We will test whether 
committed mode starts
+        // from here.
+        String slotName = getSlotName();
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            // TODO: Remove it after adding publication to an existing 
replication slot.
+            statement.execute("CREATE PUBLICATION dbz_publication FOR TABLE 
inventory.products");
+            statement.execute(
+                    String.format(
+                            "select 
pg_create_logical_replication_slot('%s','pgoutput');",
+                            slotName));
+        }
+
+        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'thirth','thirth description',0.1);");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES 
(default,'forth','forth description',0.2);");
+        }
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                + " PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'true',"
+                                + " 'decoding.plugin.name' = 'pgoutput', "

Review Comment:
   ```suggestion
                                + " 'decoding.plugin.name' = 'pgoutput', "
                                  + " 'debezium.publication.name'  = '%s', "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to