This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch variant-intro-shredded-avro-support
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit afc6b4b5b06e298ccd92f0eb50b3bf0701a67844
Author: voon <[email protected]>
AuthorDate: Fri Jan 30 16:57:16 2026 +0800

    feat(schema): Add read + write support for shredded for AVRO
    
    - Added support to write shredded types for HoodieRecordType.AVRO
    - Added functional tests for testing newly added configs
---
 .../hudi/common/config/HoodieStorageConfig.java    |  10 +
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   | 396 +++++++++++++++++++-
 .../apache/hudi/avro/VariantShreddingProvider.java |  66 ++++
 .../hadoop/HoodieAvroFileWriterFactory.java        |  31 +-
 .../hudi/HoodieHadoopFsRelationFactory.scala       |  13 +
 .../sql/hudi/dml/schema/TestVariantDataType.scala  | 166 +++++++++
 .../variant/Spark4VariantShreddingProvider.java    | 405 +++++++++++++++++++++
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |  21 +-
 .../datasources/parquet/Spark40ParquetReader.scala |   3 +
 9 files changed, 1097 insertions(+), 14 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index fac0719fd76b..1a29b8c94257 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -198,6 +198,16 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "When disabled, only unshredded variant data can be read. "
           + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
 
+  public static final ConfigProperty<String> 
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.parquet.variant.shredding.provider.class")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Fully-qualified class name of the 
VariantShreddingProvider implementation "
+          + "used to shred variant values at write time in the Avro record 
path. "
+          + "The provider parses variant binary data and populates typed_value 
columns. "
+          + "When not set, the provider is auto-detected from the classpath.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = 
ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 547b02787369..ac674cbf60cf 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -20,22 +20,45 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wraps AvroWriteSupport to plug in the bloom filter and variant shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link 
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code 
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} 
loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
@@ -43,15 +66,207 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) HoodieSchema used for writing.
+   */
+  private final HoodieSchema effectiveHoodieSchema;
+
+  /**
+   * The effective Avro schema (derived from effectiveHoodieSchema).
+   */
+  private final Schema effectiveAvroSchema;
+
+  /**
+   * Indices of top-level variant fields that need shredding transformation.
+   * Empty array if no shredding is needed.
+   */
+  private final int[] shreddedVariantFieldIndices;
+
+  /**
+   * The shredded Avro sub-schema for each variant field at the corresponding 
index.
+   * Indexed by position in {@link #shreddedVariantFieldIndices}.
+   */
+  private final Schema[] shreddedVariantAvroSchemas;
+
+  /**
+   * The HoodieSchema.Variant for each variant field at the corresponding 
index.
+   * Indexed by position in {@link #shreddedVariantFieldIndices}.
+   */
+  private final HoodieSchema.Variant[] shreddedVariantHoodieSchemas;
+
+  /**
+   * Provider for variant shredding (loaded via reflection). Null if no 
shredding is needed.
+   */
+  private final VariantShreddingProvider shreddingProvider;
+
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, 
Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
-    super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+    this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, 
properties), bloomFilterOpt, properties);
+  }
+
+  private HoodieAvroWriteSupport(MessageType schema, HoodieSchema 
hoodieSchema, HoodieSchema effectiveSchema,
+                                 Option<BloomFilter> bloomFilterOpt, 
Properties properties) {
+    super(schema, effectiveSchema.toAvroSchema(), 
ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
     String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
     if (!vectorMeta.isEmpty()) {
       footerMetadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, 
vectorMeta);
     }
+
+    this.effectiveHoodieSchema = effectiveSchema;
+    this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+    this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    // Identify variant fields that need shredding
+    List<Integer> variantIndices = new ArrayList<>();
+    List<Schema> variantAvroSchemas = new ArrayList<>();
+    List<HoodieSchema.Variant> variantHoodieSchemas = new ArrayList<>();
+
+    if (variantWriteShreddingEnabled && effectiveSchema.getType() == 
HoodieSchemaType.RECORD) {
+      List<HoodieSchemaField> fields = effectiveSchema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        HoodieSchemaField field = fields.get(i);
+        HoodieSchema fieldSchema = field.schema();
+        // Unwrap nullable union to get the underlying type
+        if (fieldSchema.isNullable()) {
+          fieldSchema = fieldSchema.getNonNullType();
+        }
+        if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+          if (variant.isShredded() && 
variant.getTypedValueField().isPresent()) {
+            variantIndices.add(i);
+            // Get the Avro sub-schema for this variant field from the 
effective schema
+            Schema fieldAvroSchema = 
effectiveAvroSchema.getFields().get(i).schema();
+            // Unwrap nullable union
+            if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+              fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+            }
+            variantAvroSchemas.add(fieldAvroSchema);
+            variantHoodieSchemas.add(variant);
+          }
+        }
+      }
+    }
+
+    this.shreddedVariantFieldIndices = 
variantIndices.stream().mapToInt(Integer::intValue).toArray();
+    this.shreddedVariantAvroSchemas = variantAvroSchemas.toArray(new 
Schema[0]);
+    this.shreddedVariantHoodieSchemas = variantHoodieSchemas.toArray(new 
HoodieSchema.Variant[0]);
+
+    // Load shredding provider via reflection if needed
+    if (shreddedVariantFieldIndices.length > 0) {
+      String providerClass = 
properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+      if (providerClass != null && !providerClass.isEmpty()) {
+        this.shreddingProvider = (VariantShreddingProvider) 
ReflectionUtils.loadClass(providerClass);
+      } else {
+        this.shreddingProvider = null;
+      }
+    } else {
+      this.shreddingProvider = null;
+    }
+  }
+
+  /**
+   * Generates the effective schema for writing, applying variant shredding 
configuration.
+   *
+   * <p>When shredding is disabled, shredded variant fields are replaced with 
unshredded
+   * variants (removing {@code typed_value}) so that the Parquet file does not 
contain
+   * unused typed_value columns.</p>
+   *
+   * <p>When shredding is enabled and a forced shredding schema is configured 
via
+   * {@link 
HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+   * all variant fields are replaced with shredded variants using the forced 
schema.
+   * This handles the case where the input schema is unshredded but shredding 
is desired.</p>
+   *
+   * <p>When shredding is enabled without a forced schema, the schema is 
returned as-is
+   * (already-shredded variants stay shredded, unshredded variants stay 
unshredded).</p>
+   *
+   * @param hoodieSchema the original HoodieSchema
+   * @param properties   writer properties containing shredding configuration
+   * @return the effective schema to use for writing
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, Properties properties) {
+    boolean shreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    if (!shreddingEnabled) {
+      // Schemas from clustering/compaction may still be shredded (read from 
on-disk Parquet files
+      // written with shredding enabled), so we need to strip typed_value when 
shredding
+      // is disabled.
+      return unshreddVariantFields(hoodieSchema);
+    }
+
+    // Check if a forced shredding schema is configured
+    String forceShreddingSchema = properties.getProperty(
+        PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+    if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+      return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+    }
+
+    // When enabled without forced schema, use the schema as-is
+    // (shredded variants stay shredded, unshredded variants stay unshredded)
+    return hoodieSchema;
+  }
+
+  /**
+   * Overloaded version accepting HoodieConfig for use by factories.
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, HoodieConfig config) {
+    return generateEffectiveSchema(hoodieSchema, config.getProps());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(T record) {
+    if (shreddedVariantFieldIndices.length > 0 && shreddingProvider != null) {
+      IndexedRecord inputRecord = (IndexedRecord) record;
+      GenericRecord shreddedRecord = new 
GenericData.Record(effectiveAvroSchema);
+
+      // Copy all fields, transforming variant fields that need shredding
+      List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+      Schema inputSchema = inputRecord.getSchema();
+
+      for (int i = 0; i < effectiveFields.size(); i++) {
+        Schema.Field effectiveField = effectiveFields.get(i);
+        String fieldName = effectiveField.name();
+        Schema.Field inputField = inputSchema.getField(fieldName);
+        if (inputField == null) {
+          continue;
+        }
+
+        int variantIdx = findVariantIndex(i);
+        if (variantIdx >= 0) {
+          // This is a shredded variant field - transform it
+          Object fieldValue = inputRecord.get(inputField.pos());
+          if (fieldValue instanceof GenericRecord) {
+            GenericRecord variantRecord = (GenericRecord) fieldValue;
+            GenericRecord shreddedVariant = 
shreddingProvider.shredVariantRecord(
+                variantRecord,
+                shreddedVariantAvroSchemas[variantIdx],
+                shreddedVariantHoodieSchemas[variantIdx]);
+            shreddedRecord.put(i, shreddedVariant);
+          } else {
+            // Null or unexpected type - copy as-is
+            shreddedRecord.put(i, fieldValue);
+          }
+        } else {
+          // Non-variant field - copy as-is
+          shreddedRecord.put(i, inputRecord.get(inputField.pos()));
+        }
+      }
+
+      super.write((T) shreddedRecord);
+    } else {
+      super.write(record);
+    }
   }
 
   @Override
@@ -74,6 +289,181 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
     footerMetadata.put(key, value);
   }
 
+  /**
+   * Finds the position in {@link #shreddedVariantFieldIndices} for the given 
effective field index,
+   * or -1 if this field is not a variant field that needs shredding.
+   */
+  private int findVariantIndex(int effectiveFieldIndex) {
+    for (int i = 0; i < shreddedVariantFieldIndices.length; i++) {
+      if (shreddedVariantFieldIndices[i] == effectiveFieldIndex) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+      "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+  /**
+   * Applies a forced shredding schema to all variant fields in the given 
schema.
+   * The forced schema DDL (e.g., {@code "a int, b string"}) defines the 
typed_value
+   * fields that will be added to each variant column.
+   */
+  private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, 
String ddl) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : 
fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant shreddedVariant = 
HoodieSchema.createVariantShreddedObject(
+            unwrapped.getAvroSchema().getName(),
+            unwrapped.getAvroSchema().getNamespace(),
+            unwrapped.getAvroSchema().getDoc(),
+            shreddedFields);
+        HoodieSchema replacement = wasNullable
+            ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+        
newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+        changed = true;
+      } else {
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+      }
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+
+  /**
+   * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"})
+   * into an ordered field-name-to-HoodieSchema map; commas inside parentheses do not separate fields.
+   */
+  private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+    Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+    for (String fieldDef : ddl.split(",(?![^()]*\\))")) {
+      String trimmed = fieldDef.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      String[] parts = trimmed.split("\\s+", 2);
+      if (parts.length != 2) {
+        throw new IllegalArgumentException(
+            "Invalid shredding DDL field definition (expected 'name type'): " + trimmed);
+      }
+      fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+    }
+    return fields;
+  }
+
+  /**
+   * Parses a simple type name (matched case-insensitively, independent of the default locale) into a HoodieSchema.
+   * Supports: int/integer, long/bigint, string, double, float, boolean, binary, decimal(p,s); else IllegalArgumentException.
+   */
+  private static HoodieSchema parseSimpleType(String type) {
+    String lower = type.toLowerCase(java.util.Locale.ROOT);
+    switch (lower) {
+      case "int":
+      case "integer":
+        return HoodieSchema.create(HoodieSchemaType.INT);
+      case "long":
+      case "bigint":
+        return HoodieSchema.create(HoodieSchemaType.LONG);
+      case "string":
+        return HoodieSchema.create(HoodieSchemaType.STRING);
+      case "double":
+        return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+      case "float":
+        return HoodieSchema.create(HoodieSchemaType.FLOAT);
+      case "boolean":
+        return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+      case "binary":
+        return HoodieSchema.create(HoodieSchemaType.BYTES);
+      default:
+        Matcher m = DECIMAL_PATTERN.matcher(lower);
+        if (m.matches()) {
+          return HoodieSchema.createDecimal(
+              Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+        }
+        throw new IllegalArgumentException("Unsupported shredding type: " + type);
+    }
+  }
+
+  /**
+   * Strips shredding from variant fields in the schema.
+   * Replaces shredded variant fields with unshredded variants (removing 
typed_value).
+   */
+  private static HoodieSchema unshreddVariantFields(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : 
fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+        if (variant.isShredded()) {
+          // Replace with unshredded variant
+          HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+              unwrapped.getAvroSchema().getName(),
+              unwrapped.getAvroSchema().getNamespace(),
+              unwrapped.getAvroSchema().getDoc());
+          HoodieSchema replacement = wasNullable ? 
HoodieSchema.createNullable(unshredded) : unshredded;
+          newFields.add(field.withSchema(replacement));
+          changed = true;
+          continue;
+        }
+      }
+      newFields.add(field);
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+
+  /**
+   * Extracts the non-null type from a union schema.
+   */
+  private static Schema getNonNullFromUnion(Schema unionSchema) {
+    for (Schema type : unionSchema.getTypes()) {
+      if (type.getType() != Schema.Type.NULL) {
+        return type;
+      }
+    }
+    throw new IllegalArgumentException("Union schema does not contain a 
non-null type: " + unionSchema);
+  }
+
   private static class HoodieBloomFilterAvroWriteSupport extends 
HoodieBloomFilterWriteSupport<String> {
     public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
       super(bloomFilter);
@@ -84,4 +474,4 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
       return StringUtils.getUTF8Bytes(key);
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
new file mode 100644
index 000000000000..90ffbee1e53f
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and 
produce
+ * a shredded {@link GenericRecord} with typed_value columns populated 
according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via 
reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
+
+  /**
+   * Transform an unshredded variant GenericRecord into a shredded one.
+   * <p>
+   * The input record is expected to have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer containing the variant value binary</li>
+   *   <li>{@code metadata}: ByteBuffer containing the variant metadata 
binary</li>
+   * </ul>
+   * <p>
+   * The output record should conform to {@code shreddedSchema} and have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer or null (null when typed_value captures 
the full value)</li>
+   *   <li>{@code metadata}: ByteBuffer (always present)</li>
+   *   <li>{@code typed_value}: the typed representation extracted from the 
variant binary,
+   *       or null if the variant type does not match the typed_value 
schema</li>
+   * </ul>
+   *
+   * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata: 
ByteBuffer}
+   * @param shreddedSchema    target Avro schema with {value: nullable 
ByteBuffer, metadata: ByteBuffer, typed_value: type}
+   * @param variantSchema     HoodieSchema.Variant containing the shredding 
schema information
+   * @return a GenericRecord conforming to shreddedSchema with typed_value 
populated where possible
+   */
+  GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema);
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index 5be171d35c61..afcb5803ab9e 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -49,6 +49,7 @@ import java.io.OutputStream;
 import java.util.Properties;
 
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
 import static 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
@@ -123,9 +124,37 @@ public class HoodieAvroFileWriterFactory extends 
HoodieFileWriterFactory {
   private HoodieAvroWriteSupport getHoodieAvroWriteSupport(HoodieSchema schema,
                                                            HoodieConfig 
config, boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = 
HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    Properties props = config.getProps();
+    // Auto-detect variant shredding provider from classpath if not explicitly 
configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detected = detectShreddingProvider();
+      if (detected != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), 
detected);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         
config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, 
Properties.class},
-        
getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(schema),
 schema, filter, config.getProps());
