This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch variant-intro-shredded-avro-support in repository https://gitbox.apache.org/repos/asf/hudi.git
commit afc6b4b5b06e298ccd92f0eb50b3bf0701a67844 Author: voon <[email protected]> AuthorDate: Fri Jan 30 16:57:16 2026 +0800 feat(schema): Add read + write support for shredded for AVRO - Added support to write shredded types for HoodieRecordType.AVRO - Added functional tests for testing newly added configs --- .../hudi/common/config/HoodieStorageConfig.java | 10 + .../apache/hudi/avro/HoodieAvroWriteSupport.java | 396 +++++++++++++++++++- .../apache/hudi/avro/VariantShreddingProvider.java | 66 ++++ .../hadoop/HoodieAvroFileWriterFactory.java | 31 +- .../hudi/HoodieHadoopFsRelationFactory.scala | 13 + .../sql/hudi/dml/schema/TestVariantDataType.scala | 166 +++++++++ .../variant/Spark4VariantShreddingProvider.java | 405 +++++++++++++++++++++ .../spark/sql/adapter/BaseSpark4Adapter.scala | 21 +- .../datasources/parquet/Spark40ParquetReader.scala | 3 + 9 files changed, 1097 insertions(+), 14 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java index fac0719fd76b..1a29b8c94257 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java @@ -198,6 +198,16 @@ public class HoodieStorageConfig extends HoodieConfig { + "When disabled, only unshredded variant data can be read. " + "Equivalent to Spark's spark.sql.variant.allowReadingShredded."); + public static final ConfigProperty<String> PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty + .key("hoodie.parquet.variant.shredding.provider.class") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Fully-qualified class name of the VariantShreddingProvider implementation " + + "used to shred variant values at write time in the Avro record path. " + + "The provider parses variant binary data and populates typed_value columns. 
" + + "When not set, the provider is auto-detected from the classpath."); + public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = ConfigProperty .key("hoodie.parquet.write.utc-timezone.enabled") .defaultValue(true) diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java index 547b02787369..ac674cbf60cf 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java @@ -20,22 +20,45 @@ package org.apache.hudi.avro; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.schema.MessageType; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST; +import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED; /** - * Wrap AvroWriterSupport for plugging in the bloom filter. + * Wrap AvroWriterSupport for plugging in the bloom filter and variant shredding support. + * + * <p>When variant columns are configured for shredding (via {@link HoodieSchema.Variant#isShredded()}), + * this class transforms variant records at write time to populate {@code typed_value} columns + * by parsing variant binary data using a {@link VariantShreddingProvider} loaded via reflection.</p> */ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> { @@ -43,15 +66,207 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> { private final Map<String, String> footerMetadata = new HashMap<>(); protected final Properties properties; + /** + * Whether variant write shredding is enabled via config. + */ + private final boolean variantWriteShreddingEnabled; + + /** + * The effective (possibly shredded) HoodieSchema used for writing. + */ + private final HoodieSchema effectiveHoodieSchema; + + /** + * The effective Avro schema (derived from effectiveHoodieSchema). + */ + private final Schema effectiveAvroSchema; + + /** + * Indices of top-level variant fields that need shredding transformation. + * Empty array if no shredding is needed. + */ + private final int[] shreddedVariantFieldIndices; + + /** + * The shredded Avro sub-schema for each variant field at the corresponding index. + * Indexed by position in {@link #shreddedVariantFieldIndices}. + */ + private final Schema[] shreddedVariantAvroSchemas; + + /** + * The HoodieSchema.Variant for each variant field at the corresponding index. + * Indexed by position in {@link #shreddedVariantFieldIndices}. 
+ */ + private final HoodieSchema.Variant[] shreddedVariantHoodieSchemas; + + /** + * Provider for variant shredding (loaded via reflection). Null if no shredding is needed. + */ + private final VariantShreddingProvider shreddingProvider; + public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, Option<BloomFilter> bloomFilterOpt, Properties properties) { - super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE); + this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, properties), bloomFilterOpt, properties); + } + + private HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, HoodieSchema effectiveSchema, + Option<BloomFilter> bloomFilterOpt, Properties properties) { + super(schema, effectiveSchema.toAvroSchema(), ConvertingGenericData.INSTANCE); this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new); this.properties = properties; String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema); if (!vectorMeta.isEmpty()) { footerMetadata.put(HoodieSchema.PARQUET_VECTOR_COLUMNS_METADATA_KEY, vectorMeta); } + + this.effectiveHoodieSchema = effectiveSchema; + this.effectiveAvroSchema = effectiveSchema.toAvroSchema(); + this.variantWriteShreddingEnabled = Boolean.parseBoolean( + properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), + String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue()))); + + // Identify variant fields that need shredding + List<Integer> variantIndices = new ArrayList<>(); + List<Schema> variantAvroSchemas = new ArrayList<>(); + List<HoodieSchema.Variant> variantHoodieSchemas = new ArrayList<>(); + + if (variantWriteShreddingEnabled && effectiveSchema.getType() == HoodieSchemaType.RECORD) { + List<HoodieSchemaField> fields = effectiveSchema.getFields(); + for (int i = 0; i < fields.size(); i++) { + HoodieSchemaField field = fields.get(i); + HoodieSchema fieldSchema = field.schema(); + // Unwrap 
nullable union to get the underlying type + if (fieldSchema.isNullable()) { + fieldSchema = fieldSchema.getNonNullType(); + } + if (fieldSchema.getType() == HoodieSchemaType.VARIANT) { + HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema; + if (variant.isShredded() && variant.getTypedValueField().isPresent()) { + variantIndices.add(i); + // Get the Avro sub-schema for this variant field from the effective schema + Schema fieldAvroSchema = effectiveAvroSchema.getFields().get(i).schema(); + // Unwrap nullable union + if (fieldAvroSchema.getType() == Schema.Type.UNION) { + fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema); + } + variantAvroSchemas.add(fieldAvroSchema); + variantHoodieSchemas.add(variant); + } + } + } + } + + this.shreddedVariantFieldIndices = variantIndices.stream().mapToInt(Integer::intValue).toArray(); + this.shreddedVariantAvroSchemas = variantAvroSchemas.toArray(new Schema[0]); + this.shreddedVariantHoodieSchemas = variantHoodieSchemas.toArray(new HoodieSchema.Variant[0]); + + // Load shredding provider via reflection if needed + if (shreddedVariantFieldIndices.length > 0) { + String providerClass = properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()); + if (providerClass != null && !providerClass.isEmpty()) { + this.shreddingProvider = (VariantShreddingProvider) ReflectionUtils.loadClass(providerClass); + } else { + this.shreddingProvider = null; + } + } else { + this.shreddingProvider = null; + } + } + + /** + * Generates the effective schema for writing, applying variant shredding configuration. 
+ * + * <p>When shredding is disabled, shredded variant fields are replaced with unshredded + * variants (removing {@code typed_value}) so that the Parquet file does not contain + * unused typed_value columns.</p> + * + * <p>When shredding is enabled and a forced shredding schema is configured via + * {@link HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST}, + * all variant fields are replaced with shredded variants using the forced schema. + * This handles the case where the input schema is unshredded but shredding is desired.</p> + * + * <p>When shredding is enabled without a forced schema, the schema is returned as-is + * (already-shredded variants stay shredded, unshredded variants stay unshredded).</p> + * + * @param hoodieSchema the original HoodieSchema + * @param properties writer properties containing shredding configuration + * @return the effective schema to use for writing + */ + public static HoodieSchema generateEffectiveSchema(HoodieSchema hoodieSchema, Properties properties) { + boolean shreddingEnabled = Boolean.parseBoolean( + properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), + String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue()))); + + if (!shreddingEnabled) { + // Schemas from clustering/compaction may still be shredded (read from on-disk Parquet files + // written with shredding enabled), so we need to strip typed_value when shredding + // is disabled. 
+ return unshreddVariantFields(hoodieSchema); + } + + // Check if a forced shredding schema is configured + String forceShreddingSchema = properties.getProperty( + PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key()); + if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) { + return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema); + } + + // When enabled without forced schema, use the schema as-is + // (shredded variants stay shredded, unshredded variants stay unshredded) + return hoodieSchema; + } + + /** + * Overloaded version accepting HoodieConfig for use by factories. + */ + public static HoodieSchema generateEffectiveSchema(HoodieSchema hoodieSchema, HoodieConfig config) { + return generateEffectiveSchema(hoodieSchema, config.getProps()); + } + + @SuppressWarnings("unchecked") + @Override + public void write(T record) { + if (shreddedVariantFieldIndices.length > 0 && shreddingProvider != null) { + IndexedRecord inputRecord = (IndexedRecord) record; + GenericRecord shreddedRecord = new GenericData.Record(effectiveAvroSchema); + + // Copy all fields, transforming variant fields that need shredding + List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields(); + Schema inputSchema = inputRecord.getSchema(); + + for (int i = 0; i < effectiveFields.size(); i++) { + Schema.Field effectiveField = effectiveFields.get(i); + String fieldName = effectiveField.name(); + Schema.Field inputField = inputSchema.getField(fieldName); + if (inputField == null) { + continue; + } + + int variantIdx = findVariantIndex(i); + if (variantIdx >= 0) { + // This is a shredded variant field - transform it + Object fieldValue = inputRecord.get(inputField.pos()); + if (fieldValue instanceof GenericRecord) { + GenericRecord variantRecord = (GenericRecord) fieldValue; + GenericRecord shreddedVariant = shreddingProvider.shredVariantRecord( + variantRecord, + shreddedVariantAvroSchemas[variantIdx], + shreddedVariantHoodieSchemas[variantIdx]); + 
shreddedRecord.put(i, shreddedVariant); + } else { + // Null or unexpected type - copy as-is + shreddedRecord.put(i, fieldValue); + } + } else { + // Non-variant field - copy as-is + shreddedRecord.put(i, inputRecord.get(inputField.pos())); + } + } + + super.write((T) shreddedRecord); + } else { + super.write(record); + } } @Override @@ -74,6 +289,181 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> { footerMetadata.put(key, value); } + /** + * Finds the position in {@link #shreddedVariantFieldIndices} for the given effective field index, + * or -1 if this field is not a variant field that needs shredding. + */ + private int findVariantIndex(int effectiveFieldIndex) { + for (int i = 0; i < shreddedVariantFieldIndices.length; i++) { + if (shreddedVariantFieldIndices[i] == effectiveFieldIndex) { + return i; + } + } + return -1; + } + + private static final Pattern DECIMAL_PATTERN = Pattern.compile( + "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)"); + + /** + * Applies a forced shredding schema to all variant fields in the given schema. + * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value + * fields that will be added to each variant column. + */ + private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) { + if (schema.getType() != HoodieSchemaType.RECORD) { + return schema; + } + + Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl); + + List<HoodieSchemaField> fields = schema.getFields(); + List<HoodieSchemaField> newFields = new ArrayList<>(); + boolean changed = false; + + for (HoodieSchemaField field : fields) { + HoodieSchema fieldSchema = field.schema(); + boolean wasNullable = fieldSchema.isNullable(); + HoodieSchema unwrapped = wasNullable ? 
fieldSchema.getNonNullType() : fieldSchema; + + if (unwrapped.getType() == HoodieSchemaType.VARIANT) { + HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject( + unwrapped.getAvroSchema().getName(), + unwrapped.getAvroSchema().getNamespace(), + unwrapped.getAvroSchema().getDoc(), + shreddedFields); + HoodieSchema replacement = wasNullable + ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant; + newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement))); + changed = true; + } else { + newFields.add(HoodieSchemaUtils.createNewSchemaField(field)); + } + } + + if (!changed) { + return schema; + } + + return HoodieSchema.createRecord( + schema.getAvroSchema().getName(), + schema.getAvroSchema().getNamespace(), + schema.getAvroSchema().getDoc(), + newFields); + } + + /** + * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"}) + * into an insertion-ordered map of field names to their HoodieSchema types. Commas nested inside parentheses, as in {@code decimal(p,s)}, are not treated as field separators; blank definitions are skipped and a definition that is not of the form 'name type' raises IllegalArgumentException. + */ + private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) { + Map<String, HoodieSchema> fields = new LinkedHashMap<>(); + for (String fieldDef : ddl.split(",(?![^()]*\\))")) { /* split only on commas outside parentheses so decimal(p,s) stays one field definition */ + String trimmed = fieldDef.trim(); + if (trimmed.isEmpty()) { + continue; + } + String[] parts = trimmed.split("\\s+", 2); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid shredding DDL field definition (expected 'name type'): " + trimmed); + } + fields.put(parts[0].trim(), parseSimpleType(parts[1].trim())); + } + return fields; + } + + /** + * Parses a simple type name into a HoodieSchema. + * Supports common types: int, long, string, double, float, boolean, binary, decimal(p,s). 
+ */ + private static HoodieSchema parseSimpleType(String type) { + String lower = type.toLowerCase(java.util.Locale.ROOT); /* locale-independent lowercasing: under e.g. a Turkish default locale 'I' lowercases to a dotless i and the keywords below would not match */ + switch (lower) { + case "int": + case "integer": + return HoodieSchema.create(HoodieSchemaType.INT); + case "long": + case "bigint": + return HoodieSchema.create(HoodieSchemaType.LONG); + case "string": + return HoodieSchema.create(HoodieSchemaType.STRING); + case "double": + return HoodieSchema.create(HoodieSchemaType.DOUBLE); + case "float": + return HoodieSchema.create(HoodieSchemaType.FLOAT); + case "boolean": + return HoodieSchema.create(HoodieSchemaType.BOOLEAN); + case "binary": + return HoodieSchema.create(HoodieSchemaType.BYTES); + default: + Matcher m = DECIMAL_PATTERN.matcher(lower); + if (m.matches()) { + return HoodieSchema.createDecimal( + Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))); + } + throw new IllegalArgumentException("Unsupported shredding type: " + type); + } + } + + /** + * Strips shredding from variant fields in the schema. + * Replaces shredded variant fields with unshredded variants (removing typed_value). + */ + private static HoodieSchema unshreddVariantFields(HoodieSchema schema) { + if (schema.getType() != HoodieSchemaType.RECORD) { + return schema; + } + + List<HoodieSchemaField> fields = schema.getFields(); + List<HoodieSchemaField> newFields = new ArrayList<>(); + boolean changed = false; + + for (HoodieSchemaField field : fields) { + HoodieSchema fieldSchema = field.schema(); + boolean wasNullable = fieldSchema.isNullable(); + HoodieSchema unwrapped = wasNullable ? 
fieldSchema.getNonNullType() : fieldSchema; + + if (unwrapped.getType() == HoodieSchemaType.VARIANT) { + HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped; + if (variant.isShredded()) { + // Replace with unshredded variant + HoodieSchema.Variant unshredded = HoodieSchema.createVariant( + unwrapped.getAvroSchema().getName(), + unwrapped.getAvroSchema().getNamespace(), + unwrapped.getAvroSchema().getDoc()); + HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded; + newFields.add(field.withSchema(replacement)); + changed = true; + continue; + } + } + newFields.add(field); + } + + if (!changed) { + return schema; + } + + return HoodieSchema.createRecord( + schema.getAvroSchema().getName(), + schema.getAvroSchema().getNamespace(), + schema.getAvroSchema().getDoc(), + newFields); + } + + /** + * Extracts the non-null type from a union schema. + */ + private static Schema getNonNullFromUnion(Schema unionSchema) { + for (Schema type : unionSchema.getTypes()) { + if (type.getType() != Schema.Type.NULL) { + return type; + } + } + throw new IllegalArgumentException("Union schema does not contain a non-null type: " + unionSchema); + } + private static class HoodieBloomFilterAvroWriteSupport extends HoodieBloomFilterWriteSupport<String> { public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) { super(bloomFilter); @@ -84,4 +474,4 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> { return StringUtils.getUTF8Bytes(key); } } -} +} \ No newline at end of file diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java new file mode 100644 index 000000000000..90ffbee1e53f --- /dev/null +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +/** + * Interface for shredding variant values at write time. + * <p> + * Implementations parse variant binary data (value + metadata bytes) and produce + * a shredded {@link GenericRecord} with typed_value columns populated according + * to the shredding schema. + * <p> + * This interface allows the variant binary parsing logic (which may depend on + * engine-specific libraries like Spark's variant module) to be loaded via reflection, + * keeping the core write support free of engine-specific dependencies. + */ +public interface VariantShreddingProvider { + + /** + * Transform an unshredded variant GenericRecord into a shredded one. 
+ * <p> + * The input record is expected to have: + * <ul> + * <li>{@code value}: ByteBuffer containing the variant value binary</li> + * <li>{@code metadata}: ByteBuffer containing the variant metadata binary</li> + * </ul> + * <p> + * The output record should conform to {@code shreddedSchema} and have: + * <ul> + * <li>{@code value}: ByteBuffer or null (null when typed_value captures the full value)</li> + * <li>{@code metadata}: ByteBuffer (always present)</li> + * <li>{@code typed_value}: the typed representation extracted from the variant binary, + * or null if the variant type does not match the typed_value schema</li> + * </ul> + * + * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata: ByteBuffer} + * @param shreddedSchema target Avro schema with {value: nullable ByteBuffer, metadata: ByteBuffer, typed_value: type} + * @param variantSchema HoodieSchema.Variant containing the shredding schema information + * @return a GenericRecord conforming to shreddedSchema with typed_value populated where possible + */ + GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema); +} \ No newline at end of file diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java index 5be171d35c61..afcb5803ab9e 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java @@ -49,6 +49,7 @@ import java.io.OutputStream; import java.util.Properties; import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS; import static 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter; public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { @@ -123,9 +124,37 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory { private HoodieAvroWriteSupport getHoodieAvroWriteSupport(HoodieSchema schema, HoodieConfig config, boolean enableBloomFilter) { Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty(); + HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(schema, config); + Properties props = config.getProps(); + // Auto-detect variant shredding provider from classpath if not explicitly configured + if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) { + String detected = detectShreddingProvider(); + if (detected != null) { + props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detected); + } + } return (HoodieAvroWriteSupport) ReflectionUtils.loadClass( config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS), new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, Properties.class}, - getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(schema), schema, filter, config.getProps()); + getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props); + } + + /** + * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation + * available on the classpath. Returns the fully-qualified class name if found, or null. 
+ */ + private static String detectShreddingProvider() { + String[] candidates = { + "org.apache.hudi.variant.Spark4VariantShreddingProvider" + }; + for (String candidate : candidates) { + try { + Class.forName(candidate); + return candidate; + } catch (ClassNotFoundException e) { + // not on classpath, try next + } + } + return null; } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala index a11e3075a6fb..d3449fcc56f6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedFileFormat import org.apache.spark.sql.hudi.HoodieSqlCommonUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType @@ -62,6 +63,18 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext, val schemaSpec: Option[StructType], val isBootstrap: Boolean ) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory with Logging { + // Propagate Hudi's variant allow-reading-shredded config to Spark's SQLConf. 
+ // ParquetToSparkSchemaConverter reads this from SQLConf.get(), so it must be set + // before query execution starts here during table resolution + if (HoodieSparkUtils.gteqSpark4_0) { + val sqlConf = sqlContext.sparkSession.sessionState.conf + val hoodieParquetAllowReadingShreddedConfKey = "hoodie.parquet.variant.allow.reading.shredded" + val allowReadingShredded = options.getOrElse( + hoodieParquetAllowReadingShreddedConfKey, + sqlConf.getConfString(hoodieParquetAllowReadingShreddedConfKey, "true")) + sqlConf.setConfString(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, allowReadingShredded) + } + protected lazy val sparkSession: SparkSession = sqlContext.sparkSession protected lazy val optParams: Map[String, String] = options diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala index a236592bc1d4..421525682000 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala @@ -24,6 +24,10 @@ import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.StringUtils import org.apache.hudi.internal.schema.HoodieSchemaException +import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath} +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.parquet.schema.{GroupType, MessageType, Type} import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase @@ -189,4 +193,166 @@ class TestVariantDataType extends HoodieSparkSqlTestBase { spark.sql(s"drop table $tableName") } + + test("Test Shredded Variant Write and Read + Validate Parquet Schema after Write") { + assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires 
Spark 4.0 or higher") + + // Test 1: Shredding enabled with forced schema → parquet should have typed_value + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | v variant, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey = 'id', + | type = 'cow', + | preCombineField = 'ts' + | ) + """.stripMargin) + + spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true") + spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true") + spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = a int, b string") + + spark.sql( + s""" + |insert into $tableName values + | (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000) + """.stripMargin) + checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")( + Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000) + ) + + // Verify parquet schema has shredded structure with typed_value + val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath) + assert(parquetFiles.nonEmpty, "Should have at least one data parquet file") + + parquetFiles.foreach { filePath => + val schema = readParquetSchema(filePath) + val variantGroup = getFieldAsGroup(schema, "v") + assert(groupContainsField(variantGroup, "typed_value"), + s"Shredded variant should have typed_value field. 
Schema:\n$variantGroup")
+        val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+          "Shredded variant value field should be OPTIONAL")
+        val metadataField = variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+        assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+          "Shredded variant metadata field should be REQUIRED")
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+    // Shredding disabled: parquet should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+        """.stripMargin)
+
+      // Plain literal: nothing is interpolated here, so no `s` prefix is needed.
+      spark.sql("set hoodie.parquet.variant.write.shredding.enabled = false")
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+        """.stripMargin)
+
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+
+      // Verify parquet schema does NOT have typed_value
+      val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+      assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+      parquetFiles.foreach { filePath =>
+        val schema = readParquetSchema(filePath)
+        val variantGroup = getFieldAsGroup(schema, "v")
+        assert(!groupContainsField(variantGroup, "typed_value"),
+          s"Non-shredded variant should NOT have typed_value field. Schema:\n$variantGroup")
+        val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+          "Non-shredded variant value field should be REQUIRED")
+      }
+
+      // Verify data can still be read back for the non-shredded case
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+    })
+  }
+
+  /**
+   * Lists data parquet files in the table directory, excluding Hudi metadata files.
+   *
+   * The directory is walked recursively; a file is kept when its path ends with
+   * ".parquet" and does not contain ".hoodie" anywhere in the path string.
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(new HadoopPath(tablePath).toUri, conf)
+    val iter = fs.listFiles(new HadoopPath(tablePath), true)
+    val files = scala.collection.mutable.ArrayBuffer[String]()
+    while (iter.hasNext) {
+      val file = iter.next()
+      val path = file.getPath.toString
+      if (path.endsWith(".parquet") && !path.contains(".hoodie")) {
+        files += path
+      }
+    }
+    files.toSeq
+  }
+
+  /**
+   * Reads the Parquet schema (MessageType) from a parquet file.
+   * The file reader is always closed, even when reading the footer fails.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val inputFile = HadoopInputFile.fromPath(new HadoopPath(filePath), conf)
+    val reader = ParquetFileReader.open(inputFile)
+    try {
+      reader.getFooter.getFileMetaData.getSchema
+    } finally {
+      reader.close()
+    }
+  }
+
+  /**
+   * Gets a named field from a GroupType (MessageType) and returns it as a GroupType.
+   * Uses getFieldIndex(String) + getType(int) to avoid Scala overload resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType = {
+    val idx: Int = parent.getFieldIndex(fieldName)
+    parent.getType(idx).asGroupType()
+  }
+
+  /**
+   * Checks whether a GroupType contains a field with the given name.
+   * Delegates to GroupType.containsField, a direct name lookup, instead of catching
+   * the exception getFieldIndex throws for a missing field; a broad catch could also
+   * hide unrelated failures. No Scala-Java collection converters are involved.
+   */
+  private def groupContainsField(group: GroupType, fieldName: String): Boolean = {
+    group.containsField(fieldName)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..ae981a22f5af
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.hudi.variant; + +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantSchema; +import org.apache.spark.types.variant.VariantShreddingWriter; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library. + * + * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter} + * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It converts + * the shredded output into Avro {@link GenericRecord}s that can be written via + * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p> + * + * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded}, + * which handles scalar, object, and array shredding including residual value construction + * for non-matching fields. 
This class implements the {@link ShreddedResult} and + * {@link ShreddedResultBuilder} interfaces to collect the shredded components into + * Avro GenericRecords.</p> + */ +public class Spark4VariantShreddingProvider implements VariantShreddingProvider { + + private static final String VALUE_FIELD = "value"; + private static final String METADATA_FIELD = "metadata"; + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Override + public GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema) { + + ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD); + ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD); + + if (valueBuf == null || metadataBuf == null) { + return null; + } + + byte[] valueBytes = toByteArray(valueBuf); + byte[] metadataBytes = toByteArray(metadataBuf); + + Variant variant = new Variant(valueBytes, metadataBytes); + + // Build VariantSchema from the Avro shredded schema, registering + // Avro schemas at each level for GenericRecord construction. + AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder(); + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder); + + // Delegate to Spark's VariantShreddingWriter for the actual shredding logic. + AvroShreddedResult result = (AvroShreddedResult) + VariantShreddingWriter.castShredded(variant, sparkSchema, builder); + + return result.toGenericRecord(); + } + + /** + * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a + * shredded variant structure ({@code value}, {@code metadata}, {@code typed_value}). 
+ * + * <p>This method also registers the Avro schema mapping in the builder so that + * {@link AvroShreddedResultBuilder#createEmpty} can create results with the + * correct Avro schema at each nesting level.</p> + */ + private VariantSchema buildVariantSchema(Schema avroSchema, boolean isTopLevel, + AvroShreddedResultBuilder builder) { + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + Schema.Field metadataField = avroSchema.getField(METADATA_FIELD); + Schema.Field typedValueField = avroSchema.getField(TYPED_VALUE_FIELD); + + int idx = 0; + int variantIdx = valueField != null ? idx++ : -1; + int topLevelMetadataIdx; + if (metadataField != null && isTopLevel) { + topLevelMetadataIdx = idx++; + } else { + topLevelMetadataIdx = -1; + if (metadataField != null) { + idx++; + } + } + int typedIdx = typedValueField != null ? idx++ : -1; + int numFields = idx; + + VariantSchema.ScalarType scalarSchema = null; + VariantSchema.ObjectField[] objectSchema = null; + VariantSchema arraySchema = null; + + if (typedValueField != null) { + Schema tvSchema = unwrapNullable(typedValueField.schema()); + + switch (tvSchema.getType()) { + case RECORD: + // Object shredding: each field has a nested {value, typed_value} sub-struct + List<VariantSchema.ObjectField> fields = new ArrayList<>(); + for (Schema.Field field : tvSchema.getFields()) { + Schema fieldSchema = unwrapNullable(field.schema()); + VariantSchema subSchema = buildVariantSchema(fieldSchema, false, builder); + fields.add(new VariantSchema.ObjectField(field.name(), subSchema)); + } + objectSchema = fields.toArray(new VariantSchema.ObjectField[0]); + break; + + case ARRAY: + // Array shredding: elements follow the shredding schema + Schema elementSchema = unwrapNullable(tvSchema.getElementType()); + arraySchema = buildVariantSchema(elementSchema, false, builder); + break; + + default: + // Scalar shredding + scalarSchema = avroTypeToScalarType(tvSchema); + break; + } + } + + VariantSchema result = new 
VariantSchema( + typedIdx, variantIdx, topLevelMetadataIdx, numFields, + scalarSchema, objectSchema, arraySchema); + + builder.registerSchema(result, avroSchema); + + return result; + } + + /** + * Maps an Avro {@link Schema} type (potentially with logical type annotations) + * to a {@link VariantSchema.ScalarType}. + */ + private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + + // Check logical types first + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale()); + } + String name = logicalType.getName(); + if ("date".equals(name)) { + return new VariantSchema.DateType(); + } + if ("timestamp-micros".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-micros".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("timestamp-millis".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-millis".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("uuid".equals(name)) { + return new VariantSchema.UuidType(); + } + } + + switch (schema.getType()) { + case BOOLEAN: + return new VariantSchema.BooleanType(); + case INT: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT); + case LONG: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG); + case FLOAT: + return new VariantSchema.FloatType(); + case DOUBLE: + return new VariantSchema.DoubleType(); + case STRING: + return new VariantSchema.StringType(); + case BYTES: + return new VariantSchema.BinaryType(); + case FIXED: + return new VariantSchema.BinaryType(); + default: + return null; + } + } + + private static Schema unwrapNullable(Schema schema) { + if (schema.getType() == Schema.Type.UNION) { + for (Schema type : schema.getTypes()) 
{ + if (type.getType() != Schema.Type.NULL) { + return type; + } + } + } + return schema; + } + + private static byte[] toByteArray(ByteBuffer buffer) { + if (buffer.hasArray() && buffer.position() == 0 + && buffer.arrayOffset() == 0 + && buffer.remaining() == buffer.array().length) { + return buffer.array(); + } + byte[] bytes = new byte[buffer.remaining()]; + buffer.duplicate().get(bytes); + return bytes; + } + + /** + * {@link ShreddedResult} implementation that collects shredded variant components + * and converts them into an Avro {@link GenericRecord}. + */ + static class AvroShreddedResult implements ShreddedResult { + private final VariantSchema variantSchema; + private final Schema avroSchema; + + private byte[] metadata; + private byte[] variantValue; + private Object scalarValue; + private AvroShreddedResult[] objectFields; + private AvroShreddedResult[] arrayElements; + + AvroShreddedResult(VariantSchema variantSchema, Schema avroSchema) { + this.variantSchema = variantSchema; + this.avroSchema = avroSchema; + } + + @Override + public void addArray(ShreddedResult[] array) { + this.arrayElements = new AvroShreddedResult[array.length]; + for (int i = 0; i < array.length; i++) { + this.arrayElements[i] = (AvroShreddedResult) array[i]; + } + } + + @Override + public void addObject(ShreddedResult[] values) { + this.objectFields = new AvroShreddedResult[values.length]; + for (int i = 0; i < values.length; i++) { + this.objectFields[i] = (AvroShreddedResult) values[i]; + } + } + + @Override + public void addVariantValue(byte[] result) { + this.variantValue = result; + } + + @Override + public void addScalar(Object result) { + this.scalarValue = result; + } + + @Override + public void addMetadata(byte[] result) { + this.metadata = result; + } + + /** + * Converts the collected shredded components into an Avro {@link GenericRecord}. 
+ */ + GenericRecord toGenericRecord() { + GenericRecord record = new GenericData.Record(avroSchema); + + // Metadata (only present at top level) + if (metadata != null) { + record.put(METADATA_FIELD, ByteBuffer.wrap(metadata)); + } + + // Value (variant binary for non-shredded or residual data) + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + if (valueField != null) { + if (variantValue != null) { + record.put(VALUE_FIELD, ByteBuffer.wrap(variantValue)); + } else { + record.put(VALUE_FIELD, null); + } + } + + // Typed value + Schema.Field tvField = avroSchema.getField(TYPED_VALUE_FIELD); + if (tvField == null) { + return record; + } + + if (scalarValue != null) { + Schema tvSchema = unwrapNullable(tvField.schema()); + record.put(TYPED_VALUE_FIELD, convertScalarToAvro(scalarValue, tvSchema)); + } else if (objectFields != null) { + Schema tvSchema = unwrapNullable(tvField.schema()); + GenericRecord tvRecord = new GenericData.Record(tvSchema); + for (int i = 0; i < objectFields.length; i++) { + String fieldName = variantSchema.objectSchema[i].fieldName; + // Always create the sub-record even for missing fields (non-null struct with null fields) + tvRecord.put(fieldName, objectFields[i].toGenericRecord()); + } + record.put(TYPED_VALUE_FIELD, tvRecord); + } else if (arrayElements != null) { + List<GenericRecord> list = new ArrayList<>(arrayElements.length); + for (AvroShreddedResult element : arrayElements) { + list.add(element.toGenericRecord()); + } + record.put(TYPED_VALUE_FIELD, list); + } else { + record.put(TYPED_VALUE_FIELD, null); + } + + return record; + } + + /** + * Converts a scalar value from Spark's variant representation to an Avro-compatible type. + * Handles type widening (Byte/Short to Int/Long) and binary wrapping. 
+ */ + private static Object convertScalarToAvro(Object value, Schema avroSchema) { + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + if (value instanceof UUID) { + return value.toString(); + } + // Widen integer types to match Avro schema expectations + if (avroSchema.getType() == Schema.Type.INT) { + if (value instanceof Byte) { + return ((Byte) value).intValue(); + } + if (value instanceof Short) { + return ((Short) value).intValue(); + } + } + if (avroSchema.getType() == Schema.Type.LONG) { + if (value instanceof Byte) { + return ((Byte) value).longValue(); + } + if (value instanceof Short) { + return ((Short) value).longValue(); + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + } + // BigDecimal, Boolean, String, Integer, Long, Float, Double + // are directly compatible with Avro's type system + return value; + } + } + + /** + * {@link ShreddedResultBuilder} that creates {@link AvroShreddedResult} instances + * with the corresponding Avro schema at each nesting level. 
+ */ + static class AvroShreddedResultBuilder implements ShreddedResultBuilder { + private final Map<VariantSchema, Schema> schemaMap = new IdentityHashMap<>(); + + void registerSchema(VariantSchema variantSchema, Schema avroSchema) { + schemaMap.put(variantSchema, avroSchema); + } + + @Override + public ShreddedResult createEmpty(VariantSchema schema) { + Schema avroSchema = schemaMap.get(schema); + if (avroSchema == null) { + throw new IllegalStateException( + "No Avro schema registered for VariantSchema: " + schema); + } + return new AvroShreddedResult(schema, avroSchema); + } + + @Override + public boolean allowNumericScaleChanges() { + return true; + } + } +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala index 45fe8679ba8b..10315a8a4891 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala @@ -207,18 +207,19 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging { override def isDataTypeEqualForPhysicalSchema(requiredType: DataType, fileType: DataType): Option[Boolean] = { /** * Checks if a StructType is the physical representation of VariantType in Parquet. - * VariantType is stored in Parquet as a struct with two binary fields: "metadata" and "value". + * VariantType is stored in Parquet as a struct with binary fields: "metadata" and "value". + * Supports both unshredded (2 fields) and shredded (3 fields with "typed_value") layouts. 
*/ def isVariantPhysicalSchema(structType: StructType): Boolean = { - if (structType.fields.length != 2) { - false - } else { - val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap - fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) && - fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) && - fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType && - fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType - } + val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap + val hasRequiredFields = fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) && + fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) && + fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType && + fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType + val isUnshredded = structType.fields.length == 2 + val isShredded = structType.fields.length == 3 && + fieldMap.contains(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD) + hasRequiredFields && (isUnshredded || isShredded) } // Handle VariantType comparisons diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala index 1516fe870057..2c87a5efe0af 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala @@ -276,6 +276,9 @@ object Spark40ParquetReader extends SparkParquetReaderBuilder { ) hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled) + hadoopConf.setBoolean(SQLConf.VARIANT_ALLOW_READING_SHREDDED.key, + 
options.getOrElse("hoodie.parquet.variant.allow.reading.shredded", "true").toBoolean) + val enableLogicalTimestampRepair = hadoopConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true) val returningBatch = sqlConf.parquetVectorizedReaderEnabled && options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
