yihua commented on code in PR #18674:
URL: https://github.com/apache/hudi/pull/18674#discussion_r3243682450


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -187,10 +187,14 @@ public ClosableIterator<UnsafeRow> 
getUnsafeRowIterator(HoodieSchema requestedSc
                 
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
 sqlConf.parquetRecordFilterEnabled())));
           });
     }
-    ParquetReader<InternalRow> reader = ParquetReader.builder(new 
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
-                rebaseDateSpec,
-                
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), 
messageSchema),
-            new Path(path.toUri()))
+    // Via SparkAdapter so Spark 4.0 picks up its variant-reordering 
ReadSupport subclass
+    // (#18334); constructing the base class here would MALFORMED_VARIANT on 
Spark 4.0.
+    HoodieParquetReadSupport readSupport = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetReadSupport(
+        Option$.MODULE$.empty(), true, true,
+        rebaseDateSpec,
+        SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),

Review Comment:
   This could be folded into the adapter implementation, since 
`createParquetReadSupport` is a new adapter method.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -466,6 +481,28 @@ trait SparkAdapter extends Serializable {
    */
   def isVariantShreddingStruct(structType: StructType): Boolean
 
+  /**
+   * Checks if a StructType is the result of Spark 4.1's PushVariantIntoScan 
rewriting — i.e.,
+   * every child field carries `VariantMetadata` describing a pushed-down 
variant extraction.
+   *
+   * Returns false on Spark versions earlier than 4.1 (the rewriting only 
happens there).
+   */
+  def isVariantProjectionStruct(structType: StructType): Boolean = false
+
+  /**
+   * If `sparkRequiredSchema` contains any field that's a Spark 4.1 variant 
projection struct
+   * (i.e., the same-named field in `sparkDataSchema` is `VariantType`), 
returns a row
+   * transformer that takes an InternalRow in the data-schema shape (with full 
variants) and
+   * produces an InternalRow in the required-schema shape (with each variant 
column projected
+   * to its requested struct via VariantGet).
+   *
+   * Used on the MOR log-file path: log records carry the full variant on 
disk, but the merger
+   * expects rows aligned to the post-PushVariantIntoScan required schema. 
Returns None when
+   * there's nothing to project (cheap fast-path for Spark < 4.1 and for 
non-variant queries).
+   */
+  def buildVariantProjector(sparkDataSchema: StructType,
+                            sparkRequiredSchema: StructType): 
Option[InternalRow => InternalRow] = None

Review Comment:
   We need to revisit if this impacts performance and see if such projection 
can be avoided through custom variant reader.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -61,14 +65,63 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
                                               filters: Seq[Filter],
                                               requiredFilters: Seq[Filter],
                                               storageConfiguration: 
StorageConfiguration[_],
-                                              tableConfig: HoodieTableConfig)
+                                              tableConfig: HoodieTableConfig,
+                                              sparkDataSchema: 
Option[StructType] = None,

Review Comment:
   `sparkDataSchema` is not used and should be removed.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -242,6 +242,21 @@ trait SparkAdapter extends Serializable {
                             options: Map[String, String],
                             hadoopConf: Configuration): 
Option[SparkColumnarFileReader]
 
+  /**
+   * Build the [[HoodieParquetReadSupport]] for a parquet read. Spark 4.0 
overrides to return
+   * its variant-aware subclass (variant group field reorder for the 
positional converter).
+   */
+  def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+                               enableVectorizedReader: Boolean,
+                               enableTimestampFieldRepair: Boolean,
+                               datetimeRebaseSpec: 
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+                               int96RebaseSpec: 
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+                               tableSchemaOpt: 
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType])

Review Comment:
   nit: import the classes `RebaseSpec`, `Option => HOption`, `MessageType`



##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala:
##########
@@ -195,6 +195,18 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
     Spark40ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
 
