This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f4093967ae [INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970) f4093967ae is described below commit f4093967ae7d21ab6c0eba79bf5648ea38fbaff9 Author: vernedeng <verned...@apache.org> AuthorDate: Fri Sep 22 16:21:14 2023 +0800 [INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970) * [INLONG-8969][Sort] Optimize Iceberg Source audit report logic --- .../inlong/sort/base/metric/SourceMetricData.java | 6 ----- .../sort/base/util/CalculateObjectSizeUtils.java | 7 ----- .../inlong/sort/iceberg/source/IcebergSource.java | 4 +-- .../iceberg/source/reader/IcebergSourceReader.java | 2 +- .../source/reader/IcebergSourceSplitReader.java | 4 +-- .../reader/InlongIcebergSourceReaderMetrics.java | 31 +++++++++++++++++++--- .../source/reader/RowDataRecordFactory.java | 2 +- .../source/utils/RecyclableJoinedRowData.java | 9 ++++++- 8 files changed, 42 insertions(+), 23 deletions(-) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index c77f688045..ec64f53b39 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -38,7 +38,6 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; -import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize; import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** @@ -243,11 +242,6 @@ public class SourceMetricData implements MetricData { outputMetrics(1, getDataSize(data)); } - public void outputMetricsWithEstimate(Object[] records) { - long size = getDataArraySize(records); - outputMetrics(records.length, size); - } - public void outputMetricsWithEstimate(Object data, long fetchDelay, long emitDelay) { outputMetrics(1, getDataSize(data)); this.fetchDelay = fetchDelay; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java index 748522693b..0826eb2a9e 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java @@ -20,7 +20,6 @@ package org.apache.inlong.sort.base.util; import org.apache.flink.table.data.binary.BinaryRowData; import java.nio.charset.StandardCharsets; -import java.util.Arrays; /** * calculate tool for object @@ -45,10 +44,4 @@ public class CalculateObjectSizeUtils { return size; } - public static long getDataArraySize(Object[] objects) { - return Arrays.stream(objects) - .mapToLong(CalculateObjectSizeUtils::getDataSize) - .sum(); - } - } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java index d5b58e050c..adff22e7cd 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java @@ -164,8 +164,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn @Override public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) { - InlongIcebergSourceReaderMetrics metrics = - new InlongIcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name()); + InlongIcebergSourceReaderMetrics<T> metrics = + new InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(), lazyTable().name()); metrics.registerMetrics(metricOption); return new IcebergSourceReader<>(metrics, readerFunction, readerContext); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java index beaf321e51..ad3a9b13d4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java @@ -39,7 +39,7 @@ public class IcebergSourceReader<T> SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> { public IcebergSourceReader( - InlongIcebergSourceReaderMetrics metrics, + InlongIcebergSourceReaderMetrics<T> metrics, ReaderFunction<T> readerFunction, SourceReaderContext context) { super( diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java index 146c2dad7e..002ac947a4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java @@ -44,7 +44,7 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class); - private final InlongIcebergSourceReaderMetrics metrics; + private final InlongIcebergSourceReaderMetrics<T> metrics; private final ReaderFunction<T> openSplitFunction; private final int indexOfSubtask; private final Queue<IcebergSourceSplit> splits; @@ -54,7 +54,7 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I private String currentSplitId; IcebergSourceSplitReader( - InlongIcebergSourceReaderMetrics metrics, + InlongIcebergSourceReaderMetrics<T> metrics, ReaderFunction<T> openSplitFunction, SourceReaderContext context) { this.metrics = metrics; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java index 60cca86ec3..a2431d3fbd 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java @@ -19,16 +19,20 @@ package org.apache.inlong.sort.iceberg.source.reader; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData; import lombok.extern.slf4j.Slf4j; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics; +import java.nio.charset.StandardCharsets; + /** * Inlong iceberg source reader metrics */ @Slf4j -public class InlongIcebergSourceReaderMetrics extends IcebergSourceReaderMetrics { +public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetrics { private final MetricGroup metrics; private SourceMetricData sourceMetricData; @@ -46,10 +50,31 @@ public class InlongIcebergSourceReaderMetrics extends IcebergSourceReaderMetrics } } - public void outputMetricsWithEstimate(ArrayBatchRecords batchRecord) { + public void outputMetricsWithEstimate(ArrayBatchRecords<T> batchRecord) { if (sourceMetricData != null) { - sourceMetricData.outputMetricsWithEstimate(batchRecord.records()); + int dataCount = batchRecord.numberOfRecords(); + T[] records = batchRecord.records(); + for (int i = 0; i < dataCount; i++) { + long dataSize = getDataSize(records[i]); + long dataTime = getDataTime(records[i]); + sourceMetricData.outputMetrics(1, dataSize, dataTime); + } + } + } + private long getDataTime(T object) { + if (object instanceof RecyclableJoinedRowData) { + return ((RecyclableJoinedRowData) object).getDataTime(); + } + return System.currentTimeMillis(); + } + + private long getDataSize(T object) { + if (object instanceof RecyclableJoinedRowData) { + RowData physical = ((RecyclableJoinedRowData) object).getPhysicalRowData(); + return physical.toString().getBytes(StandardCharsets.UTF_8).length; + } + return object.toString().getBytes(StandardCharsets.UTF_8).length; } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java index 80110cf014..f17186449a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java @@ -69,6 +69,6 @@ class RowDataRecordFactory implements RecordFactory<RowData> { RowData physical = RowDataCloneUtil.clonePhysical(from, recyclable.getPhysicalRowData(), rowType, fieldSerializers); RowData meta = RowDataCloneUtil.cloneMeta(from, recyclable.getMetaRowData(), metadataConverters); - batch[position] = recyclable.replace(physical.getRowKind(), physical, meta); + batch[position] = recyclable.replace(physical.getRowKind(), physical, meta, System.currentTimeMillis()); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java index 28327d2d8d..4dcec331d4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java @@ -39,6 +39,7 @@ public class RecyclableJoinedRowData implements RowData { private RowKind rowKind = RowKind.INSERT; private RowData physicalRowData; private RowData metaRowData; + private long dataTime; public RecyclableJoinedRowData() { } @@ -48,10 +49,12 @@ public class RecyclableJoinedRowData implements RowData { metaRowData = new GenericRowData(metaSize); } - public RecyclableJoinedRowData replace(RowKind rowKind, RowData physicalRowData, RowData metaRowData) { + public RecyclableJoinedRowData replace(RowKind rowKind, RowData physicalRowData, RowData metaRowData, + long dataTime) { this.rowKind = rowKind; this.physicalRowData = physicalRowData; this.metaRowData = metaRowData; + this.dataTime = dataTime; return this; } @@ -63,6 +66,10 @@ public class RecyclableJoinedRowData implements RowData { return metaRowData; } + public long getDataTime() { + return dataTime; + } + // --------------------------------------------------------------------------------------------- @Override