This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3126faaf5 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
3126faaf5 is described below

commit 3126faaf5e8851aa62a8772eeb4096bf47034613
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Tue Dec 13 16:49:37 2022 +0800

    [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808)
---
 .../base/metric/sub/SourceTableMetricData.java     | 56 ++++++++++++++++---
 .../sort/cdc/base/util/CallbackCollector.java      | 47 ++++++++++++++++
 .../inlong/sort/cdc/oracle/OracleSource.java       |  9 +++-
 .../oracle/debezium/DebeziumSourceFunction.java    | 63 ++++++++++++++++------
 .../sort/cdc/oracle/table/OracleTableSource.java   |  3 +-
 5 files changed, 152 insertions(+), 26 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
index 112000903..a11d73711 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
@@ -122,9 +122,14 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub
         String metricGroupLabels = labels.entrySet().stream().map(entry -> 
entry.getKey() + "=" + entry.getValue())
                 .collect(Collectors.joining(DELIMITER));
         StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-        
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-                
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-
+        if (schemaInfoArray.length == 2) {
+            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+        } else if (schemaInfoArray.length == 3) {
+            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+        }
         MetricOption metricOption = MetricOption.builder()
                 .withInitRecords(subMetricState != null ? 
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(subMetricState != null ? 
subMetricState.getMetricValue(NUM_BYTES_IN) : 0L)
@@ -135,14 +140,18 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub
     }
 
     /**
-     * build record schema identify,in the form of database.table
+     * build record schema identify,in the form of database.schema.table or 
database.table
      *
      * @param database the database name of record
+     * @param schema the schema name of record
      * @param table the table name of record
      * @return the record schema identify
      */
-    public String buildSchemaIdentify(String database, String table) {
-        return database + Constants.SEMICOLON + table;
+    public String buildSchemaIdentify(String database, String schema, String 
table) {
+        if (schema == null) {
+            return database + Constants.SEMICOLON + table;
+        }
+        return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + 
table;
     }
 
     /**
@@ -168,7 +177,7 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub
             outputMetricsWithEstimate(data);
             return;
         }
-        String identify = buildSchemaIdentify(database, table);
+        String identify = buildSchemaIdentify(database, null, table);
         SourceMetricData subSourceMetricData;
         if (subSourceMetricMap.containsKey(identify)) {
             subSourceMetricData = subSourceMetricMap.get(identify);
@@ -186,6 +195,39 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub
         outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : 
ReadPhase.INCREASE_PHASE);
     }
 
+    /**
+     * output metrics with estimate
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param isSnapshotRecord is it snapshot record
+     * @param data the data of record
+     */
+    public void outputMetricsWithEstimate(String database, String schema, 
String table,
+            boolean isSnapshotRecord, Object data) {
+        if (StringUtils.isBlank(database) || StringUtils.isBlank(schema) || 
StringUtils.isBlank(table)) {
+            outputMetricsWithEstimate(data);
+            return;
+        }
+        String identify = buildSchemaIdentify(database, schema, table);
+        SourceMetricData subSourceMetricData;
+        if (subSourceMetricMap.containsKey(identify)) {
+            subSourceMetricData = subSourceMetricMap.get(identify);
+        } else {
+            subSourceMetricData = buildSubSourceMetricData(new 
String[]{database, schema, table}, this);
+            subSourceMetricMap.put(identify, subSourceMetricData);
+        }
+        // source metric and sub source metric output metrics
+        long rowCountSize = 1L;
+        long rowDataSize = 
data.toString().getBytes(StandardCharsets.UTF_8).length;
+        this.outputMetrics(rowCountSize, rowDataSize);
+        subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
+
+        // output read phase metric
+        outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : 
ReadPhase.INCREASE_PHASE);
+    }
+
     /**
      * output read phase metric
      *
diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
new file mode 100644
index 000000000..289f470bf
--- /dev/null
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.debezium.utils;
+
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A collector supporting callback.
+ */
+public class CallbackCollector<T> implements Collector<T> {
+
+    private final ThrowingConsumer<T, Exception> callback;
+
+    public CallbackCollector(ThrowingConsumer<T, Exception> callback) {
+        this.callback = callback;
+    }
+
+    @Override
+    public void collect(T t) {
+        try {
+            callback.accept(t);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
index 5aacc1dcc..921bee3f1 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java
@@ -55,6 +55,7 @@ public class OracleSource {
         private DebeziumDeserializationSchema<T> deserializer;
         private String inlongMetric;
         private String inlongAudit;
+        private boolean sourceMultipleEnable;
 
         public Builder<T> hostname(String hostname) {
             this.hostname = hostname;
@@ -141,6 +142,11 @@ public class OracleSource {
             return this;
         }
 
+        public Builder<T> sourceMultipleEnable(boolean sourceMultipleEnable) {
+            this.sourceMultipleEnable = sourceMultipleEnable;
+            return this;
+        }
+
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", 
OracleConnector.class.getCanonicalName());
@@ -184,7 +190,8 @@ public class OracleSource {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, specificOffset, new 
OracleValidator(props), inlongMetric, inlongAudit);
+                    deserializer, props, specificOffset, new 
OracleValidator(props),
+                    inlongMetric, inlongAudit, sourceMultipleEnable);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index c67538bf5..8966b87d5 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -23,6 +23,9 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 
 import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -65,6 +68,12 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffset;
 import 
org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffsetSerializer;
@@ -72,15 +81,12 @@ import org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory;
 import 
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore;
 import org.apache.inlong.sort.cdc.base.debezium.internal.Handover;
 import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil;
-import org.apache.inlong.sort.base.util.MetricStateUtils;
-import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher;
+import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory;
+import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -232,7 +238,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
     private String inlongAudit;
 
-    private SourceMetricData sourceMetricData;
+    private boolean sourceMultipleEnable;
+
+    private SourceTableMetricData sourceMetricData;
 
     private transient ListState<MetricState> metricStateListState;
 
@@ -246,13 +254,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             @Nullable DebeziumOffset specificOffset,
             Validator validator,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            boolean sourceMultipleEnable) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.sourceMultipleEnable = sourceMultipleEnable;
     }
 
     @Override
@@ -447,7 +457,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            if (sourceMultipleEnable) {
+                // register sub source metric data from metric state
+                sourceMetricData.registerSubMetricsGroup(metricState);
+            }
         }
 
         properties.setProperty("name", "engine");
