loserwang1024 commented on code in PR #3950:
URL: https://github.com/apache/flink-cdc/pull/3950#discussion_r2052058018


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java:
##########
@@ -261,6 +261,110 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception {
         result.getJobClient().get().cancel().get();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true})
+    public void testStartupFromCommittedOffset(boolean parallelismSnapshot) throws Exception {
+        setup(parallelismSnapshot);
+        initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+

Review Comment:
   How does this test show that the job starts from the committed offset? Maybe we can:
   1. Create a slot in advance.
   2. Insert 2 rows into the database.
   3. Commit the latest offset using PostgresStreamFetchTask#commitCurrentOffset.
   4. Insert another 2 rows into the database.
   
   Then start the Flink job with the existing slot in committed-offset mode.
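
To make the expected outcome of that scenario concrete, here is a minimal sketch. It is a toy in-memory model only: `FakeSlot`, `commitCurrentPosition`, and `readFromCommitted` are hypothetical names invented for illustration and are not part of PostgreSQL, Debezium, or Flink CDC. It assumes that a job started from the committed offset should receive only the changes written after the commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a replication slot; NOT the PostgreSQL or Flink CDC API. */
public class CommittedOffsetSketch {

    /** Hypothetical model: an append-only change log plus the slot's confirmed position. */
    static final class FakeSlot {
        private final List<String> changes = new ArrayList<>();
        // Index of the first change that has not been confirmed yet.
        private int confirmedPosition = 0;

        void insert(String row) {
            changes.add(row);
        }

        // Stands in for step 3: commit the current (latest) offset to the slot.
        void commitCurrentPosition() {
            confirmedPosition = changes.size();
        }

        // What a reader that starts from the committed offset would receive.
        List<String> readFromCommitted() {
            return new ArrayList<>(changes.subList(confirmedPosition, changes.size()));
        }
    }

    public static List<String> scenario() {
        FakeSlot slot = new FakeSlot();   // 1. the slot exists before the job starts
        slot.insert("row-1");             // 2. insert the first two rows
        slot.insert("row-2");
        slot.commitCurrentPosition();     // 3. commit the latest offset
        slot.insert("row-3");             // 4. insert two more rows
        slot.insert("row-4");
        return slot.readFromCommitted();  // the job starts from the committed offset
    }

    public static void main(String[] args) {
        System.out.println(scenario());   // prints [row-3, row-4]
    }
}
```

In a real integration test the same check would presumably be an assertion that the sink contains only the last two inserted rows, which would distinguish committed-offset startup from snapshot or earliest-offset startup.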



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnectionUtils.java:
##########
@@ -0,0 +1,55 @@
+package io.debezium.connector.postgresql.connection;
+
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.postgresql.SourceInfo;
+import io.debezium.connector.postgresql.Utils;
+import io.debezium.time.Conversions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PostgresConnectionUtils {

Review Comment:
   @phamvinh1712 Could this be moved into io.debezium.connector.postgresql.Utils? That class already has io.debezium.connector.postgresql.Utils#currentOffset, and some of the logic is the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
