xuyangzhong commented on code in PR #25078: URL: https://github.com/apache/flink/pull/25078#discussion_r1837425108
########## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtilTest.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive.util; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.types.AtomicDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.testutils.junit.utils.TempDirUtils; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.file.StandardOpenOption.APPEND; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Tests for {@link TextFormatStatisticsReportUtil}. 
*/ +class TextFormatStatisticsReportUtilTest { + private Configuration hadoopConfig; + private DataType producedDataType; + + @TempDir private java.nio.file.Path temporaryFolder; + + @BeforeEach + void setUp() { + hadoopConfig = new Configuration(); + // Create a sample producedDataType with a RowType + List<RowType.RowField> fields = new ArrayList<>(); + fields.add(new RowType.RowField("field1", new VarCharType())); + fields.add(new RowType.RowField("field2", new VarCharType())); + fields.add(new RowType.RowField("field3", new VarCharType())); + producedDataType = new AtomicDataType(new RowType(fields)); + } + + @Test + void testEstimateTableStatisticsCase1() throws IOException { + // Create sample files for testing + File tempFile = TempDirUtils.newFile(temporaryFolder, "flink_test_file.txt"); + + List<Path> files = new ArrayList<>(); + files.add(new Path(tempFile.toURI())); + + String sampleString = "sample data"; + Files.write(tempFile.toPath(), sampleString.getBytes()); + TableStats stats = + TextFormatStatisticsReportUtil.estimateTableStatistics( + files, producedDataType, hadoopConfig); + assertEquals(0, stats.getRowCount()); Review Comment: A row count of 0 seems a bit odd here, since the file is not empty. Can we use `Math.ceil` in `estimateTableStatistics`? ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtil.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive.util; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** Utils for text format statistics report. 
*/ +public class TextFormatStatisticsReportUtil { + private static final Logger LOG = LoggerFactory.getLogger(TextFormatStatisticsReportUtil.class); + + public static TableStats estimateTableStatistics( + List<Path> files, DataType producedDataType, Configuration hadoopConfig) { + try { + long rowCount; + RowType rowType = (RowType) producedDataType.getLogicalType(); + double totalFileSize = 0.0; + for (Path file : files) { + totalFileSize += getTextFileSize(hadoopConfig, file); + } + rowCount = (long) (totalFileSize / estimateRowSize(rowType)); + return new TableStats(rowCount); + } catch (Exception e) { + LOG.warn("Estimating statistics failed for text format: {}", e.getMessage()); + return TableStats.UNKNOWN; + } + } + + private static int estimateRowSize(RowType rowType) { + int rowSize = 0; + for (int index = 0; index < rowType.getFieldCount(); ++index) { + LogicalType logicalType = rowType.getTypeAt(index); + rowSize += getAverageTypeValueSize(logicalType); Review Comment: nit: this `+=` performs an implicit narrowing cast from `double` to `int`, which silently truncates the fractional part. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtil.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive.util; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** Utils for text format statistics report. */ +public class TextFormatStatisticsReportUtil { + private static final Logger LOG = LoggerFactory.getLogger(TextFormatStatisticsReportUtil.class); + + public static TableStats estimateTableStatistics( + List<Path> files, DataType producedDataType, Configuration hadoopConfig) { + try { + long rowCount; + RowType rowType = (RowType) producedDataType.getLogicalType(); + double totalFileSize = 0.0; Review Comment: nit: why not use `long` here? ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtil.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive.util; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.plan.stats.TableStats; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** Utils for text format statistics report. 
*/ +public class TextFormatStatisticsReportUtil { + private static final Logger LOG = LoggerFactory.getLogger(TextFormatStatisticsReportUtil.class); + + public static TableStats estimateTableStatistics( + List<Path> files, DataType producedDataType, Configuration hadoopConfig) { + try { + long rowCount; + RowType rowType = (RowType) producedDataType.getLogicalType(); + double totalFileSize = 0.0; + for (Path file : files) { + totalFileSize += getTextFileSize(hadoopConfig, file); + } + rowCount = (long) (totalFileSize / estimateRowSize(rowType)); + return new TableStats(rowCount); + } catch (Exception e) { + LOG.warn("Estimating statistics failed for text format: {}", e.getMessage()); + return TableStats.UNKNOWN; + } + } + + private static int estimateRowSize(RowType rowType) { + int rowSize = 0; + for (int index = 0; index < rowType.getFieldCount(); ++index) { + LogicalType logicalType = rowType.getTypeAt(index); + rowSize += getAverageTypeValueSize(logicalType); + } + return rowSize; + } + + /** Estimation rules based on Hive field types. */ + private static double getAverageTypeValueSize(LogicalType logicalType) { + LogicalTypeRoot typeRoot = logicalType.getTypeRoot(); + switch (typeRoot) { + case CHAR: Review Comment: I have updated this part as shown below. Can you take a look and double-check it? Furthermore, it would be better to add a comment here: `draws inspiration from FlinkRelMdSize#averageTypeValueSize`.
``` switch (typeRoot) { case CHAR: case TINYINT: case BOOLEAN: return 1; case VARCHAR: case DATE: case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case TIME_WITHOUT_TIME_ZONE: case DECIMAL: return 12; case SMALLINT: return 2; case INTEGER: case FLOAT: case INTERVAL_DAY_TIME: return 4; case BIGINT: case DOUBLE: case INTERVAL_YEAR_MONTH: return 8; case BINARY: case VARBINARY: return 16; case ARRAY: return getAverageTypeValueSize(((ArrayType) logicalType).getElementType()) * 16; case MAP: return (getAverageTypeValueSize(((MapType) logicalType).getKeyType()) + getAverageTypeValueSize(((MapType) logicalType).getValueType())) * 16; case ROW: case DISTINCT_TYPE: case STRUCTURED_TYPE: return logicalType.getChildren().stream() .map(TextFormatStatisticsReportUtil::getAverageTypeValueSize) .reduce(0.0, Double::sum); case MULTISET: return (getAverageTypeValueSize(((MultisetType) logicalType).getElementType()) + getAverageTypeValueSize(new IntType())) * 16; default: // For unknown data types, we use a smaller data size for estimation. return 8; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org