lvyanquan commented on code in PR #3995:
URL: https://github.com/apache/flink-cdc/pull/3995#discussion_r2647403630


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.parser;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.types.DataType;
+
+import io.debezium.connector.oracle.antlr.OracleDdlParser;
+import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.text.ParsingException;
+import org.antlr.v4.runtime.tree.ParseTreeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static io.debezium.antlr.AntlrDdlParser.getText;
+import static org.apache.flink.cdc.connectors.oracle.utils.OracleTypeUtils.fromDbzColumn;
+
+/** Parser listener that is parsing Oracle ALTER TABLE statements. */
+public class OracleAlterTableParserListener extends BaseParserListener {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(
+                    io.debezium.connector.oracle.antlr.listener.AlterTableParserListener.class);

Review Comment:
    LoggerFactory.getLogger(OracleAlterTableParserListener.class);
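
   A minimal sketch of the corrected declaration, assuming the class keeps its current name:
   ```
   private static final Logger LOGGER =
           LoggerFactory.getLogger(OracleAlterTableParserListener.class);
   ```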



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */
+public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {

Review Comment:
   We can clarify that the generic type `T` here is `Event`.
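
   For illustration, a sketch of one way to make that explicit, assuming `Event` is the only element type the emitter ever produces (a Javadoc note on `T` would also work):
   ```
   /** The {@link RecordEmitter} implementation for Oracle pipeline connector, emitting {@link Event}s. */
   public class OraclePipelineRecordEmitter extends IncrementalSourceRecordEmitter<Event> {
       // ... constructor and processElement unchanged, minus the <T> parameter
   }
   ```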



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSourceOptions.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+import org.apache.flink.cdc.common.configuration.Configuration;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Configurations for {@link OracleDataSource}. */
+@PublicEvolving
+public class OracleDataSourceOptions {
+
+    public static final ConfigOption<String> JDBC_URL =
+            ConfigOptions.key("jdbc.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The url for oracle jdbc ,the url will be used preferentially,if no url is configured, then use \"jdbc:oracle:thin:@localhost:1521:orcl\",but oracle 19c url is \"jdbc:oracle:thin:@//localhost:1521/pdb1\",so the url property is option to adapt to different versions of Oracle");
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the oracle database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(3306)
+                    .withDescription("Integer port number of the oracle database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the oracle database to use when connecting to the oracle database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle database server.");
+
+    public static final ConfigOption<String> DATABASE =
+            ConfigOptions.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the oracle database server.");
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the oracle tables to monitor. Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash."
+                                    + "eg. test.user_table_[0-9]+, test[0-9].[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not set, then "
+                                    + "ZoneId.systemDefault() is used to determine the server time zone.");
+
+    public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait after trying to connect to the oracle database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should retry to build oracle database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for oracle CDC consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\", \"snapshot\"");
+
+    public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+            ConfigOptions.key("heartbeat.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Optional interval of sending heartbeat event for tracing the latest available binlog offsets");
+
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    .withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    .withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly calculation optimization when the data distribution is even,"
+                                    + " and the query oracle for splitting would happen when it is uneven."
+                                    + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
+            ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to close idle readers at the end of the snapshot phase. This feature depends on "
+                                    + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+                                    + "greater than or equal to 1.14 when enabling this feature.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");

Review Comment:
   What would happen if we alter the table structure but set `schema-change.enabled` to false? Should we always set this parameter to true?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleDataSource.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source;
+
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.source.DataSource;
+import org.apache.flink.cdc.common.source.EventSourceProvider;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.source.MetadataAccessor;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
+import org.apache.flink.cdc.connectors.oracle.source.reader.OracleTableSourceReader;
+import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a Oracle redo log from a logical
+ * description.
+ */
+public class OracleDataSource implements DataSource, SupportsReadingMetadata {
+
+    private final OracleSourceConfig sourceConfig;
+    private final Configuration config;
+    private final OracleSourceConfigFactory configFactory;
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private final List<OracleReadableMetaData> readableMetadataList;
+
+    public OracleDataSource(
+            OracleSourceConfigFactory configFactory,
+            Configuration config,
+            List<OracleReadableMetaData> readableMetadataList) {
+        this.sourceConfig = configFactory.create(0);
+        this.config = config;
+        this.metadataKeys = Collections.emptyList();
+        this.readableMetadataList = readableMetadataList;
+        this.configFactory = configFactory;
+    }
+
+    @Override
+    public EventSourceProvider getEventSourceProvider() {
+
+        OracleDialect oracleDialect = new OracleDialect();
+        OracleEventDeserializer deserializer =
+                new OracleEventDeserializer(
+                        DebeziumChangelogMode.ALL,
+                        config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED),
+                        readableMetadataList);
+
+        RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory();
+        OracleTableSourceReader oracleChangeEventSource =
+                new OracleTableSourceReader(
+                        configFactory, deserializer, offsetFactory, oracleDialect);

Review Comment:
   Can be updated to the following code to avoid `Raw use of parameterized class`.
   ```
           OracleEventDeserializer<Event> deserializer =
                   new OracleEventDeserializer<>(
                           DebeziumChangelogMode.ALL,
                           config.get(OracleDataSourceOptions.SCHEMA_CHANGE_ENABLED),
                           readableMetadataList);
   
           RedoLogOffsetFactory offsetFactory = new RedoLogOffsetFactory();
           OracleTableSourceReader<Event> oracleChangeEventSource =
                   new OracleTableSourceReader<>(
                           configFactory, deserializer, offsetFactory, oracleDialect);
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/RecordsFormatter.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.testutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Formatter that formats the {@link SourceRecord} to String. */
+public class RecordsFormatter {

Review Comment:
   This class seems to be unused.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OracleTableSourceReader.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.config.SourceConfig;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader;
+import org.apache.flink.cdc.connectors.oracle.source.OracleDialect;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
+import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffsetFactory;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/**
+ * The basic source of Incremental Snapshot framework for JDBC datasource, it is based on FLIP-27
+ * and Watermark Signal Algorithm which supports parallel reading snapshot of table and then
+ * continue to capture data change by streaming reading.
+ */
+public class OracleTableSourceReader<T> extends OracleSourceBuilder.OracleIncrementalSource<T> {

Review Comment:
   We can clarify that the generic type `T` here is `Event`.
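
   Likewise, for illustration only, a sketch assuming the reader is only ever used with `Event`:
   ```
   public class OracleTableSourceReader extends OracleSourceBuilder.OracleIncrementalSource<Event> {
       // ... same constructor, with the element type fixed to Event
   }
   ```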



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */
+public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+    private static final long serialVersionUID = 1L;
+    // Used when startup mode is initial
+    private final Set<TableId> alreadySendCreateTableTables;
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
+
+    // Used when startup mode is not initial
+    private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
+    private final boolean isBounded;
+    private final OracleSourceConfig sourceConfig;
+
+    public OraclePipelineRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory,
+            OracleSourceConfig sourceConfig) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+        List<String> tableList = sourceConfig.getTableList();
+        this.sourceConfig = sourceConfig;
+        this.createTableEventCache = new HashMap<>();
+        this.alreadySendCreateTableTables = new HashSet<>();
+        try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+
+            List<TableId> capturedTableIds = new ArrayList<>();
+            for (String table : tableList) {
+                TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT));
+                capturedTableIds.add(capturedTableId);
+            }
+            for (TableId tableId : capturedTableIds) {
+                Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId);
+                createTableEventCache.put(
+                        tableId,
+                        new CreateTableEvent(
+                                org.apache.flink.cdc.common.event.TableId.tableId(
+                                        tableId.catalog(), tableId.table()),
+                                schema));
+            }
+            this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
+        }
+    }
+
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
+            throws Exception {
+        if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
+            for (TableId tableId : createTableEventCache.keySet()) {
+                output.collect((T) createTableEventCache.get(tableId));
+                alreadySendCreateTableTables.add(
+                        TableId.parse(tableId.schema() + "." + tableId.table()));
+            }
+            shouldEmitAllCreateTableEventsInSnapshotMode = false;
+        } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
+            TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
+            if (!alreadySendCreateTableTables.contains(tableId)) {
+                try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+                    sendCreateTableEvent(jdbc, tableId, output);
+                }
+                alreadySendCreateTableTables.add(
+                        TableId.parse(tableId.schema() + "." + tableId.table()));

Review Comment:
   alreadySendCreateTableTables.add(new TableId(tableId.schema(), null, tableId.table()));



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/TestTable.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.testutils;
+
+import org.apache.flink.cdc.connectors.oracle.table.OracleDeserializationConverterFactory;
+import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.List;
+
+/**
+ * Test utility for creating converter, formatter and deserializer of a table in the test database.
+ */
+public class TestTable {

Review Comment:
   This class seems to be unused.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/OraclePipelineRecordEmitter.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+/** The {@link RecordEmitter} implementation for Oracle pipeline connector. */
+public class OraclePipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+    private static final long serialVersionUID = 1L;
+    // Used when startup mode is initial
+    private final Set<TableId> alreadySendCreateTableTables;
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
+
+    // Used when startup mode is not initial
+    private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
+    private final boolean isBounded;
+    private final OracleSourceConfig sourceConfig;
+
+    public OraclePipelineRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory,
+            OracleSourceConfig sourceConfig) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+        List<String> tableList = sourceConfig.getTableList();
+        this.sourceConfig = sourceConfig;
+        this.createTableEventCache = new HashMap<>();
+        this.alreadySendCreateTableTables = new HashSet<>();
+        try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+
+            List<TableId> capturedTableIds = new ArrayList<>();
+            for (String table : tableList) {
+                TableId capturedTableId = TableId.parse(table.toUpperCase(Locale.ROOT));
+                capturedTableIds.add(capturedTableId);
+            }
+            for (TableId tableId : capturedTableIds) {
+                Schema schema = OracleSchemaUtils.getSchema(jdbc, tableId);
+                createTableEventCache.put(
+                        tableId,
+                        new CreateTableEvent(
+                                org.apache.flink.cdc.common.event.TableId.tableId(
+                                        tableId.catalog(), tableId.table()),
+                                schema));
+            }
+            this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
+        } catch (SQLException e) {
+            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
+        }
+    }
+
+    @Override
+    protected void processElement(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
+            throws Exception {
+        if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
+            for (TableId tableId : createTableEventCache.keySet()) {
+                output.collect((T) createTableEventCache.get(tableId));
+                alreadySendCreateTableTables.add(
+                        TableId.parse(tableId.schema() + "." + tableId.table()));
+            }
+            shouldEmitAllCreateTableEventsInSnapshotMode = false;
+        } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
+            TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
+            if (!alreadySendCreateTableTables.contains(tableId)) {
+                try (JdbcConnection jdbc = OracleSchemaUtils.createOracleConnection(sourceConfig)) {
+                    sendCreateTableEvent(jdbc, tableId, output);
+                }
+                alreadySendCreateTableTables.add(
+                        TableId.parse(tableId.schema() + "." + tableId.table()));
+            }
+        } else {
+            boolean isDataChangeRecord = isDataChangeRecord(element);
+            if (isDataChangeRecord || isSchemaChangeEvent(element)) {
+                TableId debeziumTableId = getTableId(element);
+                TableId tableId =
+                        TableId.parse(debeziumTableId.schema() + "." + debeziumTableId.table());

Review Comment:
   Can we use debeziumTableId directly here?
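
   A minimal sketch of that simplification, assuming the later lookups accept the Debezium `TableId` as-is:
   ```
   // hypothetical: reuse the already-parsed id instead of re-parsing "schema.table"
   TableId tableId = debeziumTableId;
   ```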



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/testutils/OracleTestUtils.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Oracle test utilities. */
+public class OracleTestUtils {

Review Comment:
   This class seems to be unused.


