This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e0fa5bf5 [INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069)
0e0fa5bf5 is described below

commit 0e0fa5bf5185d1b5ac2b1feb78fd82d4a872acfc
Author: kuansix <490305...@qq.com>
AuthorDate: Tue Dec 27 12:32:22 2022 +0800

    [INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069)
---
 .../org/apache/inlong/sort/base/Constants.java     |  4 +++
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   | 38 +++++++++++++++++++---
 .../inlong/sort/cdc/mongodb/MongoDBSource.java     |  9 ++++-
 .../MongoDBConnectorDeserializationSchema.java     |  8 ++---
 .../cdc/mongodb/debezium/utils/RecordUtils.java    |  1 +
 .../sort/cdc/mongodb/table/MongoDBTableSource.java |  1 +
 6 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 10fc5ea43..7af393fe7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -84,6 +84,10 @@ public final class Constants {
      * Table Name used in inlong metric
      */
     public static final String TABLE_NAME = "table";
+    /**
+     * Collection Name used in inlong metric
+     */
+    public static final String COLLECTION_NAME = "collection";
     /**
      * Read Phase used in inlong metric
      */
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index d79188e99..9ef76410f 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sort.cdc.mongodb;
 
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.SnapshotRecord;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -47,10 +49,12 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import 
org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumDeserializationSchema;
 import 
org.apache.inlong.sort.cdc.mongodb.debezium.internal.DebeziumChangeConsumer;
@@ -63,6 +67,8 @@ import 
org.apache.inlong.sort.cdc.mongodb.debezium.internal.FlinkOffsetBackingSt
 import org.apache.inlong.sort.cdc.mongodb.debezium.internal.Handover;
 import org.apache.inlong.sort.cdc.mongodb.debezium.internal.SchemaRecord;
 import org.apache.inlong.sort.cdc.mongodb.debezium.utils.DatabaseHistoryUtil;
+import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +77,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
@@ -231,7 +238,9 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     private String inlongAudit;
 
-    private SourceMetricData sourceMetricData;
+    private SourceTableMetricData sourceMetricData;
+
+    private boolean migrateAll;
 
     private transient ListState<MetricState> metricStateListState;
 
@@ -243,13 +252,15 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator, String inlongMetric, String inlongAudit) {
+            Validator validator, String inlongMetric,
+            String inlongAudit, boolean migrateAll) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.migrateAll = migrateAll;
     }
 
     @Override
@@ -445,7 +456,12 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup,
+                    Arrays.asList(Constants.DATABASE_NAME, 
Constants.COLLECTION_NAME));
+            if (migrateAll) {
+                // register sub source metric data from metric state
+                sourceMetricData.registerSubMetricsGroup(metricState);
+            }
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", 
FlinkOffsetBackingStore.class.getCanonicalName());
@@ -485,7 +501,21 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
-                                if (sourceMetricData != null) {
+                                if (sourceMetricData != null && record != null 
&& migrateAll) {
+                                    Struct value = (Struct) record.value();
+                                    Struct source = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+                                    if (null == source) {
+                                        source = 
value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
+                                    }
+                                    String dbName = 
source.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+                                    String collectionName =
+                                            
source.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+                                    SnapshotRecord snapshotRecord = 
SnapshotRecord.fromSource(source);
+                                    boolean isSnapshotRecord = 
(SnapshotRecord.TRUE == snapshotRecord);
+                                    sourceMetricData
+                                            .outputMetricsWithEstimate(new 
String[]{dbName, collectionName},
+                                                    isSnapshotRecord, value);
+                                } else if (sourceMetricData != null && record 
!= null) {
                                     
sourceMetricData.outputMetricsWithEstimate(record.value());
                                 }
                                 deserializer.deserialize(record, out);
diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index ba6fc0862..be30c827d 100644
--- 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -140,6 +140,7 @@ public class MongoDBSource {
         private DebeziumDeserializationSchema<T> deserializer;
         private String inlongMetric;
         private String inlongAudit;
+        private boolean migrateAll;
 
         /** The comma-separated list of hostname and port pairs of mongodb 
servers. */
         public Builder<T> hosts(String hosts) {
@@ -327,6 +328,11 @@ public class MongoDBSource {
             return this;
         }
 
+        public Builder<T> migrateAll(Boolean migrateAll) {
+            this.migrateAll = migrateAll;
+            return this;
+        }
+
         /** Build connection uri. */
         @VisibleForTesting
         public ConnectionString buildConnectionUri() {
@@ -446,7 +452,8 @@ public class MongoDBSource {
                     Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), 
HEARTBEAT_TOPIC_NAME_DEFAULT);
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, 
Validator.getDefaultValidator(), inlongMetric, inlongAudit);
+                    deserializer, props, null, Validator.getDefaultValidator(),
+                    inlongMetric, inlongAudit, migrateAll);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index bdb6e18ed..dc1f71430 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -91,8 +91,6 @@ public class MongoDBConnectorDeserializationSchema
     /** TypeInformation of the produced {@link RowData}. */
     private final TypeInformation<RowData> resultTypeInfo;
 
-    public static final String DOCUMENT_TO_FIELD = "to";
-
     /** Local Time zone. */
     private final ZoneId localTimeZone;
 
@@ -125,8 +123,7 @@ public class MongoDBConnectorDeserializationSchema
             ZoneId localTimeZone,
             RowKindValidator rowValidator,
             boolean sourceMultipleEnable) {
-        // this.hasMetadata = checkNotNull(metadataConverters).length > 0;
-        this.hasMetadata = true;
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
         this.sourceMultipleEnable = sourceMultipleEnable;
         this.appendMetadataCollector = new 
AppendMetadataCollector(metadataConverters, sourceMultipleEnable);
         this.physicalConverter = createConverter(physicalDataType);
@@ -214,7 +211,8 @@ public class MongoDBConnectorDeserializationSchema
                 if (!rowKindValidator.validate(MongoRowKind.RENAME)) {
                     return;
                 }
-                GenericRowData rename = extractMongoDMLData(value, 
DOCUMENT_TO_FIELD, OperationType.RENAME.getValue());
+                GenericRowData rename =
+                        extractMongoDMLData(value, 
RecordUtils.DOCUMENT_TO_FIELD, OperationType.RENAME.getValue());
                 rename.setRowKind(RowKind.INSERT);
                 emit(record, rename, out);
                 break;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index 4f23c6a06..fa0def5b4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -44,6 +44,7 @@ import org.bson.BsonValue;
  */
 public class RecordUtils {
 
+    public static final String DOCUMENT_TO_FIELD = "to";
     private static final List<BsonType> INT_TYPE = 
Arrays.asList(BsonType.INT32, BsonType.INT64);
     private static final List<BsonType> BOOL_TYPE = 
Arrays.asList(BsonType.BOOLEAN);
     private static final List<BsonType> DOUBLE_TYPE = 
Arrays.asList(BsonType.DOUBLE, BsonType.DECIMAL128);
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index dd6f319dc..2bddb9ace 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -198,6 +198,7 @@ public class MongoDBTableSource implements ScanTableSource, 
SupportsReadingMetad
         
Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
         Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric);
         Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit);
+        
Optional.ofNullable(sourceMultipleEnable).ifPresent(builder::migrateAll);
         DebeziumSourceFunction<RowData> sourceFunction = builder.build();
 
         return SourceFunctionProvider.of(sourceFunction, false);

Reply via email to