lvyanquan commented on code in PR #3968:
URL: https://github.com/apache/flink-cdc/pull/3968#discussion_r2097720401


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+    public static final ConfigOption<String> TABLES =

Review Comment:
   Add a blank line between these two ConfigOptions.
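   For illustration, this is the spacing being asked for (a sketch over the declarations quoted above, abbreviated, not the final code):

```java
    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Password to use when connecting to the PostgreSQL database server.");

    // blank line added here, matching the spacing used between the other options in this class
    public static final ConfigOption<String> TABLES =
            ConfigOptions.key("tables")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Table names of the PostgreSQL tables to monitor. ...");
```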



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
+                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+                                    + "from a particular plug-in for a particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =

Review Comment:
   Unused ConfigOption.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming 
changes.\n"
+                                    + "\"all\": Encodes changes as retract 
stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert 
stream that describes idempotent updates on a key. It can be used for tables 
with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read 
snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, 
the incremental snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during 
snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in 
the chunk granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire 
global read lock before snapshot reading.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional number of events to skip after the 
specific starting offset");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =

Review Comment:
   Unused ConfigOption.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming 
changes.\n"
+                                    + "\"all\": Encodes changes as retract 
stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert 
stream that describes idempotent updates on a key. It can be used for tables 
with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read 
snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, 
the incremental snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during 
snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in 
the chunk granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire 
global read lock before snapshot reading.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional number of events to skip after the 
specific starting offset");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-rows")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("Optional number of rows to skip after the specific offset");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the latest available binlog offsets");
+
+    public static final org.apache.flink.configuration.ConfigOption<Double>
+            SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+                    org.apache.flink.configuration.ConfigOptions.key(
+                                    "chunk-key.even-distribution.factor.upper-bound")
+                            .doubleType()
+                            .defaultValue(1000.0d)
+                            .withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                            .withDescription(
+                                    "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                            + " table is evenly distribution or not."
+                                            + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                            + " and the query for splitting would happen when it is uneven."
+                                            + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    public static final org.apache.flink.configuration.ConfigOption<Double>

Review Comment:
   org.apache.flink.configuration.ConfigOption => ConfigOption
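   That is, drop the fully qualified org.apache.flink.configuration types and use the flink-cdc ConfigOption/ConfigOptions classes already imported at the top of this file. A rough sketch of the requested change, assuming the cdc ConfigOptions builder exposes the same doubleType()/withFallbackKeys() methods as Flink's (description abbreviated):

```java
    // Assumes org.apache.flink.cdc.common.configuration.ConfigOption also supports withFallbackKeys,
    // mirroring Flink's own ConfigOption API.
    public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
                    .doubleType()
                    .defaultValue(1000.0d)
                    .withFallbackKeys("split-key.even-distribution.factor.upper-bound")
                    .withDescription(
                            "The upper bound of chunk key distribution factor. The distribution factor is"
                                    + " used to determine whether the table is evenly distributed or not."
                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
```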



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming 
changes.\n"
+                                    + "\"all\": Encodes changes as retract 
stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert 
stream that describes idempotent updates on a key. It can be used for tables 
with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read 
snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, 
the incremental snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during 
snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in 
the chunk granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire 
global read lock before snapshot reading.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =

Review Comment:
   Unused ConfigOption.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming 
changes.\n"
+                                    + "\"all\": Encodes changes as retract 
stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert 
stream that describes idempotent updates on a key. It can be used for tables 
with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =

Review Comment:
   Unused ConfigOption.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
+
+/** Integration tests for Postgres source. */
+public class PostgresPipelineITCaseTest extends PostgresTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresPipelineITCaseTest.class);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER, "inventory", "inventory", TEST_USER, TEST_PASSWORD);
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        POSTGRES_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(4);
+        env.enableCheckpointing(2000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Test
+    public void testInitialStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC")
+                                .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(getSlotName());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        TableId tableId = TableId.tableId("inventory", "products");
+        CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
+
+        // generate snapshot data
+        List<Event> expectedSnapshot = getSnapshotExpected(tableId);
+
+        // In this configuration, several subtasks might emit their corresponding CreateTableEvent
+        // to downstream. Since it is not possible to predict how many CreateTableEvents should we
+        // expect, we simply filter them out from expected sets, and assert there's at least one.
+        List<Event> actual = fetchResultsExcept(events, expectedSnapshot.size(), createTableEvent);
+        assertThat(actual.subList(0, expectedSnapshot.size()))
+                .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
+    }
+
+    private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
+        List<T> result = new ArrayList<>(size);
+        List<T> sideResults = new ArrayList<>();
+        while (size > 0 && iter.hasNext()) {
+            T event = iter.next();
+            if (!event.equals(sideEvent)) {
+                result.add(event);
+                size--;
+            } else {
+                sideResults.add(sideEvent);
+            }
+        }
+        // Also ensure we've received at least one or many side events.
+        assertThat(sideResults).isNotEmpty();
+        return result;
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId) {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(255).notNull(),
+                            DataTypes.VARCHAR(512),
+                            DataTypes.FLOAT()

Review Comment:
   It's better to add a test for all of the supported types described in PostgresTypeUtils.
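
   As a rough illustration of what such a type-coverage test could assert (a sketch only: the class name, the column types listed, and the expected mappings below are assumptions for illustration, not the actual contents of PostgresTypeUtils):

    // Hypothetical sketch: one entry per PostgreSQL column type the connector is
    // expected to support, paired with the CDC DataType it should map to. A test
    // could create one table covering every key and assert the produced schema.
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;

    class PostgresTypeCoverageSketch {
        static Map<String, DataType> expectedTypeMapping() {
            Map<String, DataType> expected = new LinkedHashMap<>();
            expected.put("SMALLINT", DataTypes.SMALLINT());
            expected.put("INTEGER", DataTypes.INT());
            expected.put("BIGINT", DataTypes.BIGINT());
            expected.put("REAL", DataTypes.FLOAT());
            expected.put("DOUBLE PRECISION", DataTypes.DOUBLE());
            expected.put("NUMERIC(10, 2)", DataTypes.DECIMAL(10, 2));
            expected.put("BOOLEAN", DataTypes.BOOLEAN());
            expected.put("VARCHAR(255)", DataTypes.VARCHAR(255));
            expected.put("DATE", DataTypes.DATE());
            expected.put("TIMESTAMP(6)", DataTypes.TIMESTAMP(6));
            return expected;
        }
    }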



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
+import org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
+
+/** Integration tests for Postgres source. */
+public class PostgresPipelineITCaseTest extends PostgresTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresPipelineITCaseTest.class);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER, "inventory", "inventory", TEST_USER, TEST_PASSWORD);
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        POSTGRES_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(4);
+        env.enableCheckpointing(2000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @Test
+    public void testInitialStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC")
+                                .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(getSlotName());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        TableId tableId = TableId.tableId("inventory", "products");
+        CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
+
+        // generate snapshot data
+        List<Event> expectedSnapshot = getSnapshotExpected(tableId);
+
+        // In this configuration, several subtasks might emit their corresponding CreateTableEvent
+        // to downstream. Since it is not possible to predict how many CreateTableEvents should we
+        // expect, we simply filter them out from expected sets, and assert there's at least one.
+        List<Event> actual = fetchResultsExcept(events, expectedSnapshot.size(), createTableEvent);
+        assertThat(actual.subList(0, expectedSnapshot.size()))
+                .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
+    }
+
+    private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
+        List<T> result = new ArrayList<>(size);
+        List<T> sideResults = new ArrayList<>();
+        while (size > 0 && iter.hasNext()) {
+            T event = iter.next();
+            if (!event.equals(sideEvent)) {
+                result.add(event);
+                size--;
+            } else {
+                sideResults.add(sideEvent);
+            }
+        }
+        // Also ensure we've received at least one or many side events.
+        assertThat(sideResults).isNotEmpty();
+        return result;
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId) {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(255).notNull(),
+                            DataTypes.VARCHAR(512),
+                            DataTypes.FLOAT()

Review Comment:
   It's better to add a test for whole-database synchronization or multi-table synchronization too.
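
   For instance, the configuration in testInitialStartupMode could be extended along these lines (a sketch only; it assumes tableList accepts multiple table expressions and that "inventory.customers" exists in the test database, neither of which is shown in this PR):

    // Hypothetical multi-table variant of the test configuration above; apart from
    // tableList(...) it mirrors testInitialStartupMode, and a regex such as
    // "inventory\\..*" could be used instead for whole-schema synchronization.
    PostgresSourceConfigFactory configFactory =
            (PostgresSourceConfigFactory)
                    new PostgresSourceConfigFactory()
                            .hostname(POSTGRES_CONTAINER.getHost())
                            .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
                            .username(TEST_USER)
                            .password(TEST_PASSWORD)
                            .databaseList(inventoryDatabase.getDatabaseName())
                            .tableList("inventory.products", "inventory.customers")
                            .startupOptions(StartupOptions.initial())
                            .serverTimeZone("UTC");

   The assertions would then need one expected snapshot list and one CreateTableEvent per captured TableId.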



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
+                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+                                    + "from a particular plug-in for a particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming changes.\n"
+                                    + "\"all\": Encodes changes as retract stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in the chunk granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire global read lock before snapshot reading.");
+
+    public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the primary key.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =

Review Comment:
   Unused ConfigOption.
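
   If the intent is to keep SERVER_TIME_ZONE rather than drop it, a minimal sketch of wiring it through the factory could look like this (hypothetical helper, not part of this PR; it assumes a Configuration and the PostgresSourceConfigFactory are in scope in the data source factory, and uses the ZoneId.systemDefault() fallback that the option's description already promises):

    // Sketch only: consume the otherwise-unused SERVER_TIME_ZONE option.
    private static void applyServerTimeZone(
            org.apache.flink.cdc.common.configuration.Configuration config,
            PostgresSourceConfigFactory configFactory) {
        String serverTimeZone = config.get(SERVER_TIME_ZONE);
        if (serverTimeZone == null) {
            // Fallback documented in the option's description.
            serverTimeZone = java.time.ZoneId.systemDefault().getId();
        }
        configFactory.serverTimeZone(serverTimeZone);
    }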



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("decoderbufs")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in installed on the server.\n"
+                                    + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot that was created for streaming changes "
+                                    + "from a particular plug-in for a particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that you are configuring.");
+
+    public static final ConfigOption<DebeziumChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(DebeziumChangelogMode.class)
+                    .defaultValue(DebeziumChangelogMode.ALL)
+                    .withDescription(
+                            "The changelog mode used for encoding streaming changes.\n"
+                                    + "\"all\": Encodes changes as retract stream using all RowKinds. This is the default mode.\n"
+                                    + "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option.");
+
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ENABLED =
+            ConfigOptions.key("scan.incremental.snapshot.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Incremental snapshot is a new mechanism to read snapshot of a table. "
+                                    + "Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including:\n"
+                                    + "(1) source can be parallel during snapshot reading, \n"
+                                    + "(2) source can perform checkpoints in the chunk granularity during snapshot reading, \n"
+                                    + "(3) source doesn't need to acquire global read lock before snapshot reading.");
+
+    public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the primary key.");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not set, then "
+                                    + "ZoneId.systemDefault() is used to determine the server time zone.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE =
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of \"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of \"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of \"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =

Review Comment:
   Unused ConfigOption.
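
   The binlog/GTID wording also suggests these specific-offset options were carried over from the MySQL connector. A sketch of resolving scan.startup.mode without them might look like this (hypothetical helper; the set of supported modes is an assumption and should match whatever the Postgres source actually accepts):

    // Sketch only: map scan.startup.mode to StartupOptions without the
    // MySQL-specific specific-offset/GTID options flagged above.
    private static StartupOptions resolveStartupOptions(
            org.apache.flink.cdc.common.configuration.Configuration config) {
        String mode = config.get(SCAN_STARTUP_MODE);
        switch (mode.toLowerCase()) {
            case "initial":
                return StartupOptions.initial();
            case "latest-offset":
                return StartupOptions.latest();
            default:
                throw new IllegalArgumentException("Unsupported scan.startup.mode: " + mode);
        }
    }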


