This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 704ef3eb7 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907) 704ef3eb7 is described below commit 704ef3eb7440b1ad3a41ab851203f56c1fe650e3 Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Fri Apr 28 11:20:01 2023 +0800 [INLONG-7906][Sort] Improve logic of calculation object byte size (#7907) --- .../inlong/sort/base/metric/SinkMetricData.java | 8 ++-- .../inlong/sort/base/metric/SourceMetricData.java | 14 +++---- .../sort/base/metric/sub/SinkTableMetricData.java | 7 ++-- .../sort/base/util/CalculateObjectSizeUtils.java | 47 ++++++++++++++++++++++ .../base/util/CalculateObjectSizeUtilsTest.java | 47 ++++++++++++++++++++++ .../table/DorisDynamicSchemaOutputFormat.java | 9 +++-- .../filesystem/stream/AbstractStreamingWriter.java | 7 ++-- .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 5 ++- .../hive/filesystem/AbstractStreamingWriter.java | 4 +- .../sink/multiple/DynamicSchemaHandleOperator.java | 4 +- .../sink/multiple/IcebergMultipleStreamWriter.java | 4 +- .../jdbc/internal/JdbcBatchingOutputFormat.java | 4 +- .../internal/TableMetricStatementExecutor.java | 9 +++-- 13 files changed, 130 insertions(+), 39 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index 3e0ae04b3..a130be24a 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -25,7 +25,6 @@ import org.apache.inlong.audit.AuditOperator; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; -import java.nio.charset.StandardCharsets; import java.util.Map; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; @@ -36,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** * A collection class for handling metrics @@ -253,13 +253,11 @@ public class SinkMetricData implements MetricData { } public void invokeWithEstimate(Object o) { - long size = o.toString().getBytes(StandardCharsets.UTF_8).length; - invoke(1, size); + invoke(1, getDataSize(o)); } public void invokeDirtyWithEstimate(Object o) { - long size = o.toString().getBytes(StandardCharsets.UTF_8).length; - invokeDirty(1, size); + invokeDirty(1, getDataSize(o)); } public void invoke(long rowCount, long rowSize) { diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index e5ffdf844..b5ca46713 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -17,24 +17,24 @@ package org.apache.inlong.sort.base.metric; -import java.util.List; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.inlong.audit.AuditOperator; - -import java.nio.charset.StandardCharsets; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Map; + import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** * A collection class for handling metrics @@ -194,13 +194,11 @@ public class SourceMetricData implements MetricData { } public void outputMetricsWithEstimate(Object data) { - long size = data.toString().getBytes(StandardCharsets.UTF_8).length; - outputMetrics(1, size); + outputMetrics(1, getDataSize(data)); } public void outputMetricsWithEstimate(Object data, long dataTime) { - long size = data.toString().getBytes(StandardCharsets.UTF_8).length; - outputMetrics(1, size, dataTime); + outputMetrics(1, getDataSize(data), dataTime); } public void outputMetrics(long rowCountSize, long rowDataSize) { diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java index 92f9da48f..537bab033 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java @@ -38,6 +38,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; +import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** * A collection class for handling sub metrics of table schema type @@ -405,13 +406,11 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric * @param data the dirty data */ public void outputDirtyMetricsWithEstimate(String database, String table, Object data) { - long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length; - outputDirtyMetrics(database, table, 1, size); + outputDirtyMetrics(database, table, 1, getDataSize(data)); } public void outputDirtyMetricsWithEstimate(Object data) { - long size = data.toString().getBytes(StandardCharsets.UTF_8).length; - invokeDirty(1, size); + invokeDirty(1, getDataSize(data)); } @Override diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java new file mode 100644 index 000000000..0826eb2a9 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.util; + +import org.apache.flink.table.data.binary.BinaryRowData; + +import java.nio.charset.StandardCharsets; + +/** + * calculate tool for object + */ +public class CalculateObjectSizeUtils { + + /** + * {@link BinaryRowData} don't implement the {@link Object#toString} method + * So, we need use {@link BinaryRowData#getSizeInBytes} to get byte size. + */ + public static long getDataSize(Object object) { + if (object == null) { + return 0L; + } + long size; + if (object instanceof BinaryRowData) { + BinaryRowData binaryRowData = (BinaryRowData) object; + size = binaryRowData.getSizeInBytes(); + } else { + size = object.toString().getBytes(StandardCharsets.UTF_8).length; + } + return size; + } + +} diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java new file mode 100644 index 000000000..cc6b85ebb --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtilsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.util; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.binary.BinaryRowDataUtil; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link CalculateObjectSizeUtils} + */ +public class CalculateObjectSizeUtilsTest { + + @Test + public void testGetDataSize() { + String data1 = null; + long expected1 = 0L; + long actual1 = CalculateObjectSizeUtils.getDataSize(data1); + Assert.assertEquals(expected1, actual1); + + String data2 = "test"; + long expected2 = 4L; + long actual2 = CalculateObjectSizeUtils.getDataSize(data2); + Assert.assertEquals(expected2, actual2); + + BinaryRowData data3 = BinaryRowDataUtil.EMPTY_ROW; + long expected3 = 8L; + long actual3 = CalculateObjectSizeUtils.getDataSize(data3); + Assert.assertEquals(expected3, actual3); + } +} diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index db446f0ce..02f364b4d 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -17,8 +17,6 @@ package org.apache.inlong.sort.doris.table; -import java.util.LinkedHashSet; -import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -55,6 +53,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.doris.model.RespContent; import org.apache.inlong.sort.doris.util.DorisParseUtils; @@ -68,10 +67,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -579,9 +580,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { try { metricData.outputDirtyMetricsWithEstimate(database, table, 1, - content.getBytes(StandardCharsets.UTF_8).length); + CalculateObjectSizeUtils.getDataSize(content)); } catch (Exception ex) { - metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length); + metricData.invokeDirtyWithEstimate(dirtyData); } } diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 3d8531bd4..4cef0c5b8 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.nio.charset.StandardCharsets; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; @@ -236,7 +236,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe element.hasTimestamp() ? element.getTimestamp() : null, currentWatermark); rowSize = rowSize + 1; - dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length; + dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue()); } catch (IOException e) { throw new RuntimeException(e); } catch (Exception e) { @@ -245,8 +245,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe throw new RuntimeException(e); } if (sinkMetricData != null) { - sinkMetricData.invokeDirty(1L, - element.getValue().toString().getBytes(StandardCharsets.UTF_8).length); + sinkMetricData.invokeWithEstimate(element.getValue()); } if (dirtySink != null) { DirtyData.Builder<Object> builder = DirtyData.builder(); diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java index 1d62d8f56..be01bf403 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java @@ -52,6 +52,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -249,13 +250,13 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> try { mutation = Preconditions.checkNotNull(mutationConverter.convertToMutation(value)); rowSize++; - dataSize = dataSize + value.toString().getBytes(StandardCharsets.UTF_8).length; + dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(value); } catch (Exception e) { LOGGER.error("Convert to mutation error", e); if (!dirtyOptions.ignoreDirty()) { throw new RuntimeException(e); } - sinkMetricData.invokeDirty(1, value.toString().getBytes(StandardCharsets.UTF_8).length); + sinkMetricData.invokeDirtyWithEstimate(value); if (dirtySink != null) { DirtyData.Builder<Object> builder = DirtyData.builder(); try { diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index 4a359ad7e..d2603b90a 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -44,13 +44,13 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.nio.charset.StandardCharsets; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; @@ -241,7 +241,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe currentWatermark); rowSize = rowSize + 1; if (element.getValue() != null) { - dataSize = dataSize + element.getValue().toString().getBytes(StandardCharsets.UTF_8).length; + dataSize = dataSize + CalculateObjectSizeUtils.getDataSize(element.getValue()); } } catch (IOException e) { throw e; diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index 472944aae..6194b891d 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -229,7 +229,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi if (!dirtyOptions.ignoreDirty()) { if (metricData != null) { metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(), - tableId.name(), rowData.toString()); + tableId.name(), rowData); } } else { handleDirtyData(rowData.toString(), jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId, @@ -384,7 +384,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi if (metricData != null) { metricData.outputDirtyMetricsWithEstimate( tableId.namespace().toString(), tableId.name(), - rowData.toString()); + rowData); } } else { handleDirtyData(rowData.toString(), jsonNode, diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 3e29f20fc..32c4200ad 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -49,6 +49,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.MultipleSinkOption; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory; import org.slf4j.Logger; @@ -56,7 +57,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.Closeable; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -258,7 +258,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi for (RowData data : recordWithSchema.getData()) { String dataBaseName = tableId.namespace().toString(); String tableName = tableId.name(); - long size = data == null ? 0 : data.toString().getBytes(StandardCharsets.UTF_8).length; + long size = CalculateObjectSizeUtils.getDataSize(data); try { multipleWriters.get(tableId).processElement(data); diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java index f7c2f74e5..307c71ddc 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java @@ -54,6 +54,7 @@ import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.HashMap; import java.util.concurrent.Executors; @@ -309,7 +309,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat private void updateMetric(In record) { rowSize++; - dataSize += record.toString().getBytes(StandardCharsets.UTF_8).length; + dataSize += CalculateObjectSizeUtils.getDataSize(record); } private void resetStateAfterFlush() { diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java index c5ae7cc65..4f8a1ec97 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData; import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +114,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe // approximate since it may be inefficient to iterate over all writtenSize-1 elements. long writtenBytes = 0L; if (writtenSize > 0) { - writtenBytes = (long) batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize; + writtenBytes = CalculateObjectSizeUtils.getDataSize(batch.get(0)) * writtenSize; } batch.clear(); if (!multipleSink) { @@ -181,7 +182,7 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe st.addBatch(); st.executeBatch(); if (!multipleSink) { - sinkMetricData.invoke(1, rowData.toString().getBytes().length); + sinkMetricData.invokeWithEstimate(rowData); } else { metric[0] += 1; metric[1] += rowData.toString().getBytes().length; @@ -200,13 +201,13 @@ public final class TableMetricStatementExecutor implements JdbcBatchStatementExe if (dirtySinkHelper != null) { dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, e); } - sinkMetricData.invokeDirty(1, rowData.toString().getBytes().length); + sinkMetricData.invokeDirtyWithEstimate(rowData); } else { if (dirtySinkHelper != null) { dirtySinkHelper.invoke(rowData.toString(), DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e); } metric[2] += 1; - metric[3] += rowData.toString().getBytes().length; + metric[3] += CalculateObjectSizeUtils.getDataSize(rowData); } }