This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new af3437e3d [INLONG-7006][Sort] Optimize reading metric at the table level, and the metric label is passed in by a specific connector. (#7007)
af3437e3d is described below

commit af3437e3dfdb4ff81fb6807ec2e7b99e66915fcb
Author: chestnufang <[email protected]>
AuthorDate: Wed Dec 21 15:50:43 2022 +0800

    [INLONG-7006][Sort] Optimize reading metric at the table level, and the metric label is passed in by a specific connector. (#7007)
    
    Co-authored-by: chestnufang <[email protected]>
---
 .../base/metric/sub/SourceTableMetricData.java     | 89 +++++++++-------------
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  5 +-
 .../source/metrics/MySqlSourceReaderMetrics.java   |  5 +-
 .../oracle/debezium/DebeziumSourceFunction.java    |  5 +-
 .../sort/cdc/postgres/DebeziumSourceFunction.java  |  5 +-
 5 files changed, 51 insertions(+), 58 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
index 3ca762bea..eac1224f6 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java
@@ -24,6 +24,7 @@ import static 
org.apache.inlong.sort.base.Constants.READ_PHASE;
 
 import com.google.common.collect.Maps;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
@@ -55,9 +56,15 @@ public class SourceTableMetricData extends SourceMetricData 
implements SourceSub
      * The sub source metric data container of source metric data
      */
     private final Map<String, SourceMetricData> subSourceMetricMap = 
Maps.newHashMap();
+    /**
+     * The sub source metric label of source sub metric group and this must be 
consistent with the schema information
+     * recorded by the specific connector.
+     */
+    private final List<String> tableMetricLabelList;
 
-    public SourceTableMetricData(MetricOption option, MetricGroup metricGroup) 
{
+    public SourceTableMetricData(MetricOption option, MetricGroup metricGroup, 
List<String> tableMetricLabelList) {
         super(option, metricGroup);
+        this.tableMetricLabelList = tableMetricLabelList;
     }
 
     /**
@@ -83,7 +90,7 @@ public class SourceTableMetricData extends SourceMetricData 
implements SourceSub
         }
         Map<String, MetricState> subMetricStateMap = 
metricState.getSubMetricStateMap();
         for (Entry<String, MetricState> subMetricStateEntry : 
subMetricStateMap.entrySet()) {
-            String[] schemaInfoArray = 
parseSchemaIdentify(subMetricStateEntry.getKey());
+            String[] schemaInfoArray = 
subMetricStateEntry.getKey().split(Constants.SPILT_SEMICOLON);
             final MetricState subMetricState = subMetricStateEntry.getValue();
             SourceMetricData subSourceMetricData = 
buildSubSourceMetricData(schemaInfoArray,
                     subMetricState, this);
@@ -121,13 +128,8 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
         String metricGroupLabels = labels.entrySet().stream().map(entry -> 
entry.getKey() + "=" + entry.getValue())
                 .collect(Collectors.joining(DELIMITER));
         StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-        if (schemaInfoArray.length == 2) {
-            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-        } else if (schemaInfoArray.length == 3) {
-            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-                    
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
-                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+        for (int i = 0; i < tableMetricLabelList.size(); i++) {
+            
labelBuilder.append(DELIMITER).append(tableMetricLabelList.get(i)).append("=").append(schemaInfoArray[i]);
         }
         MetricOption metricOption = MetricOption.builder()
                 .withInitRecords(subMetricState != null ? 
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
@@ -135,32 +137,7 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
                 .withInlongLabels(labelBuilder.toString())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
-        return new SourceTableMetricData(metricOption, 
sourceMetricData.getMetricGroup());
-    }
-
-    /**
-     * build record schema identify,in the form of database.schema.table or 
database.table
-     *
-     * @param database the database name of record
-     * @param schema the schema name of record
-     * @param table the table name of record
-     * @return the record schema identify
-     */
-    public String buildSchemaIdentify(String database, String schema, String 
table) {
-        if (schema == null) {
-            return database + Constants.SEMICOLON + table;
-        }
-        return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + 
table;
-    }
-
-    /**
-     * parse record schema identify
-     *
-     * @param schemaIdentify the schema identify of record
-     * @return the record schema identify array,String[]{database,table}
-     */
-    public String[] parseSchemaIdentify(String schemaIdentify) {
-        return schemaIdentify.split(Constants.SPILT_SEMICOLON);
+        return new SourceTableMetricData(metricOption, 
sourceMetricData.getMetricGroup(), tableMetricLabelList);
     }
 
     /**
@@ -176,22 +153,8 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
             outputMetricsWithEstimate(data);
             return;
         }
-        String identify = buildSchemaIdentify(database, null, table);
-        SourceMetricData subSourceMetricData;
-        if (subSourceMetricMap.containsKey(identify)) {
-            subSourceMetricData = subSourceMetricMap.get(identify);
-        } else {
-            subSourceMetricData = buildSubSourceMetricData(new 
String[]{database, table}, this);
-            subSourceMetricMap.put(identify, subSourceMetricData);
-        }
-        // source metric and sub source metric output metrics
-        long rowCountSize = 1L;
-        long rowDataSize = 
data.toString().getBytes(StandardCharsets.UTF_8).length;
-        this.outputMetrics(rowCountSize, rowDataSize);
-        subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
-
-        // output read phase metric
-        outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : 
ReadPhase.INCREASE_PHASE);
+        // output sub source metric
+        outputMetricsWithEstimate(new String[]{database, table}, 
isSnapshotRecord, data);
     }
 
     /**
@@ -209,12 +172,28 @@ public class SourceTableMetricData extends 
SourceMetricData implements SourceSub
             outputMetricsWithEstimate(data);
             return;
         }
-        String identify = buildSchemaIdentify(database, schema, table);
+        // output sub source metric
+        outputMetricsWithEstimate(new String[]{database, schema, table}, 
isSnapshotRecord, data);
+    }
+
+    /**
+     * output metrics with estimate
+     *
+     * @param recordSchemaInfoArray the schema info of record
+     * @param isSnapshotRecord is it snapshot record
+     * @param data the data of record
+     */
+    public void outputMetricsWithEstimate(String[] recordSchemaInfoArray, 
boolean isSnapshotRecord, Object data) {
+        if (recordSchemaInfoArray == null) {
+            outputMetricsWithEstimate(data);
+            return;
+        }
+        String identify = String.join(Constants.SEMICOLON, 
recordSchemaInfoArray);
         SourceMetricData subSourceMetricData;
         if (subSourceMetricMap.containsKey(identify)) {
             subSourceMetricData = subSourceMetricMap.get(identify);
         } else {
-            subSourceMetricData = buildSubSourceMetricData(new 
String[]{database, schema, table}, this);
+            subSourceMetricData = 
buildSubSourceMetricData(recordSchemaInfoArray, this);
             subSourceMetricMap.put(identify, subSourceMetricData);
         }
         // source metric and sub source metric output metrics
@@ -274,7 +253,9 @@ public class SourceTableMetricData extends SourceMetricData 
implements SourceSub
     @Override
     public String toString() {
         return "SourceTableMetricData{"
-                + "readPhaseMetricDataMap=" + readPhaseMetricDataMap
+                + "numRecordsIn=" + getNumRecordsIn().getCount()
+                + ", numBytesIn=" + getNumBytesIn().getCount()
+                + ", readPhaseMetricDataMap=" + readPhaseMetricDataMap
                 + ", subSourceMetricMap=" + subSourceMetricMap
                 + '}';
     }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a446456cf..d99da8389 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -27,6 +27,7 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.Arrays;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -50,6 +51,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffset;
 import 
org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffsetSerializer;
@@ -454,7 +456,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup,
+                    Arrays.asList(Constants.DATABASE_NAME, 
Constants.TABLE_NAME));
             if (migrateAll) {
                 // register sub source metric data from metric state
                 sourceMetricData.registerSubMetricsGroup(metricState);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index c941eb083..2655ab7ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -21,10 +21,12 @@ import static 
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
@@ -65,7 +67,8 @@ public class MySqlSourceReaderMetrics {
 
     public void registerMetrics(MetricOption metricOption) {
         if (metricOption != null) {
-            sourceTableMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            sourceTableMetricData = new SourceTableMetricData(metricOption, 
metricGroup,
+                    Arrays.asList(Constants.DATABASE_NAME, 
Constants.TABLE_NAME));
         }
         metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) 
this::getFetchDelay);
         metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) 
this::getEmitDelay);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index d114d5834..37b628d03 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -35,6 +35,7 @@ import 
io.debezium.relational.history.TableChanges.TableChange;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
@@ -67,6 +68,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -456,7 +458,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup,
+                    Arrays.asList(Constants.DATABASE_NAME, 
Constants.SCHEMA_NAME, Constants.TABLE_NAME));
             if (sourceMultipleEnable) {
                 // register sub source metric data from metric state
                 sourceMetricData.registerSubMetricsGroup(metricState);
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index cc7f4c46b..ef3f744de 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -44,6 +44,7 @@ import 
io.debezium.relational.history.TableChanges.TableChange;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
@@ -76,6 +77,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.base.metric.MetricOption;
@@ -456,7 +458,8 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup,
+                    Arrays.asList(Constants.DATABASE_NAME, 
Constants.SCHEMA_NAME, Constants.TABLE_NAME));
             if (migrateAll) {
                 // register sub source metric data from metric state
                 sourceMetricData.registerSubMetricsGroup(metricState);

Reply via email to