This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f4093967ae [INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970)
f4093967ae is described below

commit f4093967ae7d21ab6c0eba79bf5648ea38fbaff9
Author: vernedeng <verned...@apache.org>
AuthorDate: Fri Sep 22 16:21:14 2023 +0800

    [INLONG-8969][Sort] Optimize Iceberg Source audit report logic (#8970)
    
    * [INLONG-8969][Sort] Optimize Iceberg Source audit report logic
---
 .../inlong/sort/base/metric/SourceMetricData.java  |  6 -----
 .../sort/base/util/CalculateObjectSizeUtils.java   |  7 -----
 .../inlong/sort/iceberg/source/IcebergSource.java  |  4 +--
 .../iceberg/source/reader/IcebergSourceReader.java |  2 +-
 .../source/reader/IcebergSourceSplitReader.java    |  4 +--
 .../reader/InlongIcebergSourceReaderMetrics.java   | 31 +++++++++++++++++++---
 .../source/reader/RowDataRecordFactory.java        |  2 +-
 .../source/utils/RecyclableJoinedRowData.java      |  9 ++++++-
 8 files changed, 42 insertions(+), 23 deletions(-)

diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index c77f688045..ec64f53b39 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -38,7 +38,6 @@ import static 
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
-import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize;
 import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 /**
@@ -243,11 +242,6 @@ public class SourceMetricData implements MetricData {
         outputMetrics(1, getDataSize(data));
     }
 
-    public void outputMetricsWithEstimate(Object[] records) {
-        long size = getDataArraySize(records);
-        outputMetrics(records.length, size);
-    }
-
     public void outputMetricsWithEstimate(Object data, long fetchDelay, long 
emitDelay) {
         outputMetrics(1, getDataSize(data));
         this.fetchDelay = fetchDelay;
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
index 748522693b..0826eb2a9e 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.base.util;
 import org.apache.flink.table.data.binary.BinaryRowData;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 
 /**
  * calculate tool for object
@@ -45,10 +44,4 @@ public class CalculateObjectSizeUtils {
         return size;
     }
 
-    public static long getDataArraySize(Object[] objects) {
-        return Arrays.stream(objects)
-                .mapToLong(CalculateObjectSizeUtils::getDataSize)
-                .sum();
-    }
-
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
index d5b58e050c..adff22e7cd 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -164,8 +164,8 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
 
     @Override
     public SourceReader<T, IcebergSourceSplit> 
createReader(SourceReaderContext readerContext) {
-        InlongIcebergSourceReaderMetrics metrics =
-                new 
InlongIcebergSourceReaderMetrics(readerContext.metricGroup(), 
lazyTable().name());
+        InlongIcebergSourceReaderMetrics<T> metrics =
+                new 
InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(), 
lazyTable().name());
         metrics.registerMetrics(metricOption);
         return new IcebergSourceReader<>(metrics, readerFunction, 
readerContext);
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index beaf321e51..ad3a9b13d4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -39,7 +39,7 @@ public class IcebergSourceReader<T>
             SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, 
IcebergSourceSplit, IcebergSourceSplit> {
 
     public IcebergSourceReader(
-            InlongIcebergSourceReaderMetrics metrics,
+            InlongIcebergSourceReaderMetrics<T> metrics,
             ReaderFunction<T> readerFunction,
             SourceReaderContext context) {
         super(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
index 146c2dad7e..002ac947a4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
@@ -44,7 +44,7 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSourceSplitReader.class);
 
-    private final InlongIcebergSourceReaderMetrics metrics;
+    private final InlongIcebergSourceReaderMetrics<T> metrics;
     private final ReaderFunction<T> openSplitFunction;
     private final int indexOfSubtask;
     private final Queue<IcebergSourceSplit> splits;
@@ -54,7 +54,7 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
     private String currentSplitId;
 
     IcebergSourceSplitReader(
-            InlongIcebergSourceReaderMetrics metrics,
+            InlongIcebergSourceReaderMetrics<T> metrics,
             ReaderFunction<T> openSplitFunction,
             SourceReaderContext context) {
         this.metrics = metrics;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 60cca86ec3..a2431d3fbd 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -19,16 +19,20 @@ package org.apache.inlong.sort.iceberg.source.reader;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
 
+import java.nio.charset.StandardCharsets;
+
 /**
  * Inlong iceberg source reader metrics
  */
 @Slf4j
-public class InlongIcebergSourceReaderMetrics extends 
IcebergSourceReaderMetrics {
+public class InlongIcebergSourceReaderMetrics<T> extends 
IcebergSourceReaderMetrics {
 
     private final MetricGroup metrics;
     private SourceMetricData sourceMetricData;
@@ -46,10 +50,31 @@ public class InlongIcebergSourceReaderMetrics extends 
IcebergSourceReaderMetrics
         }
     }
 
-    public void outputMetricsWithEstimate(ArrayBatchRecords batchRecord) {
+    public void outputMetricsWithEstimate(ArrayBatchRecords<T> batchRecord) {
         if (sourceMetricData != null) {
-            sourceMetricData.outputMetricsWithEstimate(batchRecord.records());
+            int dataCount = batchRecord.numberOfRecords();
+            T[] records = batchRecord.records();
+            for (int i = 0; i < dataCount; i++) {
+                long dataSize = getDataSize(records[i]);
+                long dataTime = getDataTime(records[i]);
+                sourceMetricData.outputMetrics(1, dataSize, dataTime);
+            }
+
         }
+    }
 
+    private long getDataTime(T object) {
+        if (object instanceof RecyclableJoinedRowData) {
+            return ((RecyclableJoinedRowData) object).getDataTime();
+        }
+        return System.currentTimeMillis();
+    }
+
+    private long getDataSize(T object) {
+        if (object instanceof RecyclableJoinedRowData) {
+            RowData physical = ((RecyclableJoinedRowData) 
object).getPhysicalRowData();
+            return physical.toString().getBytes(StandardCharsets.UTF_8).length;
+        }
+        return object.toString().getBytes(StandardCharsets.UTF_8).length;
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
index 80110cf014..f17186449a 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
@@ -69,6 +69,6 @@ class RowDataRecordFactory implements RecordFactory<RowData> {
         RowData physical =
                 RowDataCloneUtil.clonePhysical(from, 
recyclable.getPhysicalRowData(), rowType, fieldSerializers);
         RowData meta = RowDataCloneUtil.cloneMeta(from, 
recyclable.getMetaRowData(), metadataConverters);
-        batch[position] = recyclable.replace(physical.getRowKind(), physical, 
meta);
+        batch[position] = recyclable.replace(physical.getRowKind(), physical, 
meta, System.currentTimeMillis());
     }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
index 28327d2d8d..4dcec331d4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
@@ -39,6 +39,7 @@ public class RecyclableJoinedRowData implements RowData {
     private RowKind rowKind = RowKind.INSERT;
     private RowData physicalRowData;
     private RowData metaRowData;
+    private long dataTime;
 
     public RecyclableJoinedRowData() {
     }
@@ -48,10 +49,12 @@ public class RecyclableJoinedRowData implements RowData {
         metaRowData = new GenericRowData(metaSize);
     }
 
-    public RecyclableJoinedRowData replace(RowKind rowKind, RowData 
physicalRowData, RowData metaRowData) {
+    public RecyclableJoinedRowData replace(RowKind rowKind, RowData 
physicalRowData, RowData metaRowData,
+            long dataTime) {
         this.rowKind = rowKind;
         this.physicalRowData = physicalRowData;
         this.metaRowData = metaRowData;
+        this.dataTime = dataTime;
         return this;
     }
 
@@ -63,6 +66,10 @@ public class RecyclableJoinedRowData implements RowData {
         return metaRowData;
     }
 
+    public long getDataTime() {
+        return dataTime;
+    }
+
     // 
---------------------------------------------------------------------------------------------
 
     @Override

Reply via email to