yihua commented on code in PR #13711:
URL: https://github.com/apache/hudi/pull/13711#discussion_r2359280154


##########
hudi-common/src/main/java/org/apache/hudi/stats/ValueType.java:
##########
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.stats;
+
+import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.avro.HoodieAvroWrapperUtils;
+import org.apache.hudi.common.util.DateTimeUtils;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeTokenParser;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.convertBytesToBigDecimal;
+import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
+import static org.apache.hudi.common.util.DateTimeUtils.instantToNanos;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
+import static org.apache.hudi.common.util.DateTimeUtils.nanosToInstant;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
+public enum ValueType {
+  V1(HoodieAvroWrapperUtils.PrimitiveWrapperType.V1.getClazz(), 
HoodieAvroWrapperUtils.PrimitiveWrapperType.V1,
+      ValueType::passThrough, ValueType::passThrough, ValueType::passThrough),
+  NULL(HoodieAvroWrapperUtils.PrimitiveWrapperType.NULL.getClazz(), 
HoodieAvroWrapperUtils.PrimitiveWrapperType.NULL,
+      ValueType::passThrough, ValueType::passThrough, ValueType::passThrough),
+  BOOLEAN(HoodieAvroWrapperUtils.PrimitiveWrapperType.BOOLEAN, 
ValueType::castToBoolean),
+  INT(HoodieAvroWrapperUtils.PrimitiveWrapperType.INT, 
ValueType::castToInteger),
+  LONG(HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG, 
ValueType::castToLong),
+  FLOAT(HoodieAvroWrapperUtils.PrimitiveWrapperType.FLOAT, 
ValueType::castToFloat),
+  DOUBLE(HoodieAvroWrapperUtils.PrimitiveWrapperType.DOUBLE, 
ValueType::castToDouble),
+  STRING(HoodieAvroWrapperUtils.PrimitiveWrapperType.STRING, 
ValueType::castToString),
+  BYTES(HoodieAvroWrapperUtils.PrimitiveWrapperType.BYTES, 
ValueType::castToBytes),
+  FIXED(HoodieAvroWrapperUtils.PrimitiveWrapperType.BYTES, 
ValueType::castToFixed),
+  DECIMAL(BigDecimal.class, HoodieAvroWrapperUtils.PrimitiveWrapperType.BYTES,
+      ValueType::castToDecimal, ValueType::toDecimal, ValueType::fromDecimal),
+  UUID(UUID.class, HoodieAvroWrapperUtils.PrimitiveWrapperType.STRING,
+      ValueType::castToUUID, ValueType::toUUID, ValueType::fromUUID),
+  DATE(LocalDate.class, HoodieAvroWrapperUtils.PrimitiveWrapperType.INT,
+      ValueType::castToDate, ValueType::toDate, ValueType::fromDate),
+  TIME_MILLIS(LocalTime.class, HoodieAvroWrapperUtils.PrimitiveWrapperType.INT,
+      ValueType::castToTimeMillis, ValueType::toTimeMillis, 
ValueType::fromTimeMillis),
+  TIME_MICROS(LocalTime.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToTimeMicros, ValueType::toTimeMicros, 
ValueType::fromTimeMicros),
+  TIMESTAMP_MILLIS(Instant.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToTimestampMillis, ValueType::toTimestampMillis, 
ValueType::fromTimestampMillis),
+  TIMESTAMP_MICROS(Instant.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToTimestampMicros, ValueType::toTimestampMicros, 
ValueType::fromTimestampMicros),
+  TIMESTAMP_NANOS(Instant.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToTimestampNanos, ValueType::toTimestampNanos, 
ValueType::fromTimestampNanos),
+  LOCAL_TIMESTAMP_MILLIS(LocalDateTime.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToLocalTimestampMillis, 
ValueType::toLocalTimestampMillis, ValueType::fromLocalTimestampMillis),
+  LOCAL_TIMESTAMP_MICROS(LocalDateTime.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToLocalTimestampMicros, 
ValueType::toLocalTimestampMicros, ValueType::fromLocalTimestampMicros),
+  LOCAL_TIMESTAMP_NANOS(LocalDateTime.class, 
HoodieAvroWrapperUtils.PrimitiveWrapperType.LONG,
+      ValueType::castToLocalTimestampNanos, ValueType::toLocalTimestampNanos, 
ValueType::fromLocalTimestampNanos);
+
+  private final Class<?> internalType;
+  private final HoodieAvroWrapperUtils.PrimitiveWrapperType 
primitiveWrapperType;
+  private final BiFunction<Object, ValueMetadata, Comparable<?>> standardize;
+  private final BiFunction<Comparable<?>, ValueMetadata, Comparable<?>> 
toComplex;
+  private final BiFunction<Comparable<?>, ValueMetadata, Comparable<?>> 
toPrimitive;

Review Comment:
   Let's add Javadocs to explain these methods.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala:
##########
@@ -445,6 +448,28 @@ object ColumnStatsIndexSupport {
   @inline def composeColumnStatStructType(col: String, statName: String, 
dataType: DataType) =
     StructField(formatColName(col, statName), dataType, nullable = true, 
Metadata.empty)
 
+  def extractColStatsValue(valueWrapper: AnyRef, dataType: DataType, 
valueMetadata: ValueMetadata, useJava8api: Boolean): Any = {
+    valueMetadata.getValueType match {
+      case ValueType.V1 => extractWrapperValueV1(valueWrapper, dataType)
+      case _ => extractColStatsValueV2(valueWrapper, dataType, valueMetadata, 
useJava8api)
+    }
+  }
+
+  private def extractColStatsValueV2(valueWrapper: AnyRef, dataType: DataType, 
valueMetadata: ValueMetadata, useJava8api: Boolean): Any = {
+    val colStatsValue = 
SparkValueMetadata.convertJavaTypeToSparkType(SparkValueMetadata.getValueMetadata(dataType,
 HoodieIndexVersion.V2)
+      .standardizeJavaTypeAndPromote(valueMetadata.unwrapValue(valueWrapper)), 
useJava8api)
+    // TODO: should this be done here? Should we handle this with adding more 
value types?
+    // TODO: should this logic be in convertJavaTypeToSparkType?
+    dataType match {
+      case ShortType => colStatsValue.asInstanceOf[Int].toShort
+      case ByteType => colStatsValue.asInstanceOf[Int].toByte

Review Comment:
   What Avro field type corresponds to these two?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -117,6 +124,18 @@ public UpgradeDowngrade.TableConfigChangeSet 
upgrade(HoodieWriteConfig config,
           metaClient.getIndexDefinitionPath(),
           indexMetadataOpt.get(),
           metaClient.getTableConfig().getTableVersion());
+      //Validate index columns
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      try {
+        List<String> partitionsToDrop = mdtPartitionsToDrop(indexMetadataOpt, 
tableSchemaResolver.getTableAvroSchema());
+        if (!partitionsToDrop.isEmpty()) {
+          try (BaseHoodieWriteClient writeClient = 
upgradeDowngradeHelper.getWriteClient(config, context)) {
+            writeClient.dropIndex(partitionsToDrop);

Review Comment:
   For col stats index, instead of dropping, could we allow the index to 
persist by default if user prefers that on large tables (given rebuilding 
column stats may take time)?  Let's add a config to control whether to drop or 
not.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1721,8 +1760,9 @@ private static 
List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetada
     } catch (Exception e) {
       // NOTE: In case reading column range metadata from individual file 
failed,
       //       we simply fall back, in lieu of failing the whole task
-      LOG.error("Failed to fetch column range metadata for: {}", 
partitionPathFileName);
-      return Collections.emptyList();
+      throw new HoodieException("Failed to fetch column range metadata for: " 
+ partitionPathFileName, e);

Review Comment:
   The column stats index and data skipping are designed in a way that if the 
column stats are missing for a particular file or column, the data skipping 
should not prune the file so the correctness of querying is still guaranteed.  
We can revisit this case to see if we want to throw error if the column range 
metadata cannot be read due to more restricted set of exceptions.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala:
##########
@@ -72,10 +72,6 @@ abstract class 
SparkParquetReaderBase(enableVectorizedReader: Boolean,
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
     // Using string value of this conf to preserve compatibility across spark 
versions.
     conf.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, false)
-    if (HoodieSparkUtils.gteqSpark3_4) {
-      // PARQUET_INFER_TIMESTAMP_NTZ_ENABLED is required from Spark 3.4.0 or 
above
-      conf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false)
-    }

Review Comment:
   Yes, that works.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -285,23 +284,15 @@ static boolean 
isMetadataTableBehindDataTable(HoodieWriteConfig config,
    * @param table         Hoodie table
    * @param operationType Type of operation (upgrade/downgrade)
    */
-  public static void dropNonV1SecondaryIndexPartitions(HoodieWriteConfig 
config, HoodieEngineContext context,
-                                                       HoodieTable table, 
SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {
+  public static void dropNonV1IndexPartitions(HoodieWriteConfig config, 
HoodieEngineContext context,
+                                              HoodieTable table, 
SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
     try (BaseHoodieWriteClient writeClient = 
upgradeDowngradeHelper.getWriteClient(config, context)) {
       List<String> mdtPartitions = 
metaClient.getTableConfig().getMetadataPartitions()
           .stream()
-          .filter(partition -> {
-            // Only drop secondary indexes that are not V1
-            return metaClient.getIndexForMetadataPartition(partition)
-                .map(indexDef -> {
-                  if 
(MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
 {
-                    return 
HoodieIndexVersion.V1.lowerThan(indexDef.getVersion());
-                  }
-                  return false;
-                })
-                .orElse(false);
-          })
+          .filter(partition -> 
metaClient.getIndexForMetadataPartition(partition)
+              .map(indexDef -> 
HoodieIndexVersion.V1.lowerThan(indexDef.getVersion()))

Review Comment:
   We'll take this separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to