This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 45923f3cacd709ce60d753a58e6f37a9759cbf19 Author: Geser Dugarov <[email protected]> AuthorDate: Thu Mar 7 12:23:38 2024 +0700 [HUDI-6947] Refactored HoodieSchemaUtils.deduceWriterSchema with many flags (#10810) --- .../scala/org/apache/hudi/HoodieSchemaUtils.scala | 176 +++++++++++---------- 1 file changed, 93 insertions(+), 83 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala index cfc43453e9c..9aeff64f237 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala @@ -76,107 +76,117 @@ object HoodieSchemaUtils { latestTableSchemaOpt: Option[Schema], internalSchemaOpt: Option[InternalSchema], opts: Map[String, String]): Schema = { - val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(), - DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean - val shouldReconcileSchema = opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), - DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean - val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, - HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean - latestTableSchemaOpt match { - // In case table schema is empty we're just going to use the source schema as a - // writer's schema. + // If table schema is empty, then we use the source schema as a writer's schema. 
case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema) // Otherwise, we need to make sure we reconcile incoming and latest table schemas case Some(latestTableSchemaWithMetaFields) => - // NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of - // deducing proper writer schema we're stripping them to make sure we can perform proper - // analysis - //add call to fix null ordering to ensure backwards compatibility + // NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of deducing proper writer schema + // we're stripping them to make sure we can perform proper analysis + // add call to fix null ordering to ensure backwards compatibility val latestTableSchema = AvroInternalSchemaConverter.fixNullOrdering(removeMetadataFields(latestTableSchemaWithMetaFields)) + // Before validating whether schemas are compatible, we need to "canonicalize" source's schema // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints: // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable // in the table's one we want to proceed aligning nullability constraints w/ the table's schema // Also, we promote types to the latest table schema if possible. 
- val shouldCanonicalizeSchema = opts.getOrDefault(CANONICALIZE_SCHEMA.key, - CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean - val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), - SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean - + val shouldCanonicalizeSchema = opts.getOrDefault(CANONICALIZE_SCHEMA.key, CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) { canonicalizeSchema(sourceSchema, latestTableSchema, opts) } else { AvroInternalSchemaConverter.fixNullOrdering(sourceSchema) } - val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key, - HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean - + val shouldReconcileSchema = opts.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), + DataSourceWriteOptions.RECONCILE_SCHEMA.defaultValue().toString).toBoolean if (shouldReconcileSchema) { - internalSchemaOpt match { - case Some(internalSchema) => - // Apply schema evolution, by auto-merging write schema and read schema - val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema) - val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName) - val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty - if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema - case None => - // In case schema reconciliation is enabled we will employ (legacy) reconciliation - // strategy to produce target writer's schema (see definition below) - val (reconciledSchema, isCompatible) = - reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema) - - // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be 
compatible - // w/ the table's one and allow schemas to diverge. This is required in cases where - // partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such - // only incoming dataset's projection has to match the table's schema, and not the whole one - if (!shouldValidateSchemasCompatibility || isCompatible) { - reconciledSchema - } else { - log.error( - s"""Failed to reconcile incoming batch schema with the table's one. - |Incoming schema ${sourceSchema.toString(true)} - |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)} - |Table's schema ${latestTableSchema.toString(true)} - |""".stripMargin) - throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one") - } - } + deduceWriterSchemaWithReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, internalSchemaOpt, opts) + } else { + deduceWriterSchemaWithoutReconcile(sourceSchema, canonicalizedSourceSchema, latestTableSchema, opts) + } + } + } + + /** + * Deducing with disabled reconciliation. + * We have to validate that the source's schema is compatible w/ the table's latest schema, + * such that we're able to read existing table's records using [[sourceSchema]]. + */ + private def deduceWriterSchemaWithoutReconcile(sourceSchema: Schema, + canonicalizedSourceSchema: Schema, + latestTableSchema: Schema, + opts: Map[String, String]): Schema = { + // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible + // w/ the table's one and allow schemas to diverge. 
This is required in cases where + // partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such + // only incoming dataset's projection has to match the table's schema, and not the whole one + val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean + val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, + HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean + val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key, + HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean + val setNullForMissingColumns = opts.getOrDefault(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(), + DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean + + if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) { + // Default behaviour + val reconciledSchema = if (setNullForMissingColumns) { + AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema) + } else { + canonicalizedSourceSchema + } + checkValidEvolution(reconciledSchema, latestTableSchema) + reconciledSchema + } else { + // If it's merge into writes, we don't check for projection nor schema compatibility. Writers down the line will take care of it. + // Or it's not merge into writes, and we don't validate schema, but we allow to drop columns automatically. + // Or it's not merge into writes, we validate schema, and schema is compatible. + if (shouldValidateSchemasCompatibility) { + checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true, + allowAutoEvolutionColumnDrop, java.util.Collections.emptySet()) + } + canonicalizedSourceSchema + } + } + + /** + * Deducing with enabled reconciliation. + * Marked as Deprecated. 
+ */ + private def deduceWriterSchemaWithReconcile(sourceSchema: Schema, + canonicalizedSourceSchema: Schema, + latestTableSchema: Schema, + internalSchemaOpt: Option[InternalSchema], + opts: Map[String, String]): Schema = { + internalSchemaOpt match { + case Some(internalSchema) => + // Apply schema evolution, by auto-merging write schema and read schema + val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema) + val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName) + val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty + if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema + case None => + // In case schema reconciliation is enabled we will employ (legacy) reconciliation + // strategy to produce target writer's schema (see definition below) + val (reconciledSchema, isCompatible) = + reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema) + + // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible + // w/ the table's one and allow schemas to diverge. 
This is required in cases where + // partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such + // only incoming dataset's projection has to match the table's schema, and not the whole one + val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean + if (!shouldValidateSchemasCompatibility || isCompatible) { + reconciledSchema } else { - // In case reconciliation is disabled, we have to validate that the source's schema - // is compatible w/ the table's latest schema, such that we're able to read existing table's - // records using [[sourceSchema]]. - // - // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible - // w/ the table's one and allow schemas to diverge. This is required in cases where - // partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such - // only incoming dataset's projection has to match the table's schema, and not the whole one - - if (mergeIntoWrites) { - // if its merge into writes, do not check for projection nor schema compatibility. Writers down the line will - // take care of it. 
- canonicalizedSourceSchema - } else { - if (!shouldValidateSchemasCompatibility) { - // if no validation is enabled, check for col drop - if (allowAutoEvolutionColumnDrop) { - canonicalizedSourceSchema - } else { - val reconciledSchema = if (setNullForMissingColumns) { - AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema) - } else { - canonicalizedSourceSchema - } - checkValidEvolution(reconciledSchema, latestTableSchema) - reconciledSchema - } - } else { - checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, true, - allowAutoEvolutionColumnDrop, java.util.Collections.emptySet()) - canonicalizedSourceSchema - } - } + log.error( + s"""Failed to reconcile incoming batch schema with the table's one. + |Incoming schema ${sourceSchema.toString(true)} + |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)} + |Table's schema ${latestTableSchema.toString(true)} + |""".stripMargin) + throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one") } } }