+  override def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+                                        enableVectorizedReader: Boolean,
+                                        enableTimestampFieldRepair: Boolean,
+                                        datetimeRebaseSpec: 
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+                                        int96RebaseSpec: 
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+                                        tableSchemaOpt: 
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType])
+      : 
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport = {
+    new 
org.apache.spark.sql.execution.datasources.parquet.Spark40HoodieParquetReadSupport(
+      convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+      datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt)

Review Comment:
   nit: Import the classes and avoid package prefix.  Use `Option => HOption` 
for org.apache.hudi.common.util.Option to avoid conflict with Scala Option.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -514,6 +520,18 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
       }
   }
 
+  /**
+   * Detects a Spark 4.1 PushVariantIntoScan-projected struct. Short-circuits 
before consulting
+   * the version-specific SparkAdapter so shared-module unit tests (whose 
classpath lacks any
+   * SparkXAdapter) don't trigger an adapter-load failure on plain StructType 
conversions.
+   */
+  private def isSparkVariantProjectionStruct(st: StructType): Boolean = {
+    if (!HoodieSparkUtils.gteqSpark4_1) return false
+    if (!st.fields.exists(_.metadata != Metadata.empty)) return false

Review Comment:
   nit: could you flip this to `if (st.fields.forall(_.metadata == 
Metadata.empty)) return false`? The double-negative `!exists(!= empty)` takes a 
moment to mentally resolve compared to `forall(== empty)`.



##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, 
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 
4.1+ reads
+//  variant fields by name via SPARK-54410, so the reorder workaround below is 
no longer
+//  needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its 
converters
+//  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the
+//  Parquet schema has [metadata, value] order (per spec), the positional 
mismatch causes
+//  MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, 
metadata] in
+//  the requested schema. parquet-mr reconciles requested vs file schema by 
field name,
+//  so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+                                       convertTz: Option[ZoneId],
+                                       enableVectorizedReader: Boolean,
+                                       enableTimestampFieldRepair: Boolean,
+                                       datetimeRebaseSpec: RebaseSpec,
+                                       int96RebaseSpec: RebaseSpec,
+                                       tableSchemaOpt: 
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = 
org.apache.hudi.common.util.Option.empty())

Review Comment:
   nit: same on fixing imports "org.apache.hudi.common.util.Option => HOption" 
and importing MessageType



