This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 9348626926 [INLONG-11357][Sort] Add new source metrics for 
sort-connector-sqlserver-cdc-v1.15 (#11358)
9348626926 is described below

commit 9348626926f1354b5b7e994926a9254d32136f77
Author: PeterZh6 <zhanghengyuan1...@outlook.com>
AuthorDate: Wed Oct 16 11:26:11 2024 +0800

    [INLONG-11357][Sort] Add new source metrics for 
sort-connector-sqlserver-cdc-v1.15 (#11358)
---
 .../sort/sqlserver/DebeziumSourceFunction.java     | 70 +++++++++++++++++----
 .../RowDataDebeziumDeserializeSchema.java          | 71 ++++++++++++++--------
 .../inlong/sort/sqlserver/SqlServerSource.java     | 11 +++-
 .../sort/sqlserver/SqlServerTableSource.java       |  1 +
 4 files changed, 113 insertions(+), 40 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
index 01118d6513..c480ad1d45 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.sqlserver;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
 import com.ververica.cdc.debezium.Validator;
 import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
 import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -199,17 +204,24 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
     /** Buffer the events from the source and record the errors from the 
debezium. */
     private transient Handover handover;
 
+    private transient SourceExactlyMetric sourceExactlyMetric;
+
+    private final MetricOption metricOption;
+
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     // 
---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator, MetricOption metricOption) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -222,6 +234,14 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         this.executor = Executors.newSingleThreadExecutor(threadFactory);
         this.handover = new Handover();
         this.changeConsumer = new DebeziumChangeConsumer(handover);
+        if (metricOption != null) {
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, 
getRuntimeContext().getMetricGroup());
+        }
+        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) deserializer)
+                    .setSourceExactlyMetric(sourceExactlyMetric);
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     // ------------------------------------------------------------------------
@@ -306,17 +326,33 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
-        if (handover.hasError()) {
-            LOG.debug("snapshotState() called on closed source");
-            throw new FlinkRuntimeException(
-                    "Call snapshotState() on closed source, checkpoint 
failed.");
-        } else {
-            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
-            snapshotHistoryRecordsState();
-        }
-        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
-            ((RowDataDebeziumDeserializeSchema) deserializer)
-                    
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        try {
+            if (handover.hasError()) {
+                LOG.debug("snapshotState() called on closed source");
+                throw new FlinkRuntimeException(
+                        "Call snapshotState() on closed source, checkpoint 
failed.");
+            } else {
+                snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+                snapshotHistoryRecordsState();
+            }
+            if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+                ((RowDataDebeziumDeserializeSchema) deserializer)
+                        
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+            }
+            if (checkpointStartTimeMap != null) {
+                
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), 
System.currentTimeMillis());
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't record the 
start time of checkpoint");
+            }
+
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError(); // snapshot failed
+            }
+            throw e;
         }
     }
 
