This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a672e365 [INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966)
0a672e365 is described below

commit 0a672e3651680bc420d28305d679051c9866931d
Author: chestnufang <65438734+chestnu...@users.noreply.github.com>
AuthorDate: Mon Dec 19 21:49:03 2022 +0800

    [INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966)
    
    Co-authored-by: chestnufang <chestnuf...@tencent.com>
---
 .../source/metrics/MySqlSourceReaderMetrics.java   | 50 +++++++++++----
 .../mysql/source/reader/MySqlRecordEmitter.java    | 10 ++-
 .../cdc/mysql/source/reader/MySqlSourceReader.java | 22 +++++--
 .../cdc/mysql/source/split/MySqlMetricSplit.java   | 74 +++++++++++++++++++++-
 .../mysql/source/split/MySqlSplitSerializer.java   | 48 +++++++++++++-
 5 files changed, 181 insertions(+), 23 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 1ebd1c7a5..c941eb083 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -17,11 +17,19 @@
 
 package org.apache.inlong.sort.cdc.mysql.source.metrics;
 
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
 import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
+import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
 
 /**
  * A collection class for handling metrics in {@link MySqlSourceReader}.
@@ -49,7 +57,7 @@ public class MySqlSourceReaderMetrics {
      */
     private volatile long emitDelay = 0L;
 
-    private SourceMetricData sourceMetricData;
+    private SourceTableMetricData sourceTableMetricData;
 
     public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -57,7 +65,7 @@ public class MySqlSourceReaderMetrics {
 
     public void registerMetrics(MetricOption metricOption) {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+            sourceTableMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
         }
         metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) 
this::getFetchDelay);
         metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) 
this::getEmitDelay);
@@ -92,20 +100,38 @@ public class MySqlSourceReaderMetrics {
         this.emitDelay = emitDelay;
     }
 
