This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 042c6b2a7 [INLONG-6318][Sort] MySQL connector supports snapshots and restores the metric state (#6319) 042c6b2a7 is described below commit 042c6b2a7bd33fb7346ca3173e208b15b16da7c7 Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Mon Nov 7 11:40:03 2022 +0800 [INLONG-6318][Sort] MySQL connector supports snapshots and restores the metric state (#6319) --- .../sort/cdc/debezium/DebeziumSourceFunction.java | 21 ++++++ .../inlong/sort/cdc/mysql/source/MySqlSource.java | 2 +- .../source/metrics/MySqlSourceReaderMetrics.java | 11 +++ .../mysql/source/reader/MySqlRecordEmitter.java | 4 +- .../cdc/mysql/source/reader/MySqlSourceReader.java | 26 +++++++- .../cdc/mysql/source/split/MySqlMetricSplit.java | 78 ++++++++++++++++++++++ .../sort/cdc/mysql/source/split/MySqlSplit.java | 8 +++ .../mysql/source/split/MySqlSplitSerializer.java | 16 ++++- 8 files changed, 160 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java index 7d7b9bcd0..80d6812f2 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -49,7 +50,9 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher; import org.apache.inlong.sort.cdc.debezium.internal.DebeziumOffset; @@ -76,6 +79,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; @@ -227,6 +231,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private SourceMetricData sourceMetricData; + private transient ListState<MetricState> metricStateListState; + + private MetricState metricState; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -271,10 +279,19 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> stateStore.getUnionListState( new ListStateDescriptor<>( HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); + if (this.inlongMetric != null) { + this.metricStateListState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } if (context.isRestored()) { restoreOffsetState(); restoreHistoryRecordsState(); + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } else { if (specificOffset != null) { byte[] serializedOffset = @@ -344,6 +361,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> } else { snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); + if (sourceMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java index ea102f429..60484ddf4 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java @@ -167,7 +167,7 @@ public class MySqlSource<T> sourceConfig.isIncludeSchemaChanges()), readerContext.getConfiguration(), mySqlSourceReaderContext, - sourceConfig); + sourceConfig, sourceReaderMetrics); } @Override diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java index 45c81e560..19ca7c570 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -98,4 +98,15 @@ public class MySqlSourceReaderMetrics { sourceMetricData.outputMetrics(rowCountSize, rowDataSize); } } + + public void initMetrics(long rowCountSize, long rowDataSize) { + if (sourceMetricData != null) { + sourceMetricData.getNumBytesIn().inc(rowDataSize); + sourceMetricData.getNumRecordsIn().inc(rowCountSize); + } + } + + public SourceMetricData getSourceMetricData() { + return sourceMetricData; + } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java index d2cc328f9..f0fe28b57 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java @@ -145,8 +145,8 @@ public final class MySqlRecordEmitter<T> new Collector<T>() { @Override public void collect(final T t) { - sourceReaderMetrics.outputMetrics(1L, - t.toString().getBytes(StandardCharsets.UTF_8).length); + long byteNum = t.toString().getBytes(StandardCharsets.UTF_8).length; + sourceReaderMetrics.outputMetrics(1L, byteNum); output.collect(t); } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java index 07c96d542..140db93fb 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java @@ -29,6 +29,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils; import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig; import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent; @@ -41,10 +42,12 @@ import org.apache.inlong.sort.cdc.mysql.source.events.LatestFinishedSplitsSizeRe import org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderAckEvent; import org.apache.inlong.sort.cdc.mysql.source.events.SuspendBinlogReaderEvent; import org.apache.inlong.sort.cdc.mysql.source.events.WakeupReaderEvent; +import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset; import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState; +import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit; @@ -84,6 +87,7 @@ public class MySqlSourceReader<T> private final int subtaskId; private final MySqlSourceReaderContext mySqlSourceReaderContext; private MySqlBinlogSplit suspendedBinlogSplit; + private MySqlSourceReaderMetrics sourceReaderMetrics; public MySqlSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue, @@ -91,7 +95,8 @@ public class MySqlSourceReader<T> RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter, Configuration config, MySqlSourceReaderContext context, - MySqlSourceConfig sourceConfig) { + MySqlSourceConfig sourceConfig, + MySqlSourceReaderMetrics sourceReaderMetrics) { super( elementQueue, new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get), @@ -104,6 +109,7 @@ public class MySqlSourceReader<T> this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask(); this.mySqlSourceReaderContext = context; this.suspendedBinlogSplit = null; + this.sourceReaderMetrics = sourceReaderMetrics; } @Override @@ -142,6 +148,13 @@ public class MySqlSourceReader<T> if (suspendedBinlogSplit != null) { unfinishedSplits.add(suspendedBinlogSplit); } + SourceMetricData sourceMetricData = sourceReaderMetrics.getSourceMetricData(); + LOG.info("inlong-metric-states snapshot sourceMetricData:{}", sourceMetricData); + if (sourceMetricData != null) { + unfinishedSplits.add( + new MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(), + sourceMetricData.getNumRecordsIn().getCount())); + } return unfinishedSplits; } @@ -171,6 +184,15 @@ public class MySqlSourceReader<T> List<MySqlSplit> unfinishedSplits = new ArrayList<>(); for (MySqlSplit split : splits) { LOG.info("Add Split: " + split); + if (split.isMetricSplit()) { + MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split; + LOG.info("inlong-metric-states restore metricSplit:{}", mysqlMetricSplit); + sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(), + mysqlMetricSplit.getNumBytesIn()); + LOG.info("inlong-metric-states restore sourceReaderMetrics:{}", + sourceReaderMetrics.getSourceMetricData()); + continue; + } if (split.isSnapshotSplit()) { MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit(); if (snapshotSplit.isSnapshotReadFinished()) { @@ -206,7 +228,7 @@ public class MySqlSourceReader<T> final String splitId = split.splitId(); if (split.getTableSchemas().isEmpty()) { try (MySqlConnection jdbc = - DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) { + DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) { Map<TableId, TableChanges.TableChange> tableSchemas = TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); LOG.info("The table schema discovery for binlog split {} success", splitId); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java new file mode 100644 index 000000000..433d1ce60 --- /dev/null +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.cdc.mysql.source.split; + +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges.TableChange; + +import java.util.Map; + +/** + * The split to describe a split of MySql metric. + */ +public class MySqlMetricSplit extends MySqlSplit { + + private Long numRecordsIn = 0L; + + private Long numBytesIn = 0L; + + public Long getNumRecordsIn() { + return numRecordsIn; + } + + public void setNumRecordsIn(Long numRecordsIn) { + this.numRecordsIn = numRecordsIn; + } + + public Long getNumBytesIn() { + return numBytesIn; + } + + public void setNumBytesIn(Long numBytesIn) { + this.numBytesIn = numBytesIn; + } + + public MySqlMetricSplit(String splitId) { + super(splitId); + } + + public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) { + this(""); + this.numBytesIn = numBytesIn; + this.numRecordsIn = numRecordsIn; + } + + public void setMetricData(long count, long byteNum) { + numRecordsIn = numRecordsIn + count; + numBytesIn = numBytesIn + byteNum; + } + + @Override + public Map<TableId, TableChange> getTableSchemas() { + return null; + } + + @Override + public String toString() { + return "MysqlMetricSplit{" + + "numRecordsIn=" + numRecordsIn + + ", numBytesIn=" + numBytesIn + + '}'; + } +} diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java index a84f196bd..5f29a3149 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplit.java @@ -34,6 +34,14 @@ public abstract class MySqlSplit implements SourceSplit { this.splitId = splitId; } + public final boolean isMetricSplit() { + return getClass() == MySqlMetricSplit.class; + } + + public final MySqlMetricSplit asMetricSplit() { + return (MySqlMetricSplit) this; + } + /** Checks whether this split is a snapshot split. */ public final boolean isSnapshotSplit() { return getClass() == MySqlSnapshotSplit.class; diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java index 18f7161e5..6b7b5de77 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java @@ -56,6 +56,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS private static final int SNAPSHOT_SPLIT_FLAG = 1; private static final int BINLOG_SPLIT_FLAG = 2; + private static final int METRIC_SPLIT_FLAG = 3; private static void writeTableSchemas( Map<TableId, TableChange> tableSchemas, DataOutputSerializer out) throws IOException { @@ -167,7 +168,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS // serialization snapshotSplit.serializedFormCache = result; return result; - } else { + } else if (split.isBinlogSplit()) { final MySqlBinlogSplit binlogSplit = split.asBinlogSplit(); // optimization: the splits lazily cache their own serialized form if (binlogSplit.serializedFormCache != null) { @@ -189,6 +190,15 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS // serialization binlogSplit.serializedFormCache = result; return result; + } else { + final MySqlMetricSplit mysqlMetricSplit = split.asMetricSplit(); + final DataOutputSerializer out = SERIALIZER_CACHE.get(); + out.writeInt(METRIC_SPLIT_FLAG); + out.writeLong(mysqlMetricSplit.getNumBytesIn()); + out.writeLong(mysqlMetricSplit.getNumRecordsIn()); + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; } } @@ -255,6 +265,10 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS tableChangeMap, totalFinishedSplitSize, isSuspended); + } else if (splitKind == METRIC_SPLIT_FLAG) { + long numBytesIn = in.readLong(); + long numRecordsIn = in.readLong(); + return new MySqlMetricSplit(numBytesIn, numRecordsIn); } else { throw new IOException("Unknown split kind: " + splitKind); }