This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0e0fa5bf5 [INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069) 0e0fa5bf5 is described below commit 0e0fa5bf5185d1b5ac2b1feb78fd82d4a872acfc Author: kuansix <490305...@qq.com> AuthorDate: Tue Dec 27 12:32:22 2022 +0800 [INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069) --- .../org/apache/inlong/sort/base/Constants.java | 4 +++ .../sort/cdc/mongodb/DebeziumSourceFunction.java | 38 +++++++++++++++++++--- .../inlong/sort/cdc/mongodb/MongoDBSource.java | 9 ++++- .../MongoDBConnectorDeserializationSchema.java | 8 ++--- .../cdc/mongodb/debezium/utils/RecordUtils.java | 1 + .../sort/cdc/mongodb/table/MongoDBTableSource.java | 1 + 6 files changed, 51 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 10fc5ea43..7af393fe7 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -84,6 +84,10 @@ public final class Constants { * Table Name used in inlong metric */ public static final String TABLE_NAME = "table"; + /** + * Collection Name used in inlong metric + */ + public static final String COLLECTION_NAME = "collection"; /** * Read Phase used in inlong metric */ diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index d79188e99..9ef76410f 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -17,7 +17,9 @@ package org.apache.inlong.sort.cdc.mongodb; +import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope; import com.ververica.cdc.debezium.Validator; +import io.debezium.connector.SnapshotRecord; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; import io.debezium.embedded.Connect; @@ -47,10 +49,12 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumDeserializationSchema; import org.apache.inlong.sort.cdc.mongodb.debezium.internal.DebeziumChangeConsumer; @@ -63,6 +67,8 @@ import org.apache.inlong.sort.cdc.mongodb.debezium.internal.FlinkOffsetBackingSt import org.apache.inlong.sort.cdc.mongodb.debezium.internal.Handover; import org.apache.inlong.sort.cdc.mongodb.debezium.internal.SchemaRecord; import org.apache.inlong.sort.cdc.mongodb.debezium.utils.DatabaseHistoryUtil; +import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +77,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import 
java.util.Arrays; import java.util.Collection; import java.util.Properties; import java.util.UUID; @@ -231,7 +238,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongAudit; - private SourceMetricData sourceMetricData; + private SourceTableMetricData sourceMetricData; + + private boolean migrateAll; private transient ListState<MetricState> metricStateListState; @@ -243,13 +252,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> DebeziumDeserializationSchema<T> deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, - Validator validator, String inlongMetric, String inlongAudit) { + Validator validator, String inlongMetric, + String inlongAudit, boolean migrateAll) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; this.inlongMetric = inlongMetric; this.inlongAudit = inlongAudit; + this.migrateAll = migrateAll; } @Override @@ -445,7 +456,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption, metricGroup); + sourceMetricData = new SourceTableMetricData(metricOption, metricGroup, + Arrays.asList(Constants.DATABASE_NAME, Constants.COLLECTION_NAME)); + if (migrateAll) { + // register sub source metric data from metric state + sourceMetricData.registerSubMetricsGroup(metricState); + } } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -485,7 +501,21 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { - if (sourceMetricData != null) { + if (sourceMetricData != null && record != null && migrateAll) { + Struct value = (Struct) record.value(); + Struct source = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD); + if (null == source) { + source = value.getStruct(RecordUtils.DOCUMENT_TO_FIELD); + } + String dbName = source.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD); + String collectionName = + source.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD); + SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source); + boolean isSnapshotRecord = (SnapshotRecord.TRUE == snapshotRecord); + sourceMetricData + .outputMetricsWithEstimate(new String[]{dbName, collectionName}, + isSnapshotRecord, value); + } else if (sourceMetricData != null && record != null) { sourceMetricData.outputMetricsWithEstimate(record.value()); } deserializer.deserialize(record, out); diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java index ba6fc0862..be30c827d 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java @@ -140,6 +140,7 @@ public class MongoDBSource { private DebeziumDeserializationSchema<T> deserializer; private String inlongMetric; private String inlongAudit; + private boolean migrateAll; /** The comma-separated list of hostname and port pairs of mongodb servers. */ public Builder<T> hosts(String hosts) { @@ -327,6 +328,11 @@ public class MongoDBSource { return this; } + public Builder<T> migrateAll(Boolean migrateAll) { + this.migrateAll = migrateAll; + return this; + } + /** Build connection uri. 
*/ @VisibleForTesting public ConnectionString buildConnectionUri() { @@ -446,7 +452,8 @@ public class MongoDBSource { Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME_DEFAULT); return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit); + deserializer, props, null, Validator.getDefaultValidator(), + inlongMetric, inlongAudit, migrateAll); } } } diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java index bdb6e18ed..dc1f71430 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java @@ -91,8 +91,6 @@ public class MongoDBConnectorDeserializationSchema /** TypeInformation of the produced {@link RowData}. */ private final TypeInformation<RowData> resultTypeInfo; - public static final String DOCUMENT_TO_FIELD = "to"; - /** Local Time zone. 
*/ private final ZoneId localTimeZone; @@ -125,8 +123,7 @@ public class MongoDBConnectorDeserializationSchema ZoneId localTimeZone, RowKindValidator rowValidator, boolean sourceMultipleEnable) { - // this.hasMetadata = checkNotNull(metadataConverters).length > 0; - this.hasMetadata = true; + this.hasMetadata = checkNotNull(metadataConverters).length > 0; this.sourceMultipleEnable = sourceMultipleEnable; this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters, sourceMultipleEnable); this.physicalConverter = createConverter(physicalDataType); @@ -214,7 +211,8 @@ public class MongoDBConnectorDeserializationSchema if (!rowKindValidator.validate(MongoRowKind.RENAME)) { return; } - GenericRowData rename = extractMongoDMLData(value, DOCUMENT_TO_FIELD, OperationType.RENAME.getValue()); + GenericRowData rename = + extractMongoDMLData(value, RecordUtils.DOCUMENT_TO_FIELD, OperationType.RENAME.getValue()); rename.setRowKind(RowKind.INSERT); emit(record, rename, out); break; diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java index 4f23c6a06..fa0def5b4 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java @@ -44,6 +44,7 @@ import org.bson.BsonValue; */ public class RecordUtils { + public static final String DOCUMENT_TO_FIELD = "to"; private static final List<BsonType> INT_TYPE = Arrays.asList(BsonType.INT32, BsonType.INT64); private static final List<BsonType> BOOL_TYPE = Arrays.asList(BsonType.BOOLEAN); private static final List<BsonType> DOUBLE_TYPE = Arrays.asList(BsonType.DOUBLE, BsonType.DECIMAL128); diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java index dd6f319dc..2bddb9ace 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java @@ -198,6 +198,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis); Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric); Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit); + Optional.ofNullable(sourceMultipleEnable).ifPresent(builder::migrateAll); DebeziumSourceFunction<RowData> sourceFunction = builder.build(); return SourceFunctionProvider.of(sourceFunction, false);