This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0a672e365 [INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966) 0a672e365 is described below commit 0a672e3651680bc420d28305d679051c9866931d Author: chestnufang <65438734+chestnu...@users.noreply.github.com> AuthorDate: Mon Dec 19 21:49:03 2022 +0800 [INLONG-6962][Sort] Add read phase metric and table level metric for MySQL-CDC (#6966) Co-authored-by: chestnufang <chestnuf...@tencent.com> --- .../source/metrics/MySqlSourceReaderMetrics.java | 50 +++++++++++---- .../mysql/source/reader/MySqlRecordEmitter.java | 10 ++- .../cdc/mysql/source/reader/MySqlSourceReader.java | 22 +++++-- .../cdc/mysql/source/split/MySqlMetricSplit.java | 74 +++++++++++++++++++++- .../mysql/source/split/MySqlSplitSerializer.java | 48 +++++++++++++- 5 files changed, 181 insertions(+), 23 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java index 1ebd1c7a5..c941eb083 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -17,11 +17,19 @@ package org.apache.inlong.sort.cdc.mysql.source.metrics; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; + +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData; import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader; +import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric; /** * A collection class for handling metrics in {@link MySqlSourceReader}. @@ -49,7 +57,7 @@ public class MySqlSourceReaderMetrics { */ private volatile long emitDelay = 0L; - private SourceMetricData sourceMetricData; + private SourceTableMetricData sourceTableMetricData; public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -57,7 +65,7 @@ public class MySqlSourceReaderMetrics { public void registerMetrics(MetricOption metricOption) { if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption, metricGroup); + sourceTableMetricData = new SourceTableMetricData(metricOption, metricGroup); } metricGroup.gauge("currentFetchEventTimeLag", (Gauge<Long>) this::getFetchDelay); metricGroup.gauge("currentEmitEventTimeLag", (Gauge<Long>) this::getEmitDelay); @@ -92,20 +100,38 @@ public class MySqlSourceReaderMetrics { this.emitDelay = emitDelay; } - public void outputMetrics(long rowCountSize, long rowDataSize) { - if (sourceMetricData != null) { - sourceMetricData.outputMetrics(rowCountSize, rowDataSize); + public void outputMetrics(String database, String table, boolean isSnapshotRecord, Object data) { + if (sourceTableMetricData != null) { + sourceTableMetricData.outputMetricsWithEstimate(database, table, isSnapshotRecord, data); } } - public void initMetrics(long rowCountSize, long rowDataSize) { - if (sourceMetricData != null) { - sourceMetricData.getNumBytesIn().inc(rowDataSize); - sourceMetricData.getNumRecordsIn().inc(rowCountSize); + public void initMetrics(long rowCountSize, long rowDataSize, Map<String, Long> readPhaseMetricMap, + Map<String, MySqlTableMetric> tableMetricMap) { + if (sourceTableMetricData != null) { + // node level metric data + sourceTableMetricData.getNumBytesIn().inc(rowDataSize); + sourceTableMetricData.getNumRecordsIn().inc(rowCountSize); + + // register read phase metric data and table level metric data + if (readPhaseMetricMap != null && tableMetricMap != null) { + MetricState metricState = new MetricState(); + metricState.setMetrics(readPhaseMetricMap); + Map<String, MetricState> subMetricStateMap = new HashMap<>(); + tableMetricMap.entrySet().stream().filter(v -> v.getValue() != null).forEach(entry -> { + MetricState subMetricState = new MetricState(); + subMetricState.setMetrics(ImmutableMap + .of(NUM_RECORDS_IN, entry.getValue().getNumRecordsIn(), NUM_BYTES_IN, + entry.getValue().getNumBytesIn())); + subMetricStateMap.put(entry.getKey(), subMetricState); + }); + metricState.setSubMetricStateMap(subMetricStateMap); + sourceTableMetricData.registerSubMetricsGroup(metricState); + } } } - public SourceMetricData getSourceMetricData() { - return sourceMetricData; + public SourceTableMetricData getSourceMetricData() { + return sourceTableMetricData; } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java index e09b3ee88..eb961df00 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.cdc.mysql.source.reader; import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils; +import io.debezium.connector.AbstractSourceInfo; import io.debezium.data.Envelope; import io.debezium.document.Array; import io.debezium.relational.TableId; @@ -38,7 +39,6 @@ import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; import java.util.Map; import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition; @@ -146,8 +146,12 @@ public final class MySqlRecordEmitter<T> @Override public void collect(final T t) { - long byteNum = t.toString().getBytes(StandardCharsets.UTF_8).length; - sourceReaderMetrics.outputMetrics(1L, byteNum); + Struct value = (Struct) element.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + String databaseName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY); + String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY); + + sourceReaderMetrics.outputMetrics(databaseName, tableName, iSnapShot, t); output.collect(t); } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java index 29a4a1282..0a7ea82c6 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.reader; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; +import java.util.Map.Entry; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; @@ -28,7 +29,7 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData; import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils; import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig; import org.apache.inlong.sort.cdc.mysql.source.events.BinlogSplitMetaEvent; @@ -47,6 +48,7 @@ import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit; +import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplitState; import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplit; @@ -147,12 +149,19 @@ public class MySqlSourceReader<T> if (suspendedBinlogSplit != null) { unfinishedSplits.add(suspendedBinlogSplit); } - SourceMetricData sourceMetricData = sourceReaderMetrics.getSourceMetricData(); + SourceTableMetricData sourceMetricData = sourceReaderMetrics.getSourceMetricData(); LOG.info("inlong-metric-states snapshot sourceMetricData:{}", sourceMetricData); if (sourceMetricData != null) { - unfinishedSplits.add( - new MySqlMetricSplit(sourceMetricData.getNumBytesIn().getCount(), - sourceMetricData.getNumRecordsIn().getCount())); + long countNumBytesIn = sourceMetricData.getNumBytesIn().getCount(); + long countNumRecordsIn = sourceMetricData.getNumRecordsIn().getCount(); + Map<String, Long> readPhaseMetricMap = sourceMetricData.getReadPhaseMetricMap().entrySet().stream().collect( + Collectors.toMap(v -> v.getKey().getPhase(), e -> e.getValue().getReadPhase().getCount())); + Map<String, MySqlTableMetric> tableMetricMap = sourceMetricData.getSubSourceMetricMap().entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, + e -> new MySqlTableMetric(e.getValue().getNumRecordsIn().getCount(), + e.getValue().getNumBytesIn().getCount()))); + unfinishedSplits + .add(new MySqlMetricSplit(countNumBytesIn, countNumRecordsIn, readPhaseMetricMap, tableMetricMap)); } return unfinishedSplits; } @@ -187,7 +196,8 @@ public class MySqlSourceReader<T> MySqlMetricSplit mysqlMetricSplit = (MySqlMetricSplit) split; LOG.info("inlong-metric-states restore metricSplit:{}", mysqlMetricSplit); sourceReaderMetrics.initMetrics(mysqlMetricSplit.getNumRecordsIn(), - mysqlMetricSplit.getNumBytesIn()); + mysqlMetricSplit.getNumBytesIn(), mysqlMetricSplit.getReadPhaseMetricMap(), + mysqlMetricSplit.getTableMetricMap()); LOG.info("inlong-metric-states restore sourceReaderMetrics:{}", sourceReaderMetrics.getSourceMetricData()); continue; diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java index 368d97260..c9d832207 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlMetricSplit.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mysql.source.split; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; +import java.io.Serializable; import java.util.Map; /** @@ -31,6 +32,16 @@ public class MySqlMetricSplit extends MySqlSplit { private Long numBytesIn = 0L; + /** + * The table level metric in a split of mysql metric. + */ + private Map<String, MySqlTableMetric> tableMetricMap; + + /** + * The read phase timestamp metric in a split of mysql metric. + */ + private Map<String, Long> readPhaseMetricMap; + public Long getNumRecordsIn() { return numRecordsIn; } @@ -47,14 +58,34 @@ public class MySqlMetricSplit extends MySqlSplit { this.numBytesIn = numBytesIn; } + public Map<String, MySqlTableMetric> getTableMetricMap() { + return tableMetricMap; + } + + public void setTableMetricMap( + Map<String, MySqlTableMetric> tableMetricMap) { + this.tableMetricMap = tableMetricMap; + } + + public Map<String, Long> getReadPhaseMetricMap() { + return readPhaseMetricMap; + } + + public void setReadPhaseMetricMap(Map<String, Long> readPhaseMetricMap) { + this.readPhaseMetricMap = readPhaseMetricMap; + } + public MySqlMetricSplit(String splitId) { super(splitId); } - public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn) { + public MySqlMetricSplit(Long numBytesIn, Long numRecordsIn, Map<String, Long> readPhaseMetricMap, + Map<String, MySqlTableMetric> tableMetricMap) { this(""); this.numBytesIn = numBytesIn; this.numRecordsIn = numRecordsIn; + this.readPhaseMetricMap = readPhaseMetricMap; + this.tableMetricMap = tableMetricMap; } public void setMetricData(long count, long byteNum) { @@ -72,6 +103,47 @@ public class MySqlMetricSplit extends MySqlSplit { return "MysqlMetricSplit{" + "numRecordsIn=" + numRecordsIn + ", numBytesIn=" + numBytesIn + + ", tableMetricMap=" + tableMetricMap + + ", readPhaseMetricMap=" + readPhaseMetricMap + '}'; } + + /** + * The mysql table level metric in a split of mysql metric. + */ + public static class MySqlTableMetric implements Serializable { + + private Long numRecordsIn; + + private Long numBytesIn; + + public MySqlTableMetric(Long numRecordsIn, Long numBytesIn) { + this.numRecordsIn = numRecordsIn; + this.numBytesIn = numBytesIn; + } + + public Long getNumRecordsIn() { + return numRecordsIn; + } + + public void setNumRecordsIn(Long numRecordsIn) { + this.numRecordsIn = numRecordsIn; + } + + public Long getNumBytesIn() { + return numBytesIn; + } + + public void setNumBytesIn(Long numBytesIn) { + this.numBytesIn = numBytesIn; + } + + @Override + public String toString() { + return "MySqlTableMetric{" + + "numRecordsIn=" + numRecordsIn + + ", numBytesIn=" + numBytesIn + + '}'; + } + } } diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java index 1f1247b21..325ef4fca 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.inlong.sort.cdc.mysql.source.split.MySqlMetricSplit.MySqlTableMetric; import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.readBinlogPosition; import static org.apache.inlong.sort.cdc.mysql.source.utils.SerializerUtils.rowToSerializedString; @@ -134,6 +135,46 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS return finishedSplitsInfo; } + private static void writeReadPhaseMetric(Map<String, Long> readPhaseMetrics, DataOutputSerializer out) + throws IOException { + final int size = readPhaseMetrics.size(); + out.writeInt(size); + for (Map.Entry<String, Long> entry : readPhaseMetrics.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeLong(entry.getValue()); + } + } + + private static Map<String, Long> readReadPhaseMetric(DataInputDeserializer in) throws IOException { + Map<String, Long> readPhaseMetrics = new HashMap<>(); + final int size = in.readInt(); + for (int i = 0; i < size; i++) { + readPhaseMetrics.put(in.readUTF(), in.readLong()); + } + return readPhaseMetrics; + } + + private static void writeTableMetrics(Map<String, MySqlTableMetric> tableMetrics, DataOutputSerializer out) + throws IOException { + final int size = tableMetrics.size(); + out.writeInt(size); + for (Map.Entry<String, MySqlTableMetric> entry : tableMetrics.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeLong(entry.getValue().getNumRecordsIn()); + out.writeLong(entry.getValue().getNumBytesIn()); + } + } + + private static Map<String, MySqlTableMetric> readTableMetrics(DataInputDeserializer in) throws IOException { + Map<String, MySqlTableMetric> tableMetrics = new HashMap<>(); + final int size = in.readInt(); + for (int i = 0; i < size; i++) { + String tableIdentify = in.readUTF(); + tableMetrics.put(tableIdentify, new MySqlTableMetric(in.readLong(), in.readLong())); + } + return tableMetrics; + } + @Override public int getVersion() { return VERSION; @@ -195,6 +236,8 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS out.writeInt(METRIC_SPLIT_FLAG); out.writeLong(mysqlMetricSplit.getNumBytesIn()); out.writeLong(mysqlMetricSplit.getNumRecordsIn()); + writeReadPhaseMetric(mysqlMetricSplit.getReadPhaseMetricMap(), out); + writeTableMetrics(mysqlMetricSplit.getTableMetricMap(), out); final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -267,7 +310,10 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS } else if (splitKind == METRIC_SPLIT_FLAG) { long numBytesIn = in.readLong(); long numRecordsIn = in.readLong(); - return new MySqlMetricSplit(numBytesIn, numRecordsIn); + Map<String, Long> readPhaseMetricMap = readReadPhaseMetric(in); + Map<String, MySqlTableMetric> tableMetricMap = readTableMetrics(in); + + return new MySqlMetricSplit(numBytesIn, numRecordsIn, readPhaseMetricMap, tableMetricMap); } else { throw new IOException("Unknown split kind: " + splitKind); }