the-other-tim-brown commented on code in PR #14265: URL: https://github.com/apache/hudi/pull/14265#discussion_r2539600878
########## hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java: ########## @@ -0,0 +1,931 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieAvroSchemaException; + +import org.apache.avro.JsonProperties; +import org.apache.avro.Schema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Wrapper class for Avro Schema that provides Hudi-specific schema functionality + * while maintaining binary compatibility with Avro. + * + * <p>This class serves as the primary interface for schema operations within Hudi. 
+ * It encapsulates an Avro Schema and provides a consistent, type-safe API while + * maintaining full compatibility with existing Avro-based code.</p> + * + * <p>Key features: + * <ul> + * <li>Binary compatibility with Avro Schema</li> + * <li>Type-safe field access through HoodieSchemaField</li> + * <li>Support for all Avro schema types and operations</li> + * <li>Consistent error handling using Hudi exceptions</li> + * <li>Integration with Hudi's Option type for null safety</li> + * </ul></p> + * + * <p>Usage examples: + * <pre>{@code + * // Create from JSON + * HoodieSchema schema = HoodieSchema.parse(jsonSchemaString); + * + * // Create primitive schemas + * HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING); + * + * // Access schema properties + * HoodieSchemaType type = schema.getType(); + * List<HoodieSchemaField> fields = schema.getFields(); + * Option<HoodieSchemaField> field = schema.getField("fieldName"); + * + * // Convert back to Avro for compatibility + * Schema avroSchema = schema.getAvroSchema(); + * }</pre></p> + * + * @since 1.2.0 + */ +public class HoodieSchema implements Serializable { + + /** + * Constant representing a null JSON value, equivalent to JsonProperties.NULL_VALUE. + * This provides compatibility with Avro's JsonProperties while maintaining Hudi's API. + */ + public static final Object NULL_VALUE = JsonProperties.NULL_VALUE; + private static final long serialVersionUID = 1L; + private final Schema avroSchema; + private final HoodieSchemaType type; + + /** + * Creates a new HoodieSchema wrapping the given Avro schema. 
+ * + * @param avroSchema the Avro schema to wrap, cannot be null + * @throws IllegalArgumentException if avroSchema is null + */ + public HoodieSchema(Schema avroSchema) { + ValidationUtils.checkArgument(avroSchema != null, "Avro schema cannot be null"); + this.avroSchema = avroSchema; + Schema.Type avroType = avroSchema.getType(); + ValidationUtils.checkState(avroType != null, "Avro schema type cannot be null"); + this.type = HoodieSchemaType.fromAvroType(avroType); + } + + /** + * Factory method to create HoodieSchema from an Avro schema. + * + * @param avroSchema the Avro schema to wrap + * @return new HoodieSchema instance + * @throws IllegalArgumentException if avroSchema is null + */ + public static HoodieSchema fromAvroSchema(Schema avroSchema) { + return new HoodieSchema(avroSchema); + } + + /** + * Parses a JSON schema string and returns the corresponding HoodieSchema. + * + * @param jsonSchema the JSON schema string to parse + * @return parsed HoodieSchema + * @throws HoodieAvroSchemaException if the schema string is invalid + */ + public static HoodieSchema parse(String jsonSchema) { + ValidationUtils.checkArgument(jsonSchema != null && !jsonSchema.trim().isEmpty(), + "Schema string cannot be null or empty"); + + try { + Schema avroSchema = new Schema.Parser().parse(jsonSchema); + return new HoodieSchema(avroSchema); + } catch (Exception e) { + throw new HoodieAvroSchemaException("Failed to parse schema: " + jsonSchema + ". Error: " + e.getMessage()); + } + } + + /** + * Creates a schema for the specified primitive type. 
+ * + * @param type the primitive schema type + * @return new HoodieSchema for the primitive type + * @throws IllegalArgumentException if type is not a primitive type + */ + public static HoodieSchema create(HoodieSchemaType type) { + ValidationUtils.checkArgument(type != null, "Schema type cannot be null"); + ValidationUtils.checkArgument(type.isPrimitive(), "Only primitive types are supported: " + type); + + Schema.Type avroType = type.toAvroType(); + ValidationUtils.checkState(avroType != null, "Converted Avro type cannot be null"); + + Schema avroSchema = Schema.create(avroType); + return new HoodieSchema(avroSchema); + } + + /** + * Creates a nullable schema (union of null and the specified schema). + * + * @param schema the schema to make nullable + * @return new HoodieSchema representing a nullable version of the input schema + */ + public static HoodieSchema createNullable(HoodieSchema schema) { + ValidationUtils.checkArgument(schema != null, "Schema cannot be null"); + + Schema inputAvroSchema = schema.avroSchema; + ValidationUtils.checkState(inputAvroSchema != null, "Input schema's Avro schema cannot be null"); + + if (schema.getType() == HoodieSchemaType.UNION) { + // Already a union, check if it contains null + List<Schema> unionTypes = inputAvroSchema.getTypes(); + boolean hasNull = unionTypes.stream().anyMatch(s -> s.getType() == Schema.Type.NULL); + + if (hasNull) { + return schema; // Already nullable + } + + // Add null to existing union + List<Schema> newUnionTypes = new ArrayList<>(); + newUnionTypes.add(Schema.create(Schema.Type.NULL)); + newUnionTypes.addAll(unionTypes); + Schema nullableSchema = Schema.createUnion(newUnionTypes); + return new HoodieSchema(nullableSchema); + } else { + // Create new union with null + List<Schema> unionTypes = new ArrayList<>(); + unionTypes.add(Schema.create(Schema.Type.NULL)); + unionTypes.add(inputAvroSchema); + Schema nullableSchema = Schema.createUnion(unionTypes); + return new HoodieSchema(nullableSchema); 
+ } + } + + /** + * Creates an array schema with the specified element schema. + * + * @param elementSchema the schema for array elements + * @return new HoodieSchema representing an array + */ + public static HoodieSchema createArray(HoodieSchema elementSchema) { + ValidationUtils.checkArgument(elementSchema != null, "Element schema cannot be null"); + + Schema elementAvroSchema = elementSchema.avroSchema; + ValidationUtils.checkState(elementAvroSchema != null, "Element schema's Avro schema cannot be null"); + + Schema arraySchema = Schema.createArray(elementAvroSchema); + return new HoodieSchema(arraySchema); + } + + /** + * Creates a map schema with the specified value schema. + * + * @param valueSchema the schema for map values + * @return new HoodieSchema representing a map + */ + public static HoodieSchema createMap(HoodieSchema valueSchema) { + ValidationUtils.checkArgument(valueSchema != null, "Value schema cannot be null"); + + Schema valueAvroSchema = valueSchema.avroSchema; + ValidationUtils.checkState(valueAvroSchema != null, "Value schema's Avro schema cannot be null"); + + Schema mapSchema = Schema.createMap(valueAvroSchema); + return new HoodieSchema(mapSchema); + } + + /** + * Creates a record schema with the specified properties. + * + * @param name the record name + * @param namespace the namespace (can be null) + * @param doc the documentation (can be null) + * @param fields the list of fields + * @return new HoodieSchema representing a record + */ + public static HoodieSchema createRecord(String name, String namespace, String doc, List<HoodieSchemaField> fields) { + return createRecord(name, doc, namespace, false, fields); + } + + /** + * Creates a record schema with the specified properties, including error flag. 
+ * + * @param name the record name + * @param doc the documentation (can be null) + * @param namespace the namespace (can be null) + * @param isError whether this is an error record + * @param fields the list of fields + * @return new HoodieSchema representing a record + */ + public static HoodieSchema createRecord(String name, String doc, String namespace, boolean isError, List<HoodieSchemaField> fields) { + ValidationUtils.checkArgument(name != null && !name.isEmpty(), "Record name cannot be null or empty"); + ValidationUtils.checkArgument(fields != null, "Fields cannot be null"); + + // Convert HoodieSchemaFields to Avro Fields + List<Schema.Field> avroFields = fields.stream() + .map(HoodieSchemaField::getAvroField) + .collect(Collectors.toList()); + + Schema recordSchema = Schema.createRecord(name, doc, namespace, isError); + recordSchema.setFields(avroFields); + return new HoodieSchema(recordSchema); + } + + /** + * Creates a union schema from multiple schemas. + * + * @param schemas the schemas to include in the union + * @return new HoodieSchema representing the union + */ + public static HoodieSchema createUnion(List<HoodieSchema> schemas) { + ValidationUtils.checkArgument(schemas != null && !schemas.isEmpty(), "Union schemas cannot be null or empty"); + + List<Schema> avroSchemas = schemas.stream() + .map(HoodieSchema::getAvroSchema) + .collect(Collectors.toList()); + + Schema unionSchema = Schema.createUnion(avroSchemas); + return new HoodieSchema(unionSchema); + } + + /** + * Creates a union schema from multiple schemas (varargs version). + * + * @param schemas the schemas to include in the union + * @return new HoodieSchema representing the union + */ + public static HoodieSchema createUnion(HoodieSchema... schemas) { + ValidationUtils.checkArgument(schemas != null && schemas.length > 0, "Union schemas cannot be null or empty"); + return createUnion(Arrays.asList(schemas)); + } + + /** + * Creates an enum schema with the specified properties. 
+ * + * @param name the enum name + * @param namespace the namespace (can be null) + * @param doc the documentation (can be null) + * @param symbols the list of enum symbols + * @return new HoodieSchema representing an enum + */ + public static HoodieSchema createEnum(String name, String namespace, String doc, List<String> symbols) { + ValidationUtils.checkArgument(name != null && !name.isEmpty(), "Enum name cannot be null or empty"); + ValidationUtils.checkArgument(symbols != null && !symbols.isEmpty(), "Enum symbols cannot be null or empty"); + + Schema enumSchema = Schema.createEnum(name, doc, namespace, symbols); + return new HoodieSchema(enumSchema); + } + + /** + * Creates a fixed schema with the specified properties. + * + * @param name the fixed type name + * @param namespace the namespace (can be null) + * @param doc the documentation (can be null) + * @param size the size in bytes + * @return new HoodieSchema representing a fixed type + */ + public static HoodieSchema createFixed(String name, String namespace, String doc, int size) { + ValidationUtils.checkArgument(name != null && !name.isEmpty(), "Fixed name cannot be null or empty"); + ValidationUtils.checkArgument(size > 0, "Fixed size must be positive: " + size); + + Schema fixedSchema = Schema.createFixed(name, doc, namespace, size); + return new HoodieSchema(fixedSchema); + } + + /** + * Returns the Hudi schema version information. + * + * @return version string of the Hudi schema system + */ + public static String getHudiSchemaVersion() { + Package pkg = HoodieSchema.class.getPackage(); + String version = pkg != null ? pkg.getImplementationVersion() : null; + return version != null ? version : "unknown"; + } + + /** + * Creates a Hudi write schema from a given schema string with optional operation field. + * This is equivalent to HoodieAvroUtils.createHoodieWriteSchema() but returns HoodieSchema. 
+ * + * @param schemaStr the schema string to convert + * @param withOperationField whether to include operation field metadata + * @return HoodieSchema configured for write operations + * @throws IllegalArgumentException if schema string is invalid + */ + public static HoodieSchema createHoodieWriteSchema(String schemaStr, boolean withOperationField) { + ValidationUtils.checkArgument(schemaStr != null && !schemaStr.trim().isEmpty(), + "Schema string cannot be null or empty"); + Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaStr, withOperationField); + return HoodieSchema.fromAvroSchema(avroSchema); + } + + /** + * Adds metadata fields to an existing HoodieSchema. + * This is equivalent to HoodieAvroUtils.addMetadataFields() but operates on HoodieSchemas. + * + * @param schema the base schema to add metadata fields to + * @param withOperationField whether to include operation field metadata + * @return new HoodieSchema with metadata fields added + * @throws IllegalArgumentException if schema is null + */ + public static HoodieSchema addMetadataFields(HoodieSchema schema, boolean withOperationField) { + ValidationUtils.checkArgument(schema != null, "Schema cannot be null"); + Schema avroSchema = schema.toAvroSchema(); + Schema resultAvro = HoodieAvroUtils.addMetadataFields(avroSchema, withOperationField); + return HoodieSchema.fromAvroSchema(resultAvro); + } + + /** + * Removes metadata fields from a HoodieSchema. + * This is equivalent to HoodieAvroUtils.removeMetadataFields() but operates on HoodieSchemas. 
+ * + * @param schema the schema to remove metadata fields from + * @return new HoodieSchema without metadata fields + * @throws IllegalArgumentException if schema is null + */ + public static HoodieSchema removeMetadataFields(HoodieSchema schema) { + ValidationUtils.checkArgument(schema != null, "Schema cannot be null"); + Schema avroSchema = schema.toAvroSchema(); + Schema resultAvro = HoodieAvroUtils.removeMetadataFields(avroSchema); + return HoodieSchema.fromAvroSchema(resultAvro); + } + + /** + * Returns the type of this schema. + * + * @return the schema type + */ + public HoodieSchemaType getType() { + return type; + } + + /** + * Returns the name of this schema, if it has one. + * + * @return Option containing the schema name, or Option.empty() if none + */ + public Option<String> getName() { + return Option.ofNullable(avroSchema.getName()); + } + + /** + * Returns the namespace of this schema, if it has one. + * + * @return Option containing the schema namespace, or Option.empty() if none + */ + public Option<String> getNamespace() { + return Option.ofNullable(avroSchema.getNamespace()); + } + + /** + * Returns the full name of this schema (namespace + name). + * + * @return Option containing the full schema name, or Option.empty() if none + */ + public Option<String> getFullName() { + return Option.ofNullable(avroSchema.getFullName()); + } + + /** + * Returns the documentation string for this schema, if any. + * + * @return Option containing the documentation, or Option.empty() if none + */ + public Option<String> getDoc() { + return Option.ofNullable(avroSchema.getDoc()); + } + + /** + * Returns the fields of this record schema. 
+ * + * @return list of HoodieSchemaField objects + * @throws IllegalStateException if this is not a record schema + */ + public List<HoodieSchemaField> getFields() { + if (type != HoodieSchemaType.RECORD) { + throw new IllegalStateException("Cannot get fields from non-record schema: " + type); + } + + return avroSchema.getFields().stream() Review Comment: This is a good point, but I think we can hold off on the optimizations until we refactor the inner workings of this class to remove the Avro dependency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