+        
getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(effectiveSchema),
 schema, filter, props);
+  }
+
+  /**
+   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} 
implementation
+   * available on the classpath. Returns the fully-qualified class name if 
found, or null.
+   */
+  private static String detectShreddingProvider() {
+    String[] candidates = {
+        "org.apache.hudi.variant.Spark4VariantShreddingProvider"
+    };
+    for (String candidate : candidates) {
+      try {
+        Class.forName(candidate);
+        return candidate;
+      } catch (ClassNotFoundException e) {
+        // not on classpath, try next
+      }
+    }
+    return null;
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index a11e3075a6fb..d3449fcc56f6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.datasources._
 import 
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
@@ -62,6 +63,18 @@ abstract class HoodieBaseHadoopFsRelationFactory(val 
sqlContext: SQLContext,
                                                  val schemaSpec: 
Option[StructType],
                                                  val isBootstrap: Boolean
                                                 ) extends SparkAdapterSupport 
with HoodieHadoopFsRelationFactory with Logging {
+  // Propagate Hudi's variant allow-reading-shredded config to Spark's SQLConf.
+  // ParquetToSparkSchemaConverter reads this from SQLConf.get(), so it must 
be set
+  // here, during table resolution, before query execution starts.
+  if (HoodieSparkUtils.gteqSpark4_0) {
+    val sqlConf = sqlContext.sparkSession.sessionState.conf
+    val hoodieParquetAllowReadingShreddedConfKey = 
"hoodie.parquet.variant.allow.reading.shredded"
+    val allowReadingShredded = options.getOrElse(
+      hoodieParquetAllowReadingShreddedConfKey,
+      sqlConf.getConfString(hoodieParquetAllowReadingShreddedConfKey, "true"))
+    sqlConf.setConfString(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, 
allowReadingShredded)
+  }
+
   protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
   protected lazy val optParams: Map[String, String] = options
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index a236592bc1d4..421525682000 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -24,6 +24,10 @@ import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.internal.schema.HoodieSchemaException
 
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
 
@@ -189,4 +193,166 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
 
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test Shredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // Shredding enabled with a forced schema: the written parquet files should have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+        """.stripMargin)
+
+      // Session-level settings changed by this test. They are unset in the finally block so
+      // that they cannot leak into tests that run later in the same Spark session.
+      val sessionConfKeys = Seq(
+        "hoodie.parquet.variant.write.shredding.enabled",
+        "hoodie.parquet.variant.allow.reading.shredded",
+        "hoodie.parquet.variant.force.shredding.schema.for.test")
+      try {
+        spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true")
+        spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true")
+        spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = a int, b string")
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+          """.stripMargin)
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+
+        // Verify parquet schema has shredded structure with typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(groupContainsField(variantGroup, "typed_value"),
+            s"Shredded variant should have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+            "Shredded variant value field should be OPTIONAL")
+          val metadataField = variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+          assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+            "Shredded variant metadata field should be REQUIRED")
+        }
+      } finally {
+        sessionConfKeys.foreach(key => spark.conf.unset(key))
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+    // Shredding disabled: the written parquet files should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+              """.stripMargin)
+
+      // The session-level setting changed below is unset in the finally block so that it
+      // cannot leak into tests that run later in the same Spark session.
+      val shreddingEnabledKey = "hoodie.parquet.variant.write.shredding.enabled"
+      try {
+        spark.sql("set hoodie.parquet.variant.write.shredding.enabled = false")
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+                """.stripMargin)
+
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+
+        // Verify parquet schema does NOT have typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(!groupContainsField(variantGroup, "typed_value"),
+            s"Non-shredded variant should NOT have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+            "Non-shredded variant value field should be REQUIRED")
+        }
+
+        // Verify data can still be read back for the non-shredded case
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+      } finally {
+        spark.conf.unset(shreddingEnabledKey)
+      }
+    })
+  }
+
+  /**
+   * Recursively walks the table directory and returns the paths of all files ending in
+   * ".parquet" whose path does not contain ".hoodie".
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val fileSystem = FileSystem.get(new HadoopPath(tablePath).toUri, hadoopConf)
+    val remaining = fs_listRecursively(fileSystem, tablePath)
+    val dataFiles = Seq.newBuilder[String]
+    while (remaining.hasNext) {
+      val candidate = remaining.next().getPath.toString
+      val isDataFile = candidate.endsWith(".parquet") && !candidate.contains(".hoodie")
+      if (isDataFile) {
+        dataFiles += candidate
+      }
+    }
+    dataFiles.result()
+  }
+
+  /** Opens a recursive file listing rooted at the given table path. */
+  private def fs_listRecursively(fileSystem: FileSystem, tablePath: String) =
+    fileSystem.listFiles(new HadoopPath(tablePath), true)
+
+  /**
+   * Opens the given parquet file, returns the schema (MessageType) recorded in its footer,
+   * and closes the reader again whether or not the read succeeded.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val footerReader = ParquetFileReader.open(
+      HadoopInputFile.fromPath(new HadoopPath(filePath), spark.sparkContext.hadoopConfiguration))
+    try {
+      val fileMetaData = footerReader.getFooter.getFileMetaData
+      fileMetaData.getSchema
+    } finally {
+      footerReader.close()
+    }
+  }
+
+  /**
+   * Looks up the named child of a GroupType (or MessageType) and returns it as a GroupType.
+   * The child is resolved by position, i.e. getFieldIndex(String) followed by getType(int),
+   * which avoids Scala overload resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType = {
+    val child = parent.getType(parent.getFieldIndex(fieldName))
+    child.asGroupType()
+  }
+
+  /**
+   * Checks whether a GroupType declares a field with the given name.
+   * Delegates to GroupType.containsField, which returns a plain Boolean: no Scala-Java
+   * collection converters are needed, and no exception is used (or swallowed) to detect a
+   * missing field.
+   */
+  private def groupContainsField(group: GroupType, fieldName: String): Boolean = {
+    group.containsField(fieldName)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..ae981a22f5af
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantSchema;
+import org.apache.spark.types.variant.VariantShreddingWriter;
+import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
+import 
org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Implementation of {@link VariantShreddingProvider} using Spark 4's variant 
parsing library.
+ *
+ * <p>This class bridges the Avro record path and Spark's {@link 
VariantShreddingWriter}
+ * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It 
converts
+ * the shredded output into Avro {@link GenericRecord}s that can be written via
+ * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p>
+ *
+ * <p>The shredding logic is delegated to {@link 
VariantShreddingWriter#castShredded},
+ * which handles scalar, object, and array shredding including residual value 
construction
+ * for non-matching fields. This class implements the {@link ShreddedResult} 
and
+ * {@link ShreddedResultBuilder} interfaces to collect the shredded components 
into
+ * Avro GenericRecords.</p>
+ */
+public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider {
+
+  private static final String VALUE_FIELD = "value";
+  private static final String METADATA_FIELD = "metadata";
+  private static final String TYPED_VALUE_FIELD = "typed_value";
+
+  /**
+   * Shreds one unshredded variant record into the layout described by {@code shreddedSchema}.
+   *
+   * <p>The {@code value} and {@code metadata} buffers of the input are wrapped in a Spark
+   * {@link Variant}, shredded through {@link VariantShreddingWriter#castShredded}, and the
+   * outcome is converted back into an Avro record.</p>
+   *
+   * @param unshreddedVariant record holding the {@code value} and {@code metadata} binary fields
+   * @param shreddedSchema    Avro schema of the shredded output record
+   * @param variantSchema     Hudi variant schema; not consulted by this implementation
+   * @return the shredded record, or {@code null} when either input field is {@code null}
+   */
+  @Override
+  public GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema) {
+    final ByteBuffer rawValue = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD);
+    final ByteBuffer rawMetadata = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD);
+    if (rawValue == null || rawMetadata == null) {
+      return null;
+    }
+
+    final Variant variant = new Variant(toByteArray(rawValue), toByteArray(rawMetadata));
+
+    // Walking the Avro schema yields Spark's VariantSchema and, as a by-product, registers
+    // with the builder which Avro schema belongs to each nesting level.
+    final AvroShreddedResultBuilder resultBuilder = new AvroShreddedResultBuilder();
+    final VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, resultBuilder);
+
+    // The shredding itself is performed by Spark's VariantShreddingWriter.
+    final ShreddedResult shredded =
+        VariantShreddingWriter.castShredded(variant, sparkSchema, resultBuilder);
+    return ((AvroShreddedResult) shredded).toGenericRecord();
+  }
+
+  /**
+   * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a
+   * shredded variant structure ({@code value}, {@code metadata}, {@code 
typed_value}).
+   *
+   * <p>This method also registers the Avro schema mapping in the builder so 
that
+   * {@link AvroShreddedResultBuilder#createEmpty} can create results with the
+   * correct Avro schema at each nesting level.</p>
+   */
+  private VariantSchema buildVariantSchema(Schema avroSchema, boolean 
isTopLevel,
+                                           AvroShreddedResultBuilder builder) {
+    Schema.Field valueField = avroSchema.getField(VALUE_FIELD);
+    Schema.Field metadataField = avroSchema.getField(METADATA_FIELD);
+    Schema.Field typedValueField = avroSchema.getField(TYPED_VALUE_FIELD);
+
+    int idx = 0;
+    int variantIdx = valueField != null ? idx++ : -1;
+    int topLevelMetadataIdx;
+    if (metadataField != null && isTopLevel) {
+      topLevelMetadataIdx = idx++;
+    } else {
+      topLevelMetadataIdx = -1;
+      if (metadataField != null) {
+        idx++;
+      }
+    }
+    int typedIdx = typedValueField != null ? idx++ : -1;
+    int numFields = idx;
+
+    VariantSchema.ScalarType scalarSchema = null;
+    VariantSchema.ObjectField[] objectSchema = null;
+    VariantSchema arraySchema = null;
+
+    if (typedValueField != null) {
+      Schema tvSchema = unwrapNullable(typedValueField.schema());
+
+      switch (tvSchema.getType()) {
+        case RECORD:
+          // Object shredding: each field has a nested {value, typed_value} 
sub-struct
+          List<VariantSchema.ObjectField> fields = new ArrayList<>();
+          for (Schema.Field field : tvSchema.getFields()) {
+            Schema fieldSchema = unwrapNullable(field.schema());
+            VariantSchema subSchema = buildVariantSchema(fieldSchema, false, 
builder);
+            fields.add(new VariantSchema.ObjectField(field.name(), subSchema));
+          }
+          objectSchema = fields.toArray(new VariantSchema.ObjectField[0]);
+          break;
+
+        case ARRAY:
+          // Array shredding: elements follow the shredding schema
+          Schema elementSchema = unwrapNullable(tvSchema.getElementType());
+          arraySchema = buildVariantSchema(elementSchema, false, builder);
+          break;
+
+        default:
+          // Scalar shredding
+          scalarSchema = avroTypeToScalarType(tvSchema);
+          break;
+      }
+    }
+
+    VariantSchema result = new VariantSchema(
+        typedIdx, variantIdx, topLevelMetadataIdx, numFields,
+        scalarSchema, objectSchema, arraySchema);
+
+    builder.registerSchema(result, avroSchema);
+
+    return result;
+  }
+
+  /**
+   * Maps an Avro {@link Schema} type (potentially with logical type 
annotations)
+   * to a {@link VariantSchema.ScalarType}.
+   */
+  private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) {
+    LogicalType logicalType = schema.getLogicalType();
+
+    // Check logical types first
+    if (logicalType != null) {
+      if (logicalType instanceof LogicalTypes.Decimal) {
+        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+        return new VariantSchema.DecimalType(decimal.getPrecision(), 
decimal.getScale());
+      }
+      String name = logicalType.getName();
+      if ("date".equals(name)) {
+        return new VariantSchema.DateType();
+      }
+      if ("timestamp-micros".equals(name)) {
+        return new VariantSchema.TimestampType();
+      }
+      if ("local-timestamp-micros".equals(name)) {
+        return new VariantSchema.TimestampNTZType();
+      }
+      if ("timestamp-millis".equals(name)) {
+        return new VariantSchema.TimestampType();
+      }
+      if ("local-timestamp-millis".equals(name)) {
+        return new VariantSchema.TimestampNTZType();
+      }
+      if ("uuid".equals(name)) {
+        return new VariantSchema.UuidType();
+      }
+    }
+
+    switch (schema.getType()) {
+      case BOOLEAN:
+        return new VariantSchema.BooleanType();
+      case INT:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT);
+      case LONG:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG);
+      case FLOAT:
+        return new VariantSchema.FloatType();
+      case DOUBLE:
+        return new VariantSchema.DoubleType();
+      case STRING:
+        return new VariantSchema.StringType();
+      case BYTES:
+        return new VariantSchema.BinaryType();
+      case FIXED:
+        return new VariantSchema.BinaryType();
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Returns the first non-null branch of a union schema. A schema that is not a union, or a
+   * union with no non-null branch, is returned unchanged.
+   */
+  private static Schema unwrapNullable(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+    return schema.getTypes().stream()
+        .filter(branch -> branch.getType() != Schema.Type.NULL)
+        .findFirst()
+        .orElse(schema);
+  }
+
+  private static byte[] toByteArray(ByteBuffer buffer) {
+    if (buffer.hasArray() && buffer.position() == 0
+        && buffer.arrayOffset() == 0
+        && buffer.remaining() == buffer.array().length) {
+      return buffer.array();
+    }
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    return bytes;
+  }
+
+  /**
+   * {@link ShreddedResult} implementation that collects shredded variant 
components
+   * and converts them into an Avro {@link GenericRecord}.
+   */
+  static class AvroShreddedResult implements ShreddedResult {
+    private final VariantSchema variantSchema;
+    private final Schema avroSchema;
+
+    private byte[] metadata;
+    private byte[] variantValue;
+    private Object scalarValue;
+    private AvroShreddedResult[] objectFields;
+    private AvroShreddedResult[] arrayElements;
+
+    AvroShreddedResult(VariantSchema variantSchema, Schema avroSchema) {
+      this.variantSchema = variantSchema;
+      this.avroSchema = avroSchema;
+    }
+
+    @Override
+    public void addArray(ShreddedResult[] array) {
+      this.arrayElements = new AvroShreddedResult[array.length];
+      for (int i = 0; i < array.length; i++) {
+        this.arrayElements[i] = (AvroShreddedResult) array[i];
+      }
+    }
+
+    @Override
+    public void addObject(ShreddedResult[] values) {
+      this.objectFields = new AvroShreddedResult[values.length];
+      for (int i = 0; i < values.length; i++) {
+        this.objectFields[i] = (AvroShreddedResult) values[i];
+      }
+    }
+
+    @Override
+    public void addVariantValue(byte[] result) {
+      this.variantValue = result;
+    }
+
+    @Override
+    public void addScalar(Object result) {
+      this.scalarValue = result;
+    }
+
+    @Override
+    public void addMetadata(byte[] result) {
+      this.metadata = result;
+    }
+
+    /**
+     * Converts the collected shredded components into an Avro {@link 
GenericRecord}.
+     */
+    GenericRecord toGenericRecord() {
+      GenericRecord record = new GenericData.Record(avroSchema);
+
+      // Metadata (only present at top level)
+      if (metadata != null) {
+        record.put(METADATA_FIELD, ByteBuffer.wrap(metadata));
+      }
+
+      // Value (variant binary for non-shredded or residual data)
+      Schema.Field valueField = avroSchema.getField(VALUE_FIELD);
+      if (valueField != null) {
+        if (variantValue != null) {
+          record.put(VALUE_FIELD, ByteBuffer.wrap(variantValue));
+        } else {
+          record.put(VALUE_FIELD, null);
+        }
+      }
+
+      // Typed value
+      Schema.Field tvField = avroSchema.getField(TYPED_VALUE_FIELD);
+      if (tvField == null) {
+        return record;
+      }
+
+      if (scalarValue != null) {
+        Schema tvSchema = unwrapNullable(tvField.schema());
+        record.put(TYPED_VALUE_FIELD, convertScalarToAvro(scalarValue, 
tvSchema));
+      } else if (objectFields != null) {
+        Schema tvSchema = unwrapNullable(tvField.schema());
+        GenericRecord tvRecord = new GenericData.Record(tvSchema);
+        for (int i = 0; i < objectFields.length; i++) {
+          String fieldName = variantSchema.objectSchema[i].fieldName;
+          // Always create the sub-record even for missing fields (non-null 
struct with null fields)
+          tvRecord.put(fieldName, objectFields[i].toGenericRecord());
+        }
+        record.put(TYPED_VALUE_FIELD, tvRecord);
+      } else if (arrayElements != null) {
+        List<GenericRecord> list = new ArrayList<>(arrayElements.length);
+        for (AvroShreddedResult element : arrayElements) {
+          list.add(element.toGenericRecord());
+        }
+        record.put(TYPED_VALUE_FIELD, list);
+      } else {
+        record.put(TYPED_VALUE_FIELD, null);
+      }
+
+      return record;
+    }
+
+    /**
+     * Converts a scalar value from Spark's variant representation to an 
Avro-compatible type.
+     * Handles type widening (Byte/Short to Int/Long) and binary wrapping.
+     */
+    private static Object convertScalarToAvro(Object value, Schema avroSchema) 
{
+      if (value instanceof byte[]) {
+        return ByteBuffer.wrap((byte[]) value);
+      }
+      if (value instanceof UUID) {
+        return value.toString();
+      }
+      // Widen integer types to match Avro schema expectations
+      if (avroSchema.getType() == Schema.Type.INT) {
+        if (value instanceof Byte) {
+          return ((Byte) value).intValue();
+        }
+        if (value instanceof Short) {
+          return ((Short) value).intValue();
+        }
+      }
+      if (avroSchema.getType() == Schema.Type.LONG) {
+        if (value instanceof Byte) {
+          return ((Byte) value).longValue();
+        }
+        if (value instanceof Short) {
+          return ((Short) value).longValue();
+        }
+        if (value instanceof Integer) {
+          return ((Integer) value).longValue();
+        }
+      }
+      // BigDecimal, Boolean, String, Integer, Long, Float, Double
+      // are directly compatible with Avro's type system
+      return value;
+    }
+  }
+
+  /**
+   * {@link ShreddedResultBuilder} that hands out {@link AvroShreddedResult} instances bound to
+   * the Avro schema previously registered for the requested {@link VariantSchema}.
+   * Lookups are by object identity, so the exact registered instance must be passed back.
+   */
+  static class AvroShreddedResultBuilder implements ShreddedResultBuilder {
+    private final Map<VariantSchema, Schema> avroByVariantSchema = new IdentityHashMap<>();
+
+    /** Associates {@code avroSchema} with {@code variantSchema} for later lookups. */
+    void registerSchema(VariantSchema variantSchema, Schema avroSchema) {
+      avroByVariantSchema.put(variantSchema, avroSchema);
+    }
+
+    /**
+     * Creates an empty result for {@code schema}.
+     *
+     * @throws IllegalStateException if no Avro schema was registered for {@code schema}
+     */
+    @Override
+    public ShreddedResult createEmpty(VariantSchema schema) {
+      final Schema registered = avroByVariantSchema.get(schema);
+      if (registered != null) {
+        return new AvroShreddedResult(schema, registered);
+      }
+      throw new IllegalStateException(
+          "No Avro schema registered for VariantSchema: " + schema);
+    }
+
+    @Override
+    public boolean allowNumericScaleChanges() {
+      return true;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index 45fe8679ba8b..10315a8a4891 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -207,18 +207,19 @@ abstract class BaseSpark4Adapter extends SparkAdapter 
with Logging {
   override def isDataTypeEqualForPhysicalSchema(requiredType: DataType, 
fileType: DataType): Option[Boolean] = {
     /**
      * Checks if a StructType is the physical representation of VariantType in 
Parquet.
-     * VariantType is stored in Parquet as a struct with two binary fields: 
"metadata" and "value".
+     * VariantType is stored in Parquet as a struct with binary fields: 
"metadata" and "value".
+     * Supports both unshredded (2 fields) and shredded (3 fields with 
"typed_value") layouts.
      */
     def isVariantPhysicalSchema(structType: StructType): Boolean = {
-      if (structType.fields.length != 2) {
-        false
-      } else {
-        val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
-        fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
-          fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
-          fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
-          fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
-      }
+      val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+      val hasRequiredFields = 
fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
+        fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
+        fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
+        fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
+      val isUnshredded = structType.fields.length == 2
+      val isShredded = structType.fields.length == 3 &&
+        fieldMap.contains(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)
+      hasRequiredFields && (isUnshredded || isShredded)
     }
 
     // Handle VariantType comparisons
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
index 1516fe870057..2c87a5efe0af 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
@@ -276,6 +276,9 @@ object Spark40ParquetReader extends 
SparkParquetReaderBuilder {
     )
     hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, 
sqlConf.parquetInferTimestampNTZEnabled)
 
+    hadoopConf.setBoolean(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key,
+      options.getOrElse("hoodie.parquet.variant.allow.reading.shredded", 
"true").toBoolean)
+
     val enableLogicalTimestampRepair = 
hadoopConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)
     val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
       options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,

Reply via email to