hudi-agent commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3375903634


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,289 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriteSupport for plugging in the bloom filter and variant 
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link 
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code 
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} 
loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
   private final Option<HoodieBloomFilterWriteSupport<String>> 
bloomFilterWriteSupportOpt;
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) HoodieSchema used for writing.
+   */
+  private final HoodieSchema effectiveHoodieSchema;
+
+  /**
+   * The effective Avro schema (derived from effectiveHoodieSchema).
+   */
+  private final Schema effectiveAvroSchema;
+
+  /**
+   * Variant fields that need shredding, keyed by their index in the effective 
schema.
+   * Empty if no shredding is needed.
+   */
+  private final Map<Integer, ShreddedVariantField> shreddedVariantFields;
+
+  /**
+   * Provider for variant shredding (loaded via reflection). Null if no 
shredding is needed.
+   */
+  private final VariantShreddingProvider shreddingProvider;
+
+  /**
+   * Names of all variant-typed top-level fields, regardless of shredding. 
Used to fail fast on the
+   * not-yet-supported read-then-reshred path (compaction/clustering over an 
already-shredded base
+   * file). See https://github.com/apache/hudi/issues/18931.
+   */
+  private final String[] variantFieldNames;
+
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, 
Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
-    super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+    this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, 
properties), bloomFilterOpt, properties);
+  }
+
+  private HoodieAvroWriteSupport(MessageType schema, HoodieSchema 
hoodieSchema, HoodieSchema effectiveSchema,
+                                 Option<BloomFilter> bloomFilterOpt, 
Properties properties) {
+    super(schema, effectiveSchema.toAvroSchema(), 
ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
     String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
     if (!vectorMeta.isEmpty()) {
       footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
     }
+
+    this.effectiveHoodieSchema = effectiveSchema;
+    this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+    this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    // Identify variant fields that need shredding
+    Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
+
+    if (variantWriteShreddingEnabled && effectiveSchema.getType() == 
HoodieSchemaType.RECORD) {
+      List<HoodieSchemaField> fields = effectiveSchema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        HoodieSchemaField field = fields.get(i);
+        HoodieSchema fieldSchema = field.schema();
+        // Unwrap nullable union to get the underlying type
+        if (fieldSchema.isNullable()) {
+          fieldSchema = fieldSchema.getNonNullType();
+        }
+        if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+          if (variant.isShredded() && 
variant.getTypedValueField().isPresent()) {
+            // Get the Avro sub-schema for this variant field from the 
effective schema
+            Schema fieldAvroSchema = 
effectiveAvroSchema.getFields().get(i).schema();
+            // Unwrap nullable union
+            if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+              fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+            }
+            shreddedFields.put(i, new ShreddedVariantField(fieldAvroSchema, 
variant));
+          }
+        }
+      }
+    }
+
+    this.shreddedVariantFields = shreddedFields;
+
+    // Collect every variant-typed field name (independent of shredding) for 
the read-then-reshred guard.
+    List<String> variantNames = new ArrayList<>();
+    if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
+      for (HoodieSchemaField field : effectiveSchema.getFields()) {
+        HoodieSchema fieldSchema = field.schema();
+        if (fieldSchema.isNullable()) {
+          fieldSchema = fieldSchema.getNonNullType();

Review Comment:
   🤖 nit: the shredding branch inside `write()` is ~30 lines and does its own 
field-by-field copy/transform. Could you extract it into a private 
`GenericRecord shredRecord(IndexedRecord)` helper so the top-level `write()` 
reads as just guard + (shred or pass-through)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java:
##########
@@ -139,9 +141,42 @@ private HoodieAvroWriteSupport 
getHoodieAvroWriteSupport(HoodieSchema schema,
                                                            
StorageConfiguration storageConf,
                                                            boolean 
enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = 
HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    // Work on a copy so we never mutate the shared config's internal 
Properties.
+    Properties props = TypedProperties.copy(config.getProps());
+    // Auto-detect variant shredding provider from classpath if not explicitly 
configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detected = detectShreddingProvider();
+      if (detected != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), 
detected);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         
config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, 
Properties.class},
-        getAvroSchemaConverter((Configuration) 
storageConf.unwrapAs(Configuration.class)).convert(schema), schema, filter, 
config.getProps());
+        // Build the Parquet schema from the effective (possibly shredded) 
schema so the message type
+        // matches the records actually written - a shredded variant has a 
nullable value and a
+        // typed_value column; converting the original schema would mark value 
REQUIRED and drop
+        // typed_value, failing the write with "Null-value for required field: 
value".
+        getAvroSchemaConverter((Configuration) 
storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema, 
filter, props);
+  }
+
+  /**
+   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} 
implementation
+   * available on the classpath. Returns the fully-qualified class name if 
found, or null.
+   */
+  private static String detectShreddingProvider() {
+    String[] candidates = {
+        "org.apache.hudi.variant.Spark4VariantShreddingProvider"

Review Comment:
   🤖 nit: the `candidates` array has a single entry today — until a second 
provider exists, a single `String CANDIDATE` constant plus one `Class.forName` 
call would be clearer. Alternatively, leave a comment noting which additional 
providers are expected to be added here.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,289 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriteSupport for plugging in the bloom filter and variant 
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link 
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code 
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} 
loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
   private final Option<HoodieBloomFilterWriteSupport<String>> 
bloomFilterWriteSupportOpt;
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) HoodieSchema used for writing.
+   */
+  private final HoodieSchema effectiveHoodieSchema;
+
+  /**
+   * The effective Avro schema (derived from effectiveHoodieSchema).
+   */
+  private final Schema effectiveAvroSchema;
+
+  /**
+   * Variant fields that need shredding, keyed by their index in the effective 
schema.
+   * Empty if no shredding is needed.
+   */
+  private final Map<Integer, ShreddedVariantField> shreddedVariantFields;
+
+  /**
+   * Provider for variant shredding (loaded via reflection). Null if no 
shredding is needed.
+   */
+  private final VariantShreddingProvider shreddingProvider;
+
+  /**
+   * Names of all variant-typed top-level fields, regardless of shredding. 
Used to fail fast on the
+   * not-yet-supported read-then-reshred path (compaction/clustering over an 
already-shredded base
+   * file). See https://github.com/apache/hudi/issues/18931.
+   */
+  private final String[] variantFieldNames;
+
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, 
Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
-    super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+    this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, 
properties), bloomFilterOpt, properties);
+  }
+
+  private HoodieAvroWriteSupport(MessageType schema, HoodieSchema 
hoodieSchema, HoodieSchema effectiveSchema,
+                                 Option<BloomFilter> bloomFilterOpt, 
Properties properties) {
+    super(schema, effectiveSchema.toAvroSchema(), 
ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
     String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
     if (!vectorMeta.isEmpty()) {
       footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
     }
+
+    this.effectiveHoodieSchema = effectiveSchema;

Review Comment:
   🤖 nit: the constructor walks `effectiveSchema.getFields()` twice — once to 
populate `shreddedVariantFields`, then again to populate `variantFieldNames`. 
Could you fold them into a single pass so the two views of the same fields stay 
obviously in sync?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala:
##########
@@ -542,4 +546,160 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
 
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test Shredded Variant Write and Read + Validate Parquet Schema after 
Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or 
higher")
+
+    // Test 1: Shredding enabled with forced schema → parquet should have 
typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+        """.stripMargin)
+
+      spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true")
+      spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true")
+      spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = 
a int, b string")
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+        """.stripMargin)
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName 
order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+
+      // Verify parquet schema has shredded structure with typed_value
+      val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+      assert(parquetFiles.nonEmpty, "Should have at least one data parquet 
file")
+
+      parquetFiles.foreach { filePath =>
+        val schema = readParquetSchema(filePath)
+        val variantGroup = getFieldAsGroup(schema, "v")
+        assert(groupContainsField(variantGroup, "typed_value"),
+          s"Shredded variant should have typed_value field. 
Schema:\n$variantGroup")
+        val valueField = 
variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+          "Shredded variant value field should be OPTIONAL")
+        val metadataField = 
variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+        assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+          "Shredded variant metadata field should be REQUIRED")
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after 
Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or 
higher")
+    // Shredding disabled → parquet should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+              """.stripMargin)
+
+      spark.sql(s"set hoodie.parquet.variant.write.shredding.enabled = false")
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName 
order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+
+      // Verify parquet schema does NOT have typed_value
+      val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+      assert(parquetFiles.nonEmpty, "Should have at least one data parquet 
file")
+
+      parquetFiles.foreach { filePath =>
+        val schema = readParquetSchema(filePath)
+        val variantGroup = getFieldAsGroup(schema, "v")
+        assert(!groupContainsField(variantGroup, "typed_value"),
+          s"Non-shredded variant should NOT have typed_value field. 
Schema:\n$variantGroup")
+        val valueField = 
variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+          "Non-shredded variant value field should be REQUIRED")
+      }
+
+      // Verify data can still be read back for the non-shredded case
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName 
order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+    })
+  }
+
+  /**
+   * Lists data parquet files in the table directory, excluding Hudi metadata 
files.
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(new HadoopPath(tablePath).toUri, conf)
+    val iter = fs.listFiles(new HadoopPath(tablePath), true)
+    val files = scala.collection.mutable.ArrayBuffer[String]()
+    while (iter.hasNext) {
+      val file = iter.next()
+      val path = file.getPath.toString
+      if (path.endsWith(".parquet") && !path.contains(".hoodie")) {
+        files += path
+      }
+    }
+    files.toSeq
+  }
+
+  /**
+   * Reads the Parquet schema (MessageType) from a parquet file.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val inputFile = HadoopInputFile.fromPath(new HadoopPath(filePath), conf)
+    val reader = ParquetFileReader.open(inputFile)
+    try {
+      reader.getFooter.getFileMetaData.getSchema
+    } finally {
+      reader.close()
+    }
+  }
+
+  /**
+   * Gets a named field from a GroupType (MessageType) and returns it as a 
GroupType.
+   * Uses getFieldIndex(String) + getType(int) to avoid Scala overload 
resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType 
= {
+    val idx: Int = parent.getFieldIndex(fieldName)
+    parent.getType(idx).asGroupType()
+  }
+
+  /**
+   * Checks whether a GroupType contains a field with the given name.
+   */
+  private def groupContainsField(group: GroupType, fieldName: String): Boolean 
= {
+    group.containsField(fieldName)

Review Comment:
   🤖 nit: `groupContainsField` is a one-line passthrough to 
`group.containsField(fieldName)` — could you just inline the call at the two 
callsites and drop the helper?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to