This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 042c6b2a7 [INLONG-6318][Sort] MySQL connector supports snapshots and restores the metric state (#6319)
042c6b2a7 is described below

commit 042c6b2a7bd33fb7346ca3173e208b15b16da7c7
Author: Xin Gong <genzhedangd...@gmail.com>
AuthorDate: Mon Nov 7 11:40:03 2022 +0800

    [INLONG-6318][Sort] MySQL connector supports snapshots and restores the metric state (#6319)
---
 .../sort/cdc/debezium/DebeziumSourceFunction.java  | 21 ++++++
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |  2 +-
 .../source/metrics/MySqlSourceReaderMetrics.java   | 11 +++
 .../mysql/source/reader/MySqlRecordEmitter.java    |  4 +-
 .../cdc/mysql/source/reader/MySqlSourceReader.java | 26 +++++++-
 .../cdc/mysql/source/split/MySqlMetricSplit.java   | 78 ++++++++++++++++++++++
 .../sort/cdc/mysql/source/split/MySqlSplit.java    |  8 +++
 .../mysql/source/split/MySqlSplitSerializer.java   | 16 ++++-
 8 files changed, 160 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 7d7b9bcd0..80d6812f2 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -49,7 +50,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumOffset;
@@ -76,6 +79,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static 
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static 
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
 
@@ -227,6 +231,10 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     private SourceMetricData sourceMetricData;
 
+    private transient ListState<MetricState> metricStateListState;
+
+    private MetricState metricState;
+
     // 
---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
@@ -271,10 +279,19 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 stateStore.getUnionListState(
                         new ListStateDescriptor<>(
                                 HISTORY_RECORDS_STATE_NAME, 
BasicTypeInfo.STRING_TYPE_INFO));
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    stateStore.getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, 
TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
 
         if (context.isRestored()) {
             restoreOffsetState();
             restoreHistoryRecordsState();
+            metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
         } else {
             if (specificOffset != null) {
                 byte[] serializedOffset =
@@ -344,6 +361,10 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         } else {
             snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
+            if (sourceMetricData != null && metricStateListState != null) {
+                
MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, 
sourceMetricData,
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index ea102f429..60484ddf4 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -167,7 +167,7 @@ public class MySqlSource<T>
                         sourceConfig.isIncludeSchemaChanges()),
                 readerContext.getConfiguration(),
                 mySqlSourceReaderContext,
-                sourceConfig);
+                sourceConfig, sourceReaderMetrics);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 45c81e560..19ca7c570 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -98,4 +98,15 @@ public class MySqlSourceReaderMetrics {
             sourceMetricData.outputMetrics(rowCountSize, rowDataSize);
         }
     }
+
+    public void initMetrics(long rowCountSize, long rowDataSize) {
+        if (sourceMetricData != null) {
+            sourceMetricData.getNumBytesIn().inc(rowDataSize);
+            sourceMetricData.getNumRecordsIn().inc(rowCountSize);
+        }
+    }
+
+    public SourceMetricData getSourceMetricData() {
+        return sourceMetricData;
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index d2cc328f9..f0fe28b57 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,8 +145,8 @@ public final class MySqlRecordEmitter<T>
                     new Collector<T>() {
                         @Override
                         public void collect(final T t) {
-                            sourceReaderMetrics.outputMetrics(1L,
-                                    
t.toString().getBytes(StandardCharsets.UTF_8).length);
+                            long byteNum = 
t.toString().getBytes(StandardCharsets.UTF_8).length;
+                            sourceReaderMetrics.outputMetrics(1L, byteNum);
                             output.collect(t);
                         }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 07c96d542..140db93fb 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
 import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent;
@@ -41,10 +42,12 @@ import 
org.apache.inlong.sort.cdc.mysql.source.events.LatestFinishedSplitsSizeRe
 import 
org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderAckEvent;
 import org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderEvent;
 import org.apache.inlong.sort.cdc.mysql.source.events.WakeupReaderEvent;
+import 
org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit;
@@ -84,6 +87,7 @@ public class MySqlSourceReader<T>
     private final int subtaskId;
     private final MySqlSourceReaderContext mySqlSourceReaderContext;
     private MySqlBinlogSplit suspendedBinlogSplit;
+    private MySqlSourceReaderMetrics sourceReaderMetrics;
 
     public MySqlSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> 
elementQueue,
@@ -91,7 +95,8 @@ public class MySqlSourceReader<T>
             RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
             Configuration config,
             MySqlSourceReaderContext context,
-            MySqlSourceConfig sourceConfig) {
+            MySqlSourceConfig sourceConfig,
+            MySqlSourceReaderMetrics sourceReaderMetrics) {
         super(
                 elementQueue,
                 new SingleThreadFetcherManager<>(elementQueue, 
splitReaderSupplier::get),
@@ -104,6 +109,7 @@ public class MySqlSourceReader<T>
         this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
         this.mySqlSourceReaderContext = context;
         this.suspendedBinlogSplit = null;
+        this.sourceReaderMetrics = sourceReaderMetrics;
     }
 
     @Override
@@ -142,6 +148,13 @@ public class MySqlSourceReader<T>
         if (suspendedBinlogSplit != null) {
             unfinishedSplits.add(suspendedBinlogSplit);
         }
+        SourceMetricData sourceMetricData = 
sourceReaderMetrics.getSourceMetricData();
+        LOG.info("inlong-metric-states snapshot sourceMetricData:{}", 
sourceMetricData);
+        if (sourceMetricData != null) {
+            unfinishedSplits.add(
+                    new 
MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(),
+                            sourceMetricData.getNumRecordsIn().getCount()));
+        }
         return unfinishedSplits;
     }
 
@@ -171,6 +184,15 @@ public class MySqlSourceReader<T>
         List<MySqlSplit> unfinishedSplits = new ArrayList<>();
         for (MySqlSplit split : splits) {
             LOG.info("Add Split: " + split);
+            if (split.isMetricSplit()) {
+                MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split;
+                LOG.info("inlong-metric-states restore metricSplit:{}", 
mysqlMetricSplit);
+                
sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(),
+                        mysqlMetricSplit.getNumBytesIn());
+                LOG.info("inlong-metric-states restore sourceReaderMetrics:{}",
+                        sourceReaderMetrics.getSourceMetricData());
+                continue;
+            }
             if (split.isSnapshotSplit()) {
                 MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
                 if (snapshotSplit.isSnapshotReadFinished()) {
@@ -206,7 +228,7 @@ public class MySqlSourceReader<T>
         final String splitId = split.splitId();
         if (split.getTableSchemas().isEmpty()) {
             try (MySqlConnection jdbc =
-                         
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
+                    
DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {
                 Map<TableId, TableChanges.TableChange> tableSchemas =
                         
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
                 LOG.info("The table schema discovery for binlog split {} 
success", splitId);
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
new file mode 100644
index 000000000..433d1ce60
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java
@@ -0,0 +1,78 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.split;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges.TableChange;
+
+import java.util.Map;
+
+/**
+ * The split to describe a split of MySql metric.
+ */
+public class MySqlMetricSplit extends MySqlSplit {
+
+    private Long numRecordsIn = 0L;
+
+    private Long numBytesIn = 0L;
+
+    public Long getNumRecordsIn() {
+        return numRecordsIn;
+    }
+
+    public void setNumRecordsIn(Long numRecordsIn) {
+        this.numRecordsIn = numRecordsIn;
+    }
+
+    public Long getNumBytesIn() {
+        return numBytesIn;
+    }
+
+    public void setNumBytesIn(Long numBytesIn) {
+        this.numBytesIn = numBytesIn;
+    }
+
+    public MySqlMetricSplit(String splitId) {
+        super(splitId);
+    }
+
+    public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) {
+        this("");
+        this.numBytesIn = numBytesIn;
+        this.numRecordsIn = numRecordsIn;
+    }
+
+    public void setMetricData(long count, long byteNum) {
+        numRecordsIn = numRecordsIn + count;
+        numBytesIn = numBytesIn + byteNum;
+    }
+
+    @Override
+    public Map<TableId, TableChange> getTableSchemas() {
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "MysqlMetricSplit{"
+                + "numRecordsIn=" + numRecordsIn
+                + ", numBytesIn=" + numBytesIn
+                + '}';
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
index a84f196bd..5f29a3149 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java
@@ -34,6 +34,14 @@ public abstract class MySqlSplit implements SourceSplit {
         this.splitId = splitId;
     }
 
+    public final boolean isMetricSplit() {
+        return getClass() == MySqlMetricSplit.class;
+    }
+
+    public final MySqlMetricSplit asMetricSplit() {
+        return (MySqlMetricSplit) this;
+    }
+
     /** Checks whether this split is a snapshot split. */
     public final boolean isSnapshotSplit() {
         return getClass() == MySqlSnapshotSplit.class;
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 18f7161e5..6b7b5de77 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -56,6 +56,7 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
 
     private static final int SNAPSHOT_SPLIT_FLAG = 1;
     private static final int BINLOG_SPLIT_FLAG = 2;
+    private static final int METRIC_SPLIT_FLAG = 3;
 
     private static void writeTableSchemas(
             Map<TableId, TableChange> tableSchemas, DataOutputSerializer out) 
throws IOException {
@@ -167,7 +168,7 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
             // serialization
             snapshotSplit.serializedFormCache = result;
             return result;
-        } else {
+        } else if (split.isBinlogSplit()) {
             final MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
             // optimization: the splits lazily cache their own serialized form
             if (binlogSplit.serializedFormCache != null) {
@@ -189,6 +190,15 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
             // serialization
             binlogSplit.serializedFormCache = result;
             return result;
+        } else {
+            final MySqlMetricSplit mysqlMetricSplit = split.asMetricSplit();
+            final DataOutputSerializer out = SERIALIZER_CACHE.get();
+            out.writeInt(METRIC_SPLIT_FLAG);
+            out.writeLong(mysqlMetricSplit.getNumBytesIn());
+            out.writeLong(mysqlMetricSplit.getNumRecordsIn());
+            final byte[] result = out.getCopyOfBuffer();
+            out.clear();
+            return result;
         }
     }
 
@@ -255,6 +265,10 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
                     tableChangeMap,
                     totalFinishedSplitSize,
                     isSuspended);
+        } else if (splitKind == METRIC_SPLIT_FLAG) {
+            long numBytesIn = in.readLong();
+            long numRecordsIn = in.readLong();
+            return new MySqlMetricSplit(numBytesIn, numRecordsIn);
         } else {
             throw new IOException("Unknown split kind: " + splitKind);
         }

Reply via email to