@@ -488,19 +502,34 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
-                                if (sourceMetricData != null) {
-                                    
sourceMetricData.outputMetricsWithEstimate(record.value());
-                                }
-                                deserializer.deserialize(record, out);
+                                deserializer.deserialize(record, new 
CallbackCollector<>(inputRow -> {
+                                    if (sourceMetricData != null) {
+                                        
sourceMetricData.outputMetricsWithEstimate(record.value());
+                                    }
+                                    out.collect(inputRow);
+                                }));
                             }
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out,
                                     TableChange tableSchema) throws Exception {
-                                if (sourceMetricData != null) {
-                                    
sourceMetricData.outputMetricsWithEstimate(record.value());
-                                }
-                                deserializer.deserialize(record, out, 
tableSchema);
+                                deserializer.deserialize(record, new 
CallbackCollector<>(inputRow -> {
+                                    if (sourceMetricData != null && record != 
null && sourceMultipleEnable) {
+                                        Struct value = (Struct) record.value();
+                                        Struct source = 
value.getStruct(Envelope.FieldName.SOURCE);
+                                        String dbName = 
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+                                        String schemaName = 
source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
+                                        String tableName = 
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+                                        SnapshotRecord snapshotRecord = 
SnapshotRecord.fromSource(source);
+                                        boolean isSnapshotRecord = 
(SnapshotRecord.TRUE == snapshotRecord);
+                                        sourceMetricData
+                                                
.outputMetricsWithEstimate(dbName, schemaName, tableName,
+                                                        isSnapshotRecord, 
value);
+                                    } else if (sourceMetricData != null && 
record != null) {
+                                        
sourceMetricData.outputMetricsWithEstimate(record.value());
+                                    }
+                                    out.collect(inputRow);
+                                }), tableSchema);
                             }
 
                             @Override
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
index cbb586cb7..41c9ff75c 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java
@@ -145,7 +145,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada
                         .startupOptions(startupOptions)
                         .deserializer(deserializer)
                         .inlongMetric(inlongMetric)
-                        .inlongAudit(inlongAudit);
+                        .inlongAudit(inlongAudit)
+                        .sourceMultipleEnable(sourceMultipleEnable);
         DebeziumSourceFunction<RowData> sourceFunction = builder.build();
 
         return SourceFunctionProvider.of(sourceFunction, false);

Reply via email to