-    public void outputMetrics(long rowCountSize, long rowDataSize) {
-        if (sourceMetricData != null) {
-            sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
+    public void outputMetrics(String database, String table, boolean 
isSnapshotRecord, Object data) {
+        if (sourceTableMetricData != null) {
+            sourceTableMetricData.outputMetricsWithEstimate(database, table, 
isSnapshotRecord, data);
         }
     }
 
-    public void initMetrics(long rowCountSize, long rowDataSize) {
-        if (sourceMetricData != null) {
-            sourceMetricData.getNumBytesIn().inc(rowDataSize);
-            sourceMetricData.getNumRecordsIn().inc(rowCountSize);
+    public void initMetrics(long rowCountSize, long rowDataSize, Map<String, 
Long> readPhaseMetricMap,
+            Map<String, MySqlTableMetric> tableMetricMap) {
+        if (sourceTableMetricData != null) {
+            // node level metric data
+            sourceTableMetricData.getNumBytesIn().inc(rowDataSize);
+            sourceTableMetricData.getNumRecordsIn().inc(rowCountSize);
+
+            // register read phase metric data and table level metric data
+            if (readPhaseMetricMap != null && tableMetricMap != null) {
+                MetricState metricState = new MetricState();
+                metricState.setMetrics(readPhaseMetricMap);
+                Map<String, MetricState> subMetricStateMap = new HashMap<>();
+                tableMetricMap.entrySet().stream().filter(v -> v.getValue() != 
null).forEach(entry -> {
+                    MetricState subMetricState = new MetricState();
+                    subMetricState.setMetrics(ImmutableMap
+                            .of(NUM_RECORDS_IN, 
entry.getValue().getNumRecordsIn(), NUM_BYTES_IN,
+                                    entry.getValue().getNumBytesIn()));
+                    subMetricStateMap.put(entry.getKey(), subMetricState);
+                });
+                metricState.setSubMetricStateMap(subMetricStateMap);
+                sourceTableMetricData.registerSubMetricsGroup(metricState);
+            }
         }
     }
 
-    public SourceMetricData getSourceMetricData() {
-        return sourceMetricData;
+    public SourceTableMetricData getSourceMetricData() {
+        return sourceTableMetricData;
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index e09b3ee88..eb961df00 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.cdc.mysql.source.reader;
 
 import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
+import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.document.Array;
 import io.debezium.relational.TableId;
@@ -38,7 +39,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
@@ -146,8 +146,12 @@ public final class MySqlRecordEmitter<T>
 
                         @Override
                         public void collect(final T t) {
-                            long byteNum = 
t.toString().getBytes(StandardCharsets.UTF_8).length;
-                            sourceReaderMetrics.outputMetrics(1L, byteNum);
+                            Struct value = (Struct) element.value();
+                            Struct source = 
value.getStruct(Envelope.FieldName.SOURCE);
+                            String databaseName = 
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+                            String tableName = 
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+
+                            sourceReaderMetrics.outputMetrics(databaseName, 
tableName, iSnapShot, t);
                             output.collect(t);
                         }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 29a4a1282..0a7ea82c6 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.reader;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
+import java.util.Map.Entry;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -28,7 +29,7 @@ import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
 import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent;
@@ -47,6 +48,7 @@ import 
org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit;
+import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
@@ -147,12 +149,19 @@ public class MySqlSourceReader<T>
         if (suspendedBinlogSplit != null) {
             unfinishedSplits.add(suspendedBinlogSplit);
         }
-        SourceMetricData sourceMetricData = 
sourceReaderMetrics.getSourceMetricData();
+        SourceTableMetricData sourceMetricData = 
sourceReaderMetrics.getSourceMetricData();
         LOG.info("inlong-metric-states snapshot sourceMetricData:{}", 
sourceMetricData);
         if (sourceMetricData != null) {
-            unfinishedSplits.add(
-                    new 
MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(),
-                            sourceMetricData.getNumRecordsIn().getCount()));
+            long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount();
+            long countNumRecordsIn = 
sourceMetricData.getNumRecordsIn().getCount();
+            Map<String, Long> readPhaseMetricMap = 
sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect(
+                    Collectors.toMap(v -> v.getKey().getPhase(), e -> 
e.getValue().getReadPhase().getCount()));
+            Map<String, MySqlTableMetric> tableMetricMap = 
sourceMetricData.getSubSourceMetricMap().entrySet().stream()
+                    .collect(Collectors.toMap(Entry::getKey,
+                            e -> new 
MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(),
+                                    e.getValue().getNumBytesIn().getCount())));
+            unfinishedSplits
+                    .add(new MySqlMetricSplit(countNumBytesIn, 
countNumRecordsIn, readPhaseMetricMap, tableMetricMap));
         }
         return unfinishedSplits;
     }
@@ -187,7 +196,8 @@ public class MySqlSourceReader<T>
                 MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split;
                 LOG.info("inlong-metric-states restore metricSplit:{}", 
mysqlMetricSplit);
                 
sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(),
-                        mysqlMetricSplit.getNumBytesIn());
+                        mysqlMetricSplit.getNumBytesIn(), 
mysqlMetricSplit.getReadPhaseMetricMap(),
+                        mysqlMetricSplit.getTableMetricMap());
                 LOG.info("inlong-metric-states restore sourceReaderMetrics:{}",
                         sourceReaderMetrics.getSourceMetricData());
                 continue;
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
index 368d97260..c9d832207 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.split;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 
+import java.io.Serializable;
 import java.util.Map;
 
 /**
@@ -31,6 +32,16 @@ public class MySqlMetricSplit extends MySqlSplit {
 
     private Long numBytesIn = 0L;
 
+    /**
+     * The table level metric in a split of mysql metric.
+     */
+    private Map<String, MySqlTableMetric> tableMetricMap;
+
+    /**
+     * The read phase timestamp metric in a split of mysql metric.
+     */
+    private Map<String, Long> readPhaseMetricMap;
+
     public Long getNumRecordsIn() {
         return numRecordsIn;
     }
@@ -47,14 +58,34 @@ public class MySqlMetricSplit extends MySqlSplit {
         this.numBytesIn = numBytesIn;
     }
 
+    public Map<String, MySqlTableMetric> getTableMetricMap() {
+        return tableMetricMap;
+    }
+
+    public void setTableMetricMap(
+            Map<String, MySqlTableMetric> tableMetricMap) {
+        this.tableMetricMap = tableMetricMap;
+    }
+
+    public Map<String, Long> getReadPhaseMetricMap() {
+        return readPhaseMetricMap;
+    }
+
+    public void setReadPhaseMetricMap(Map<String, Long> readPhaseMetricMap) {
+        this.readPhaseMetricMap = readPhaseMetricMap;
+    }
+
     public MySqlMetricSplit(String splitId) {
         super(splitId);
     }
 
-    public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) {
+    public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn, Map<String, 
Long> readPhaseMetricMap,
+            Map<String, MySqlTableMetric> tableMetricMap) {
         this("");
         this.numBytesIn = numBytesIn;
         this.numRecordsIn = numRecordsIn;
+        this.readPhaseMetricMap = readPhaseMetricMap;
+        this.tableMetricMap = tableMetricMap;
     }
 
     public void setMetricData(long count, long byteNum) {
@@ -72,6 +103,47 @@ public class MySqlMetricSplit extends MySqlSplit {
         return "MysqlMetricSplit{"
                 + "numRecordsIn=" + numRecordsIn
                 + ", numBytesIn=" + numBytesIn
+                + ", tableMetricMap=" + tableMetricMap
+                + ", readPhaseMetricMap=" + readPhaseMetricMap
                 + '}';
     }
+
+    /**
+     * The mysql table level metric in a split of mysql metric.
+     */
+    public static class MySqlTableMetric implements Serializable {
+
+        private Long numRecordsIn;
+
+        private Long numBytesIn;
+
+        public MySqlTableMetric(Long numRecordsIn, Long numBytesIn) {
+            this.numRecordsIn = numRecordsIn;
+            this.numBytesIn = numBytesIn;
+        }
+
+        public Long getNumRecordsIn() {
+            return numRecordsIn;
+        }
+
+        public void setNumRecordsIn(Long numRecordsIn) {
+            this.numRecordsIn = numRecordsIn;
+        }
+
+        public Long getNumBytesIn() {
+            return numBytesIn;
+        }
+
+        public void setNumBytesIn(Long numBytesIn) {
+            this.numBytesIn = numBytesIn;
+        }
+
+        @Override
+        public String toString() {
+            return "MySqlTableMetric{"
+                    + "numRecordsIn=" + numRecordsIn
+                    + ", numBytesIn=" + numBytesIn
+                    + '}';
+        }
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 1f1247b21..325ef4fca 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.rowToSerializedString;
@@ -134,6 +135,46 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
         return finishedSplitsInfo;
     }
 
+    private static void writeReadPhaseMetric(Map<String, Long> 
readPhaseMetrics, DataOutputSerializer out)
+            throws IOException {
+        final int size = readPhaseMetrics.size();
+        out.writeInt(size);
+        for (Map.Entry<String, Long> entry : readPhaseMetrics.entrySet()) {
+            out.writeUTF(entry.getKey());
+            out.writeLong(entry.getValue());
+        }
+    }
+
+    private static Map<String, Long> readReadPhaseMetric(DataInputDeserializer 
in) throws IOException {
+        Map<String, Long> readPhaseMetrics = new HashMap<>();
+        final int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            readPhaseMetrics.put(in.readUTF(), in.readLong());
+        }
+        return readPhaseMetrics;
+    }
+
+    private static void writeTableMetrics(Map<String, MySqlTableMetric> 
tableMetrics, DataOutputSerializer out)
+            throws IOException {
+        final int size = tableMetrics.size();
+        out.writeInt(size);
+        for (Map.Entry<String, MySqlTableMetric> entry : 
tableMetrics.entrySet()) {
+            out.writeUTF(entry.getKey());
+            out.writeLong(entry.getValue().getNumRecordsIn());
+            out.writeLong(entry.getValue().getNumBytesIn());
+        }
+    }
+
+    private static Map<String, MySqlTableMetric> 
readTableMetrics(DataInputDeserializer in) throws IOException {
+        Map<String, MySqlTableMetric> tableMetrics = new HashMap<>();
+        final int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            String tableIdentify = in.readUTF();
+            tableMetrics.put(tableIdentify, new 
MySqlTableMetric(in.readLong(), in.readLong()));
+        }
+        return tableMetrics;
+    }
+
     @Override
     public int getVersion() {
         return VERSION;
@@ -195,6 +236,8 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
             out.writeInt(METRIC_SPLIT_FLAG);
             out.writeLong(mysqlMetricSplit.getNumBytesIn());
             out.writeLong(mysqlMetricSplit.getNumRecordsIn());
+            writeReadPhaseMetric(mysqlMetricSplit.getReadPhaseMetricMap(), 
out);
+            writeTableMetrics(mysqlMetricSplit.getTableMetricMap(), out);
             final byte[] result = out.getCopyOfBuffer();
             out.clear();
             return result;
@@ -267,7 +310,10 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
         } else if (splitKind == METRIC_SPLIT_FLAG) {
             long numBytesIn = in.readLong();
             long numRecordsIn = in.readLong();
-            return new MySqlMetricSplit(numBytesIn, numRecordsIn);
+            Map<String, Long> readPhaseMetricMap = readReadPhaseMetric(in);
+            Map<String, MySqlTableMetric> tableMetricMap = 
readTableMetrics(in);
+
+            return new MySqlMetricSplit(numBytesIn, numRecordsIn, 
readPhaseMetricMap, tableMetricMap);
         } else {
             throw new IOException("Unknown split kind: " + splitKind);
         }

Reply via email to