@@ -498,6 +534,16 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = 
checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != 
null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric.recordSnapshotToCheckpointDelay(
+                            System.currentTimeMillis() - 
snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start 
time of checkpoint");
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
index d90f470513..394ee0297b 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -139,37 +139,49 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
 
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out) 
throws Exception {
-        Envelope.Operation op = Envelope.operationFor(record);
-        Struct value = (Struct) record.value();
-        Schema valueSchema = record.valueSchema();
-        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-            GenericRowData insert = extractAfterRow(value, valueSchema);
-            validator.validate(insert, RowKind.INSERT);
-            insert.setRowKind(RowKind.INSERT);
-            if (sourceExactlyMetric != null) {
-                out = new MetricsCollector<>(out, sourceExactlyMetric);
+        long deserializeStartTime = System.currentTimeMillis();
+        try {
+            Envelope.Operation op = Envelope.operationFor(record);
+            Struct value = (Struct) record.value();
+            Schema valueSchema = record.valueSchema();
+            if (op == Envelope.Operation.CREATE || op == 
Envelope.Operation.READ) {
+                GenericRowData insert = extractAfterRow(value, valueSchema);
+                validator.validate(insert, RowKind.INSERT);
+                insert.setRowKind(RowKind.INSERT);
+                if (sourceExactlyMetric != null) {
+                    out = new MetricsCollector<>(out, sourceExactlyMetric);
+                }
+                emit(record, insert, out);
+            } else if (op == Envelope.Operation.DELETE) {
+                GenericRowData delete = extractBeforeRow(value, valueSchema);
+                validator.validate(delete, RowKind.DELETE);
+                delete.setRowKind(RowKind.DELETE);
+                emit(record, delete, out);
+            } else {
+                if (changelogMode == DebeziumChangelogMode.ALL) {
+                    GenericRowData before = extractBeforeRow(value, 
valueSchema);
+                    validator.validate(before, RowKind.UPDATE_BEFORE);
+                    before.setRowKind(RowKind.UPDATE_BEFORE);
+                    emit(record, before, out);
+                }
+
+                GenericRowData after = extractAfterRow(value, valueSchema);
+                validator.validate(after, RowKind.UPDATE_AFTER);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                if (sourceExactlyMetric != null) {
+                    out = new MetricsCollector<>(out, sourceExactlyMetric);
+                }
+                emit(record, after, out);
             }
-            emit(record, insert, out);
-        } else if (op == Envelope.Operation.DELETE) {
-            GenericRowData delete = extractBeforeRow(value, valueSchema);
-            validator.validate(delete, RowKind.DELETE);
-            delete.setRowKind(RowKind.DELETE);
-            emit(record, delete, out);
-        } else {
-            if (changelogMode == DebeziumChangelogMode.ALL) {
-                GenericRowData before = extractBeforeRow(value, valueSchema);
-                validator.validate(before, RowKind.UPDATE_BEFORE);
-                before.setRowKind(RowKind.UPDATE_BEFORE);
-                emit(record, before, out);
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+                
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - 
deserializeStartTime);
             }
-
-            GenericRowData after = extractAfterRow(value, valueSchema);
-            validator.validate(after, RowKind.UPDATE_AFTER);
-            after.setRowKind(RowKind.UPDATE_AFTER);
+        } catch (Exception e) {
             if (sourceExactlyMetric != null) {
-                out = new MetricsCollector<>(out, sourceExactlyMetric);
+                sourceExactlyMetric.incNumDeserializeError();
             }
-            emit(record, after, out);
+            throw e;
         }
     }
 
@@ -697,4 +709,9 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** allow DebeziumSourceFunction to set the SourceExactlyMetric */
+    public void setSourceExactlyMetric(SourceExactlyMetric 
sourceExactlyMetric) {
+        this.sourceExactlyMetric = sourceExactlyMetric;
+    }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
index 6a094521a5..92353bf0cf 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.sqlserver;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
 import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
 import io.debezium.connector.sqlserver.SqlServerConnector;
@@ -51,6 +53,7 @@ public class SqlServerSource {
         private Properties dbzProperties;
         private StartupOptions startupOptions = StartupOptions.initial();
         private DebeziumDeserializationSchema<T> deserializer;
+        private MetricOption metricOption;
 
         public Builder<T> hostname(String hostname) {
             this.hostname = hostname;
@@ -114,6 +117,12 @@ public class SqlServerSource {
             return this;
         }
 
+        /** metricOption used to instantiate SourceExactlyMetric when 
inlong.metric.labels is present in flink sql */
+        public Builder<T> metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", 
SqlServerConnector.class.getCanonicalName());
@@ -154,7 +163,7 @@ public class SqlServerSource {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, new SqlServerValidator(props));
+                    deserializer, props, null, new SqlServerValidator(props), 
metricOption);
         }
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
index c49dd9747a..87defcedca 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
@@ -144,6 +144,7 @@ public class SqlServerTableSource implements 
ScanTableSource, SupportsReadingMet
                         .debeziumProperties(dbzProperties)
                         .startupOptions(startupOptions)
                         .deserializer(deserializer)
+                        .metricOption(metricOption)
                         .build();
         return SourceFunctionProvider.of(sourceFunction, false);
     }

Reply via email to