lvyanquan commented on code in PR #3968:
URL: https://github.com/apache/flink-cdc/pull/3968#discussion_r2136976198


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=

Review Comment:
   This ConfigOption is useless, we should remove it.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")

Review Comment:
   This ConfigOption is useless, we should remove it.



##########
docs/content/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,436 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres connector allows reading snapshot data and incremental data from 
Postgres database and provides end-to-end full-database data synchronization 
capabilities.
+This document describes how to setup the Postgres connector.
+
+
+## Example
+
+An example of the pipeline for reading data from Postgres and sink to Doris 
can be defined as follows:
+
+```yaml
+source:
+   type: postgres
+   name: Postgres Source
+   hostname: 127.0.0.1
+   port: 5432
+   username: admin
+   password: pass
+   tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*
+   decoding.plugin.name:  pgoutput
+   slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: Postgres to Doris Pipeline
+   parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Integer port number of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Postgres database to use when connecting to the Postgres 
database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to monitor. The table-name also 
supports regular expressions to monitor multiple tables that satisfy the 
regular expressions. <br>
+          It is important to note that the dot (.) is treated as a delimiter 
for database and table names. 
+          If there is a need to use a dot (.) in a regular expression to match 
any character, it is necessary to escape the dot with a backslash.<br>
+          e.g. adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the PostgreSQL logical decoding slot that was created 
for streaming changes from a particular plug-in
+          for a particular database/schema. The server uses this slot to 
stream events to the connector that you are configuring.
+          <br/>Slot names must conform to <a 
href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION";>PostgreSQL
 replication slot naming rules</a>, which state: "Each replication slot has a 
name, which can contain lower-case letters, numbers, and the underscore 
character."</td>
+    </tr> 
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to exclude, parameter will have an 
exclusion effect after the tables parameter. The table-name also supports 
regular expressions to exclude multiple tables that satisfy the regular 
expressions. <br>
+          The usage is the same as the tables parameter</td>
+    </tr>
+    <tr>
+      <td>schema-change.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to send schema change events, so that downstream sinks can 
respond to schema changes and achieve table structure synchronization.</td>

Review Comment:
   Schema changes are not supported yet, so we should not expose this config to 
users.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")

Review Comment:
   This ConfigOption is useless, we should remove it.



##########
docs/content/docs/connectors/pipeline-connectors/postgres.md:
##########
@@ -0,0 +1,436 @@
+---
+title: "Postgres"
+weight: 2
+type: docs
+aliases:
+- /connectors/pipeline-connectors/postgres
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Postgres Connector
+
+Postgres connector allows reading snapshot data and incremental data from 
Postgres database and provides end-to-end full-database data synchronization 
capabilities.
+This document describes how to setup the Postgres connector.
+
+
+## Example
+
+An example of the pipeline for reading data from Postgres and sink to Doris 
can be defined as follows:
+
+```yaml
+source:
+   type: postgres
+   name: Postgres Source
+   hostname: 127.0.0.1
+   port: 5432
+   username: admin
+   password: pass
+   tables: adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*
+   decoding.plugin.name:  pgoutput
+   slot.name: pgtest
+
+sink:
+  type: doris
+  name: Doris Sink
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: pass
+
+pipeline:
+   name: Postgres to Doris Pipeline
+   parallelism: 4
+```
+
+## Connector Options
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 10%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 65%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>hostname</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>IP address or hostname of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>port</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5432</td>
+      <td>Integer</td>
+      <td>Integer port number of the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>username</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Postgres database to use when connecting to the Postgres 
database server.</td>
+    </tr>
+    <tr>
+      <td>password</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password to use when connecting to the Postgres database server.</td>
+    </tr>
+    <tr>
+      <td>tables</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to monitor. The table-name also 
supports regular expressions to monitor multiple tables that satisfy the 
regular expressions. <br>
+          It is important to note that the dot (.) is treated as a delimiter 
for database and table names. 
+          If there is a need to use a dot (.) in a regular expression to match 
any character, it is necessary to escape the dot with a backslash.<br>
+          For example, adb.\.*.\.*, bdb.user_schema_[0-9].user_table_[0-9]+, 
[app|web].schema_\.*.order_\.*</td>
+    </tr>
+    <tr>
+      <td>slot.name</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the PostgreSQL logical decoding slot that was created 
for streaming changes from a particular plug-in
+          for a particular database/schema. The server uses this slot to 
stream events to the connector that you are configuring.
+          <br/>Slot names must conform to <a 
href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION";>PostgreSQL
 replication slot naming rules</a>, which state: "Each replication slot has a 
name, which can contain lower-case letters, numbers, and the underscore 
character."</td>
+    </tr> 
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table name of the Postgres database to exclude, parameter will have an 
exclusion effect after the tables parameter. The table-name also supports 
regular expressions to exclude multiple tables that satisfy the regular 
expressions. <br>
+          The usage is the same as the tables parameter</td>
+    </tr>
+    <tr>
+      <td>schema-change.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to send schema change events, so that downstream sinks can 
respond to schema changes and achieve table structure synchronization.</td>
+    </tr>
+    <tr>
+      <td>scan.incremental.snapshot.chunk.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8096</td>
+      <td>Integer</td>
+      <td>The chunk size (number of rows) of table snapshot, captured tables 
are split into multiple chunks when read the snapshot of table.</td>
+    </tr>
+    <tr>
+      <td>scan.snapshot.fetch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1024</td>
+      <td>Integer</td>
+      <td>The maximum fetch size for per poll when read table snapshot.</td>
+    </tr>
+    <tr>
+      <td>scan.startup.mode</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">initial</td>
+      <td>String</td>
+      <td> Optional startup mode for Postgres CDC consumer, valid enumerations 
are "initial", "latest-offset", "committed-offset" and "snapshot".</td>
+    </tr>
+    <tr>
+      <td>scan.startup.specific-offset.skip-events</td>

Review Comment:
   Remove this config option, as it is unused.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");

Review Comment:
   Only "initial", "snapshot" and "latest-offset" are supported.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.function.Function;
+
+/** End-to-end tests for postgres cdc pipeline job. */
+public class PostgresE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresE2eITCase.class);
+
+    @Container
+    public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
+            new PostgreSQLContainer<>(PG_IMAGE)
+                    .withNetworkAliases(INTER_CONTAINER_POSTGRES_ALIAS)
+                    .withNetwork(NETWORK)
+                    .withDatabaseName("flink-test")
+                    .withUsername(POSTGRES_TEST_USER)
+                    .withPassword(POSTGRES_TEST_PASSWORD)
+                    .withLogConsumer(new Slf4jLogConsumer(LOG))
+                    .withCommand(
+                            "postgres",
+                            "-c",
+                            // default
+                            "fsync=off",
+                            "-c",
+                            "max_replication_slots=20",
+                            "-c",
+                            "wal_level=logical")
+                    .waitingFor(Wait.forListeningPort())
+                    .withStartupTimeout(Duration.ofSeconds(30L));
+
+    private final UniqueDatabase postgresInventoryDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres",
+                    "postgres_inventory",
+                    POSTGRES_TEST_USER,
+                    POSTGRES_TEST_PASSWORD);
+
+    private final Function<String, String> dbNameFormatter = (s) -> 
String.format(s, "inventory");
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        postgresInventoryDatabase.createAndInitialize();
+    }
+
+    @AfterEach
+    public void after() {
+        super.after();
+        postgresInventoryDatabase.dropDatabase();
+    }
+
+    @Test
+    void testSyncWholeDatabase() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: postgres\n"
+                                + "  hostname: %s\n"
+                                + "  port: %d\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*.\\.*\n"
+                                + "  slot.name: flinktest\n"
+                                + "  scan.startup.mode: initial\n"
+                                //                                + "  
server-time-zone: UTC\n"

Review Comment:
   We can remove this if it's unnecessary.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link PostgresDataSource}. */
+@PublicEvolving
+public class PostgresDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the PostgreSQL 
database server.");
+    public static final ConfigOption<Integer> PG_PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5432)
+                    .withDescription("Integer port number of the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the PostgreSQL database to use when 
connecting to the PostgreSQL database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the PostgreSQL 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the PostgreSQL tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> DECODING_PLUGIN_NAME =
+            ConfigOptions.key("decoding.plugin.name")
+                    .stringType()
+                    .defaultValue("pgoutput")
+                    .withDescription(
+                            "The name of the Postgres logical decoding plug-in 
installed on the server.\n"
+                                    + "Supported values are decoderbufs, 
wal2json, wal2json_rds, wal2json_streaming,\n"
+                                    + "wal2json_rds_streaming and pgoutput.");
+
+    public static final ConfigOption<String> SLOT_NAME =
+            ConfigOptions.key("slot.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The name of the PostgreSQL logical decoding slot 
that was created for streaming changes "
+                                    + "from a particular plug-in for a 
particular database/schema. The server uses this slot "
+                                    + "to stream events to the connector that 
you are configuring.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the PostgreSQL database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build PostgreSQL database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for PostgreSQL CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" 
startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for 
tracing the latest available binlog offsets");

Review Comment:
   binlog => replication slot



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to