Mrart commented on code in PR #3995:
URL: https://github.com/apache/flink-cdc/pull/3995#discussion_r2532408471


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");

Review Comment:
   Why do we need jdbc.url when hostname and port are already configured? If 
it really is necessary, please give it a more detailed description.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");

Review Comment:
   Please describe this option in more detail.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");

Review Comment:
   up+1



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.utils;
+
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import io.debezium.relational.Column;
+
+/** Utilities for converting from oracle types to {@link DataType}s. */
+public class OracleTypeUtils {

Review Comment:
   
Should we follow the Debezium documentation for the Oracle data type mappings 
here? See 
https://debezium.io/documentation/reference/stable/connectors/oracle.html#oracle-data-type-mappings



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."

Review Comment:
   Why do table names need the db.table form when there is already a database 
option? Please correct the description.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");

Review Comment:
   The documentation only mentions the initial, earliest-offset and 
latest-offset startup modes, so please confirm which modes are actually 
supported.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   up+1
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional number of events to skip after the 
specific starting offset");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-rows")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("Optional number of rows to skip after 
the specific offset");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" 
startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for 
tracing the latest available binlog offsets");
+
+    // 
----------------------------------------------------------------------------
+    // experimental options, won't add them to documentation
+    // 
----------------------------------------------------------------------------
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size 
exceeds the group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
+            ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to close idle readers at the end of the 
snapshot phase. This feature depends on "
+                                    + "FLIP-147: Support Checkpoints After 
Tasks Finished. The flink version is required to be "
+                                    + "greater than or equal to 1.14 when 
enabling this feature.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether send schema change events, by default is 
true. If set to false, the schema changes will not be sent.");

Review Comment:
   Do we support `schema-change.enabled` now?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(

Review Comment:
   Same as the comment above (+1).



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when 
connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle 
database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, 
db[1-2].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, 
valid enumerations are "
+                                    + "\"initial\", \"earliest-offset\", 
\"latest-offset\", \"timestamp\"\n"
+                                    + "or \"specific-offset\"");
+
+    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSET_FILE 
=
+            ConfigOptions.key("scan.startup.specific-offset.file")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file name used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_SPECIFIC_OFFSET_POS =
+            ConfigOptions.key("scan.startup.specific-offset.pos")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional binlog file position used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<String> 
SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET =
+            ConfigOptions.key("scan.startup.specific-offset.gtid-set")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional GTID set used in case of 
\"specific-offset\" startup mode");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-events")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional number of events to skip after the 
specific starting offset");
+
+    public static final ConfigOption<Long> 
SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS =
+            ConfigOptions.key("scan.startup.specific-offset.skip-rows")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("Optional number of rows to skip after 
the specific offset");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" 
startup mode");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for 
tracing the latest available binlog offsets");
+
+    // 
----------------------------------------------------------------------------
+    // experimental options, won't add them to documentation
+    // 
----------------------------------------------------------------------------
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size 
exceeds the group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
+            ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to close idle readers at the end of the 
snapshot phase. This feature depends on "
+                                    + "FLIP-147: Support Checkpoints After 
Tasks Finished. The flink version is required to be "
+                                    + "greater than or equal to 1.14 when 
enabling this feature.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether send schema change events, by default is 
true. If set to false, the schema changes will not be sent.");
+
+    @Experimental
+    public static final ConfigOption<String> DATABASE_TABLE_CASE_INSENSITIVE =
+            ConfigOptions.key("debezium.database.tablename.case.insensitive")
+                    .stringType()
+                    .defaultValue("false")
+                    .withDescription(
+                            "The case sensitivity of database and table names 
usually depends on the type of database and its configuration.");
+
+    @Experimental
+    public static final ConfigOption<String> DATABASE_CONNECTION_ADAPTER =
+            ConfigOptions.key("debezium.database.connection.adapter")
+                    .stringType()
+                    .defaultValue("logminer")
+                    .withDescription("Database connection adapter.");
+
+    @Experimental
+    public static final ConfigOption<String> LOG_MINING_STRATEGY =
+            ConfigOptions.key("debezium.log.mining.strategy")
+                    .stringType()
+                    .defaultValue("online_catalog")
+                    .withDescription("A strategy in log data analysis or 
mining.");
+
+    @Experimental
+    public static final ConfigOption<String> LOG_MINING_CONTINUOUS_MINE =
+            ConfigOptions.key("debezium.log.mining.continuous.mine")

Review Comment:
   These are all `debezium.*` options. There is no need to enumerate every one of them here, right?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;
+
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Schema;
+
+/** {@link DataType} inference for oracle debezium {@link Schema}. */
+@Internal
+public class OracleSchemaDataTypeInference extends 
DebeziumSchemaDataTypeInference {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataType inferStruct(Object value, Schema schema) {
+        // the Geometry datatype in oracle will be converted to
+        // a String with Json format
+        if (Point.LOGICAL_NAME.equals(schema.name())
+                || Geometry.LOGICAL_NAME.equals(schema.name())
+                || Geography.LOGICAL_NAME.equals(schema.name())) {
+            return DataTypes.STRING();

Review Comment:
   We don't currently support point-related field types?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessor.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** {@link MetadataAccessor} for {@link OracleDataSource}. */
+@Internal
+public class OracleMetadataAccessor implements MetadataAccessor {
+
+    private final OracleSourceConfig sourceConfig;
+
+    public OracleMetadataAccessor(OracleSourceConfig sourceConfig) {
+        this.sourceConfig = sourceConfig;
+    }
+
+    /**
+     * Always throw {@link UnsupportedOperationException} because oracle does 
not support namespace.
+     */
+    @Override
+    public List<String> listNamespaces() {
+        throw new UnsupportedOperationException("List namespace is not 
supported by oracle.");
+    }
+
+    /**
+     * List all database from oracle.
+     *
+     * @param namespace This parameter is ignored because oracle does not 
support namespace.
+     * @return The list of database
+     */
+    @Override
+    public List<String> listSchemas(@Nullable String namespace) {
+        return OracleSchemaUtils.listDatabases(sourceConfig);
+    }
+
+    /**
+     * List tables from oracle.
+     *
+     * @param namespace This parameter is ignored because oracle does not 
support namespace.
+     * @param dbName The database to list tables from. If null, list tables 
from all databases.
+     * @return The list of {@link TableId}s.
+     */
+    @Override
+    public List<TableId> listTables(@Nullable String namespace, @Nullable 
String dbName) {
+        return OracleSchemaUtils.listTables(sourceConfig, dbName);
+    }
+
+    /**
+     * Get the {@link Schema} of the given table.
+     *
+     * @param tableId The {@link TableId} of the given table.
+     * @return The {@link Schema} of the table.
+     */
+    @Override
+    public Schema getTableSchema(TableId tableId) {
+        Schema schema = OracleSchemaUtils.getTableSchema(tableId, 
sourceConfig);
+        return schema;

Review Comment:
   Why not just return the result of `OracleSchemaUtils.getTableSchema(tableId, sourceConfig)` directly?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The jdbc url.");
+    public static final ConfigOption<String> SCHEMALIST =
+            ConfigOptions.key("schemalist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("schema list.");

Review Comment:
   If tables are in db.schema.table format, why do we need schemalist?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to