##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, 
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 
4.1+ reads
+//  variant fields by name via SPARK-54410, so the reorder workaround below is 
no longer
+//  needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its 
converters
+//  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the
+//  Parquet schema has [metadata, value] order (per spec), the positional 
mismatch causes
+//  MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, 
metadata] in
+//  the requested schema. parquet-mr reconciles requested vs file schema by 
field name,
+//  so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+                                       convertTz: Option[ZoneId],
+                                       enableVectorizedReader: Boolean,
+                                       enableTimestampFieldRepair: Boolean,
+                                       datetimeRebaseSpec: RebaseSpec,
+                                       int96RebaseSpec: RebaseSpec,
+                                       tableSchemaOpt: 
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = 
org.apache.hudi.common.util.Option.empty())
+  extends HoodieParquetReadSupport(
+    convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+    datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt) {
+
+  override def init(context: InitContext): ReadContext = {
+    val baseContext = super.init(context)
+    val reorderedSchema = Spark40HoodieParquetReadSupport.reorderVariantFields(
+      baseContext.getRequestedSchema)
+    new ReadContext(reorderedSchema, baseContext.getReadSupportMetadata)
+  }
+}
+
+object Spark40HoodieParquetReadSupport {
+  /**
+   * Reorders variant group fields in the requested schema so that "value" 
precedes "metadata".
+   * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which 
builds its
+   * converters array in hardcoded [value, metadata] order and indexes by 
schema position.
+   * parquet-mr reconciles the requested schema against the file schema by 
field name,
+   * so the correct bytes still flow to the correct converters regardless of 
file order.
+   */
+  def reorderVariantFields(schema: MessageType): MessageType = {
+    val reordered = 
schema.getFields.asScala.map(reorderVariantType).toArray[Type]
+    Types.buildMessage().addFields(reordered: _*).named(schema.getName)
+  }
+
+  private def reorderVariantType(t: Type): Type = {
+    t match {
+      case group: GroupType if isVariantGroup(group) =>
+        // Rebuild with [value, metadata] order for Spark compatibility
+        val valueField = group.getType("value")
+        val metadataField = group.getType("metadata")
+        group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
+      case group: GroupType =>
+        // Recurse into nested groups
+        val children = group.getFields.asScala.map(reorderVariantType).asJava
+        group.withNewFields(children)
+      case _ => t
+    }
+  }
+
+  private def isVariantGroup(group: GroupType): Boolean = {
+    group.containsField("value") &&
+      group.containsField("metadata") &&
+      group.getType("value").isPrimitive &&
+      group.getType("metadata").isPrimitive &&
+      group.getType("value").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY &&
+      group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY
+  }

Review Comment:
   nit: can this match non-variant `GroupType` with the same schema?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -61,14 +65,63 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
                                               filters: Seq[Filter],
                                               requiredFilters: Seq[Filter],
                                               storageConfiguration: 
StorageConfiguration[_],
-                                              tableConfig: HoodieTableConfig)
+                                              tableConfig: HoodieTableConfig,
+                                              sparkDataSchema: 
Option[StructType] = None,
+                                              sparkRequiredSchema: 
Option[StructType] = None)
   extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, 
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
+
+  // Java-friendly auxiliary constructor (Scala default args don't generate 
matching Java overloads).
+  def this(baseFileReader: SparkColumnarFileReader,
+           filters: Seq[Filter],
+           requiredFilters: Seq[Filter],
+           storageConfiguration: StorageConfiguration[_],
+           tableConfig: HoodieTableConfig) =
+    this(baseFileReader, filters, requiredFilters, storageConfiguration, 
tableConfig, None, None)
+
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private lazy val recordKeyFields = 
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
   private lazy val bootstrapSafeFilters: Seq[Filter] = 
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_, 
recordKeyFields)) ++ requiredFilters
   private lazy val allFilters = filters ++ requiredFilters
 
+  // For each field of `target`, replace its dataType with the matching 
field's projected
+  // variant struct from `source` (when present). Non-matching fields pass 
through.
+  private def overlayVariantProjections(target: StructType, source: 
StructType): StructType = {
+    StructType(target.fields.map { f =>
+      SparkFileFormatInternalRowReaderContext.findFieldByName(source, 
f.name).map(_.dataType) match {
+        case Some(projStruct: StructType) if 
sparkAdapter.isVariantProjectionStruct(projStruct) =>
+          f.copy(dataType = projStruct)
+        case _ => f
+      }
+    })
+  }
+
+  // Aligns log-block records with the PushVariantIntoScan-projected variant 
shape before
+  // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+  // _tmp_metadata_row_index) which the merger reads by ordinal — projecting 
down to the
+  // bare required schema would drop them and the merger would read garbage 
offsets.
+  override def getLogBlockRecordProjection(
+      dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, 
InternalRow]] = {
+    val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => 
f.dataType match {
+      case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
+      case _ => false
+    }))
+    if (!needsProjection) {
+      return HOption.empty[JFunction[InternalRow, InternalRow]]()
+    }
+    val req = sparkRequiredSchema.get
+    val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema)
+    val targetStruct = overlayVariantProjections(dataStruct, req)
+    sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match {
+      case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] {
+        // .copy() because the buffer stores rows into ExternalSpillableMap and
+        // UnsafeProjection reuses a single output buffer.
+        override def apply(r: InternalRow): InternalRow = p(r).copy()

Review Comment:
   @voonhous Could you check why `copy()` is required?  The log record reading 
should already have done the record copy when putting the record into the log 
record map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to