hudi-agent commented on code in PR #18674:
URL: https://github.com/apache/hudi/pull/18674#discussion_r3243731762


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -281,6 +280,28 @@ protected Pair<Function<T, T>, HoodieSchema> 
getSchemaTransformerWithEvolvedSche
     return Pair.of(transformer, evolvedSchema);
   }
 
+  /**
+   * Composes schema evolution then the engine's optional log-block record 
projection
+   * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved 
data-block schema
+   * — the projector preserves field shape, only rewriting variant fields, so 
merger
+   * metadata cols (read by ordinal) stay intact.
+   *
+   * <p>Skipped when a custom payload class is configured: {@code 
PayloadUpdateProcessor}
+   * round-trips through {@code convertToAvroRecord} against a schema that 
still types
+   * variant fields as {@code VariantType}, which would mis-decode rewritten 
rows.
+   */
+  protected Pair<Function<T, T>, HoodieSchema> 
getProjectedTransformer(HoodieDataBlock dataBlock) {
+    Pair<Function<T, T>, HoodieSchema> evolved = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
+    if (payloadClasses.isPresent()) {

Review Comment:
   🤖 I wonder if this `payloadClasses.isPresent()` guard is complete. It skips 
variant projection for log records, but base records read in 
`SparkFileFormatInternalRowReaderContext.getFileRecordIterator` still get the 
overlaid schema (struct shape) unconditionally. Later in 
`HoodieAvroRecordMerger.merge`, 
`recordContext.convertToAvroRecord(older.getRecord(), 
getSchemaFromBufferRecord(older))` runs on the base record with `readerSchema` 
typing the field as `VariantType` — the same mis-decode the comment above warns 
about, but on the base side. Could you confirm there's an upstream check that 
prevents PushVariantIntoScan from running with 
`PAYLOAD_BASED_MERGE_STRATEGY_UUID`, or otherwise gate the overlay similarly? 
@yihua
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -151,6 +151,21 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       supportVectorizedRead = false
       supportReturningBatch = false
       false
+    } else if (schema.fields.exists(f => f.dataType.isInstanceOf[StructType]

Review Comment:
   🤖 nit: could you use a pattern match instead of the 
`isInstanceOf`/`asInstanceOf` pair? Something like 
`schema.fields.exists(_.dataType match { case st: StructType => 
sparkAdapter.isVariantProjectionStruct(st); case _ => false })` is more 
idiomatic Scala and avoids the double-cast.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, 
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 
4.1+ reads
+//  variant fields by name via SPARK-54410, so the reorder workaround below is 
no longer
+//  needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its 
converters
+//  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the
+//  Parquet schema has [metadata, value] order (per spec), the positional 
mismatch causes
+//  MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, 
metadata] in
+//  the requested schema. parquet-mr reconciles requested vs file schema by 
field name,
+//  so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+                                       convertTz: Option[ZoneId],
+                                       enableVectorizedReader: Boolean,
+                                       enableTimestampFieldRepair: Boolean,
+                                       datetimeRebaseSpec: RebaseSpec,
+                                       int96RebaseSpec: RebaseSpec,
+                                       tableSchemaOpt: 
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = 
org.apache.hudi.common.util.Option.empty())
+  extends HoodieParquetReadSupport(

Review Comment:
   🤖 nit: could you add an import alias (e.g. `import 
org.apache.hudi.common.util.{Option => HOption}`) and use `HOption` here, 
rather than the fully-qualified name inline? That matches the convention used 
in `SparkFileFormatInternalRowReaderContext.scala` and similar files in this 
module.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala:
##########
@@ -250,6 +253,58 @@ class Spark4_1Adapter extends BaseSpark4Adapter {
     RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
   }
 
+  override def isVariantProjectionStruct(structType: StructType): Boolean = {
+    VariantMetadata.isVariantStruct(structType)
+  }
+
+  override def buildVariantProjector(sparkDataSchema: StructType,
+                                     sparkRequiredSchema: StructType): 
Option[InternalRow => InternalRow] = {
+    // Quick check: any required field a variant projection struct?
+    if (!sparkRequiredSchema.fields.exists(f => 
VariantMetadata.isVariantStruct(f.dataType))) {
+      None
+    } else {
+      // Surface mismatched schemas with both field lists rather than Spark's 
bare
+      // IllegalArgumentException from fieldIndex.
+      def lookupDataField(name: String): (Int, StructField) = {
+        val idx = sparkDataSchema.getFieldIndex(name).getOrElse(
+          throw new IllegalStateException(
+            s"Required field '$name' is absent from sparkDataSchema; " +
+              s"required=${sparkRequiredSchema.fieldNames.mkString("[", ",", 
"]")}, " +
+              s"data=${sparkDataSchema.fieldNames.mkString("[", ",", "]")}"))
+        (idx, sparkDataSchema.fields(idx))
+      }
+      val exprs: Array[Expression] = sparkRequiredSchema.fields.map { rf =>
+        rf.dataType match {
+          case projectedStruct: StructType if 
VariantMetadata.isVariantStruct(projectedStruct) =>
+            val (dataIdx, dataField) = lookupDataField(rf.name)
+            require(isVariantType(dataField.dataType),
+              s"Expected VariantType for field '${rf.name}' in data schema, 
got ${dataField.dataType}")
+            val variantRef: Expression = BoundReference(dataIdx, 
dataField.dataType, dataField.nullable)
+            val childExprs: Seq[Expression] = 
projectedStruct.fields.toSeq.flatMap { child =>
+              val vm = VariantMetadata.fromMetadata(child.metadata)
+              val pathLit = Literal(UTF8String.fromString(vm.path), 
DataTypes.StringType)
+              val tz: Option[String] = Option(vm.timeZoneId)
+              val variantGet: Expression = VariantGet(variantRef, pathLit, 
child.dataType, vm.failOnError, tz)
+              Seq(Literal(UTF8String.fromString(child.name), 
DataTypes.StringType), variantGet)
+            }
+            CreateNamedStruct(childExprs)
+          case _ =>
+            val (dataIdx, dataField) = lookupDataField(rf.name)
+            BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+        }
+      }
+
+      val projection = UnsafeProjection.create(exprs.toIndexedSeq, 
DataTypeUtils.toAttributes(sparkDataSchema))
+      Some(row => projection(row))
+    }
+  }
+
+  // Apply LogicalTypeAnnotation.variantType((byte) 1) to the variant group, 
matching parquet 1.16+'s
+  // SparkToParquetSchemaConverter convention.
+  override protected def applyVariantLogicalType(builder: 
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = {
+    builder.as(LogicalTypeAnnotation.variantType(1.toByte))
+  }

Review Comment:
   🤖 nit: the `1` is the Variant spec version — it might be worth extracting it 
as a named constant (e.g. `private val VariantSpecVersion: Byte = 1`) so a 
future reader doesn't have to cross-reference the parquet spec to understand 
what the literal represents.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to