[ 
https://issues.apache.org/jira/browse/HUDI-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-9565:
----------------------------------
    Status: In Progress  (was: Open)

> Proposal: Unify Schema Evolution in the Reader
> ----------------------------------------------
>
>                 Key: HUDI-9565
>                 URL: https://issues.apache.org/jira/browse/HUDI-9565
>             Project: Apache Hudi
>          Issue Type: New Feature
>          Components: reader-core
>            Reporter: Jonathan Vexler
>            Assignee: Jonathan Vexler
>            Priority: Major
>             Fix For: 1.1.0
>
>
> There are two types of schema evolution: 
> schema.on.write
> schema.on.read
> You can read more about both here: 
> [https://hudi.apache.org/docs/schema_evolution]
> This document focuses on schema.on.write, but could be expanded to 
> incorporate schema.on.read as well.
> Schema.on.write is intended to support "easy" evolutions, and many query 
> engines already have most of the support built in. Our current support for 
> schema evolution, however, is a patchwork of fixes targeted at specific 
> readers or engines. 
> Avro:
>  
> HoodieAvroParquetReader and HoodieAvroDataBlock do the following:
>  # Get the schema that the file was written with
>  # Call HoodieAvroUtils#recordNeedsRewriteForExtendedAvroTypePromotion
>  # If that returns false, we can just use the reader schema to read the 
> records
>  # If that returns true, we call 
> HoodieAvroUtils#rewriteRecordWithNewSchema on each record to perform any type 
> promotions that are not supported natively by Avro
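>  
> As a rough sketch of that decision flow (the type names and the promotion 
> table below are simplified assumptions for illustration, not the actual 
> HoodieAvroUtils logic):
> {code:java}
> import java.util.Map;
> import java.util.Set;
> 
> public class AvroEvolutionSketch {
>   enum T { INT, LONG, FLOAT, DOUBLE, STRING, BYTES }
> 
>   // Promotions that Avro schema resolution handles natively (simplified).
>   static final Map<T, Set<T>> NATIVE = Map.of(
>       T.INT, Set.of(T.LONG, T.FLOAT, T.DOUBLE),
>       T.LONG, Set.of(T.FLOAT, T.DOUBLE),
>       T.FLOAT, Set.of(T.DOUBLE));
> 
>   // Steps 2-4: a per-record rewrite is only needed when the promotion is
>   // not one Avro can resolve on its own (e.g. int -> string).
>   static boolean needsRewrite(T writer, T reader) {
>     if (writer == reader) {
>       return false;
>     }
>     return !NATIVE.getOrDefault(writer, Set.of()).contains(reader);
>   }
> }
> {code}
> So an int -> long evolution reads with just the reader schema, while 
> something like int -> string forces the per-record rewrite path.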
> Spark:
> We have two Parquet readers:
> HoodieSparkParquetReader
> This uses the parquet-hadoop reader with a ParquetReadSupport from spark-sql. 
> We use it to read Parquet log files, but it is also used in the non-fg 
> reader based merge handle.
>  
> The evolution here is similar to Avro. A simple util class, 
> SparkBasicSchemaEvolution, invokes 
> HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(), which returns 
> an object that is essentially a mapping of columns that need to be cast from 
> one type to another. Then, for each record, we invoke 
> HoodieParquetFileFormatHelper.generateUnsafeProjection to generate a lambda 
> function that converts each record into the desired output.
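>  
> A minimal sketch of that two-step flow, with plain Java arrays standing in 
> for InternalRow and illustrative names (the real helpers operate on Spark 
> DataTypes and UnsafeProjections):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.function.Function;
> 
> public class ProjectionSketch {
>   // Step 1: the "schema change info" -- column index to target type,
>   // for columns where the file type differs from the query type.
>   static Map<Integer, Class<?>> changeInfo(Class<?>[] fileTypes, Class<?>[] queryTypes) {
>     Map<Integer, Class<?>> changes = new HashMap<>();
>     for (int i = 0; i < fileTypes.length; i++) {
>       if (!fileTypes[i].equals(queryTypes[i])) {
>         changes.put(i, queryTypes[i]);
>       }
>     }
>     return changes;
>   }
> 
>   // Step 2: one reusable projection applied to every record, analogous
>   // to the lambda produced by generateUnsafeProjection.
>   static Function<Object[], Object[]> projection(Map<Integer, Class<?>> changes) {
>     return row -> {
>       Object[] out = row.clone();
>       changes.forEach((i, target) -> {
>         if (target == Long.class && out[i] instanceof Integer) {
>           out[i] = ((Integer) out[i]).longValue();
>         } else if (target == Double.class && out[i] instanceof Float) {
>           out[i] = ((Float) out[i]).doubleValue();
>         }
>       });
>       return out;
>     };
>   }
> }
> {code}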
> The other spark reader is more complex.
> *Overview of the SparkParquetReader; skip this if you are already 
> familiar*
> We have an interface 
> {code:java}
> trait SparkParquetReader extends Serializable {
>   /**
>    * Read an individual parquet file
>    *
>    * @param file               parquet file to read
>    * @param requiredSchema     desired output schema of the data
>    * @param partitionSchema    schema of the partition columns. Partition 
> values will be appended to the end of every row
>    * @param internalSchemaOpt  option of internal schema for schema.on.read
>    * @param filters            filters for data skipping. Not guaranteed to 
> be used; the spark plan will also apply the filters.
>    * @param storageConf        the hadoop conf
>    * @return iterator of rows read from the file; the signature says 
> [[InternalRow]], but the elements may actually be [[ColumnarBatch]]
>    */
>   def read(file: PartitionedFile,
>            requiredSchema: StructType,
>            partitionSchema: StructType,
>            internalSchemaOpt: util.Option[InternalSchema],
>            filters: Seq[Filter],
>            storageConf: StorageConfiguration[Configuration]): 
> Iterator[InternalRow]
> } {code}
> That trait is implemented for each Spark version, and each implementation 
> mostly matches the implementation in OSS Spark's ParquetFileFormat. We carry 
> some compatibility fixes for Spark minor versions, and we also changed it so 
> we can read individual parquet files instead of creating a lambda function 
> that is used to read a series of files. This lets us tune the filters and 
> schemas at a per-file level, which was not possible with the default Spark 
> implementation. We have Spark35ParquetReader, Spark34ParquetReader, and 
> Spark33ParquetReader, and we use the SparkAdapter to fetch the right 
> implementation.
> *Done with spark reader explanation*
>  
> Schema evolution was consolidated into a class 
> Spark3ParquetSchemaEvolutionUtils to reduce duplicate code, and to keep 
> SparkParquetReader as close to the OSS implementation as possible for better 
> maintainability. 
>  
> We generate the type change infos and cast the records as we do in the 
> other Spark reader, but schema.on.read is also handled here. Any pushed-down 
> filters need to be rebuilt, and if the vectorized reader is used, we have a 
> specialized vectorized reader that casts promoted columns.
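>  
> The filter-rebuild step can be illustrated like this; the EqualTo record 
> below is a hypothetical stand-in for Spark's pushed-down Filter classes, not 
> the actual API:
> {code:java}
> import java.util.Map;
> 
> public class FilterRebuildSketch {
>   record EqualTo(String column, Object value) {}
> 
>   // A filter built against the query type (e.g. long) must be re-expressed
>   // in the type the file was written with (e.g. int) before pushdown,
>   // since the un-cast column data is compared against the literal.
>   static EqualTo rebuild(EqualTo f, Map<String, Class<?>> writtenTypes) {
>     if (writtenTypes.get(f.column()) == Integer.class && f.value() instanceof Long) {
>       return new EqualTo(f.column(), ((Long) f.value()).intValue());
>     }
>     return f;
>   }
> }
> {code}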
> *How do we make this universal, or unify it across record formats (Spark, 
> Avro, Java, Hive)?* 
> We need to move the schema evolution handling into the fg reader, remove the 
> specialized implementations, and make it generic. We will need to add 
> interfaces for reading the write schema from the file metadata and for 
> projecting the record before merging with log files. Additionally, we will 
> need to modify pushed-down filters as well.
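>  
> One possible shape for those hooks, sketched as an engine-agnostic interface 
> parameterized on the engine's record and schema types; the method names here 
> are hypothetical, not an agreed-upon API:
> {code:java}
> public interface SchemaEvolutionSupport<T, S> {
>   // Read the schema the base file was actually written with from its
>   // file metadata.
>   S getWriterSchema(String filePath);
> 
>   // Project a record from the written schema to the query schema before
>   // it is handed to the merger.
>   T project(T record, S writtenSchema, S querySchema);
> }
> {code}
> Each engine (Spark, Avro, Java, Hive) would supply its own implementation, 
> letting the fg reader drive evolution generically.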
> Complications include: 
>  # Extra read of metadata might be costly; we might consider changing reader 
> implementations to take the metadata as a param so we only need a single 
> read.
>  # For Hive, only reordering of top-level columns has been implemented. We 
> still need to implement type promotions and nested columns.
>  # WriteHandles have their own ways of dealing with schema evolution; if we 
> remove the evolution from the Spark and Avro readers, we need to ensure the 
> non-fg-reader-based write paths are still functional.
>  # Performance cost of projection. Readers have built-in mechanisms for most 
> cases of basic schema evolution. Do they have optimizations that our 
> universal solution won't be able to take advantage of?
>  # Non-fg reader paths. For Spark, we avoid using the fg reader for file 
> slices that are just a base file. This avoids the overhead of the fg reader 
> and of sending the file slice to the executor.
>  # Universal schema? Avro is somewhat difficult to work with. We have 
> InternalSchema, but it is Scala and not widely used in the codebase.
>  # Vectorized reading. The fg reader does not support vectorized reading. To 
> maintain schema evolution support for Spark's vectorized reader, we will 
> need to support vectorized reading in the fg reader, and pieces such as 
> projection might require nontrivial work.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
