hudi-agent commented on code in PR #18674:
URL: https://github.com/apache/hudi/pull/18674#discussion_r3243731762
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -281,6 +280,28 @@ protected Pair<Function<T, T>, HoodieSchema> getSchemaTransformerWithEvolvedSche
return Pair.of(transformer, evolvedSchema);
}
+ /**
+ * Composes schema evolution then the engine's optional log-block record
projection
+ * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved
data-block schema
+ * — the projector preserves field shape, only rewriting variant fields, so
merger
+ * metadata cols (read by ordinal) stay intact.
+ *
+ * <p>Skipped when a custom payload class is configured: {@code
PayloadUpdateProcessor}
+ * round-trips through {@code convertToAvroRecord} against a schema that
still types
+ * variant fields as {@code VariantType}, which would mis-decode rewritten
rows.
+ */
+ protected Pair<Function<T, T>, HoodieSchema> getProjectedTransformer(HoodieDataBlock dataBlock) {
+ Pair<Function<T, T>, HoodieSchema> evolved = getSchemaTransformerWithEvolvedSchema(dataBlock);
+ if (payloadClasses.isPresent()) {
Review Comment:
🤖 I wonder if this `payloadClasses.isPresent()` guard is complete. It skips
variant projection for log records, but base records read in
`SparkFileFormatInternalRowReaderContext.getFileRecordIterator` still get the
overlaid schema (struct shape) unconditionally. Later in
`HoodieAvroRecordMerger.merge`,
`recordContext.convertToAvroRecord(older.getRecord(),
getSchemaFromBufferRecord(older))` runs on the base record with `readerSchema`
typing the field as `VariantType` — the same mis-decode the comment above warns
about, but on the base side. Could you confirm there's an upstream check that
prevents PushVariantIntoScan from running with
`PAYLOAD_BASED_MERGE_STRATEGY_UUID`, or otherwise gate the overlay similarly?
@yihua
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -151,6 +151,21 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
supportVectorizedRead = false
supportReturningBatch = false
false
+ } else if (schema.fields.exists(f => f.dataType.isInstanceOf[StructType]
Review Comment:
🤖 nit: could you use a pattern match instead of the
`isInstanceOf`/`asInstanceOf` pair? Something like
`schema.fields.exists(_.dataType match { case st: StructType =>
sparkAdapter.isVariantProjectionStruct(st); case _ => false })` is more
idiomatic Scala and folds the type test and the cast into a single step.
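
A minimal sketch of that shape, assuming stand-in types so it compiles without Spark on the classpath. `DataType`, `StructField`, and `StructType` below are simplified placeholders for the `org.apache.spark.sql.types` classes, and the body of `isVariantProjectionStruct` is a hypothetical predicate, not the adapter's real check:

```scala
// Stand-ins for org.apache.spark.sql.types so the sketch compiles without Spark.
sealed trait DataType
case object StringType extends DataType
final case class StructField(name: String, dataType: DataType)
final case class StructType(fields: Seq[StructField]) extends DataType

object VariantProjectionCheck {
  // Hypothetical placeholder for sparkAdapter.isVariantProjectionStruct; the
  // real check lives in the Spark adapter and is not reproduced here.
  def isVariantProjectionStruct(st: StructType): Boolean =
    st.fields.exists(_.name == "projected")

  // A single match performs both the type test and the narrowing to StructType,
  // so no separate isInstanceOf/asInstanceOf calls are needed.
  def hasVariantProjection(schema: StructType): Boolean =
    schema.fields.exists(_.dataType match {
      case st: StructType => isVariantProjectionStruct(st)
      case _ => false
    })
}
```

The `case _ => false` arm keeps the predicate total, so non-struct fields are simply skipped.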
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 4.1+ reads
+// variant fields by name via SPARK-54410, so the reorder workaround below is no longer
+// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its converters
+// array in hardcoded [value, metadata] order, then indexes by schema position. If the
+// Parquet schema has [metadata, value] order (per spec), the positional mismatch causes
+// MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, metadata] in
+// the requested schema. parquet-mr reconciles requested vs file schema by field name,
+// so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+ convertTz: Option[ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec: RebaseSpec,
+ int96RebaseSpec: RebaseSpec,
+ tableSchemaOpt: org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] = org.apache.hudi.common.util.Option.empty())
+ extends HoodieParquetReadSupport(
Review Comment:
🤖 nit: could you add an import alias (e.g. `import
org.apache.hudi.common.util.{Option => HOption}`) and use `HOption` here,
rather than the fully-qualified name inline? That matches the convention used
in `SparkFileFormatInternalRowReaderContext.scala` and similar files in this
module.
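
A rough sketch of the aliasing pattern, assuming `java.util.Optional` as a stand-in for `org.apache.hudi.common.util.Option` so it compiles without Hudi on the classpath. `ImportAliasSketch` and `describe` are hypothetical names used only for illustration:

```scala
// Stand-in: java.util.Optional plays the role of org.apache.hudi.common.util.Option.
// In the PR the import would instead read:
//   import org.apache.hudi.common.util.{Option => HOption}
import java.util.{Optional => HOption}

object ImportAliasSketch {
  // Scala's Option and the aliased Java-style optional coexist in one
  // signature without any fully-qualified names.
  def describe(convertTz: Option[String],
               tableSchemaOpt: HOption[String] = HOption.empty()): String = {
    val tz = convertTz.getOrElse("none")
    val schema = if (tableSchemaOpt.isPresent) tableSchemaOpt.get() else "none"
    s"tz=$tz, schema=$schema"
  }
}
```

With the alias in place, the constructor parameter could shrink to something like `tableSchemaOpt: HOption[MessageType] = HOption.empty()`.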
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala:
##########
@@ -250,6 +253,58 @@ class Spark4_1Adapter extends BaseSpark4Adapter {
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
}
+ override def isVariantProjectionStruct(structType: StructType): Boolean = {
+ VariantMetadata.isVariantStruct(structType)
+ }
+
+ override def buildVariantProjector(sparkDataSchema: StructType,
+ sparkRequiredSchema: StructType): Option[InternalRow => InternalRow] = {
+ // Quick check: any required field a variant projection struct?
+ if (!sparkRequiredSchema.fields.exists(f => VariantMetadata.isVariantStruct(f.dataType))) {
+ None
+ } else {
+ // Surface mismatched schemas with both field lists rather than Spark's bare
+ // IllegalArgumentException from fieldIndex.
+ def lookupDataField(name: String): (Int, StructField) = {
+ val idx = sparkDataSchema.getFieldIndex(name).getOrElse(
+ throw new IllegalStateException(
+ s"Required field '$name' is absent from sparkDataSchema; " +
+ s"required=${sparkRequiredSchema.fieldNames.mkString("[", ",", "]")}, " +
+ s"data=${sparkDataSchema.fieldNames.mkString("[", ",", "]")}"))
+ (idx, sparkDataSchema.fields(idx))
+ }
+ val exprs: Array[Expression] = sparkRequiredSchema.fields.map { rf =>
+ rf.dataType match {
+ case projectedStruct: StructType if VariantMetadata.isVariantStruct(projectedStruct) =>
+ val (dataIdx, dataField) = lookupDataField(rf.name)
+ require(isVariantType(dataField.dataType),
+ s"Expected VariantType for field '${rf.name}' in data schema, got ${dataField.dataType}")
+ val variantRef: Expression = BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+ val childExprs: Seq[Expression] = projectedStruct.fields.toSeq.flatMap { child =>
+ val vm = VariantMetadata.fromMetadata(child.metadata)
+ val pathLit = Literal(UTF8String.fromString(vm.path), DataTypes.StringType)
+ val tz: Option[String] = Option(vm.timeZoneId)
+ val variantGet: Expression = VariantGet(variantRef, pathLit, child.dataType, vm.failOnError, tz)
+ Seq(Literal(UTF8String.fromString(child.name), DataTypes.StringType), variantGet)
+ }
+ CreateNamedStruct(childExprs)
+ case _ =>
+ val (dataIdx, dataField) = lookupDataField(rf.name)
+ BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+ }
+ }
+
+ val projection = UnsafeProjection.create(exprs.toIndexedSeq, DataTypeUtils.toAttributes(sparkDataSchema))
+ Some(row => projection(row))
+ }
+ }
+
+ // Apply LogicalTypeAnnotation.variantType((byte) 1) to the variant group, matching parquet 1.16+'s
+ // SparkToParquetSchemaConverter convention.
+ override protected def applyVariantLogicalType(builder: Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = {
+ builder.as(LogicalTypeAnnotation.variantType(1.toByte))
+ }
Review Comment:
🤖 nit: the `1` is the Variant spec version — it might be worth extracting it
as a named constant (e.g. `private val VariantSpecVersion: Byte = 1`) so a
future reader doesn't have to cross-reference the Parquet spec to understand
what the literal represents.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>