[
https://issues.apache.org/jira/browse/HUDI-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Vexler updated HUDI-9565:
----------------------------------
Description:
We support two types of schema evolution:
schema.on.write
schema.on.read
You can read more about those here:
[https://hudi.apache.org/docs/schema_evolution]
This document will focus on schema.on.write, but could later be expanded to
incorporate schema.on.read as well.
Schema.on.write is intended to support "easy" evolutions, and many query
engines already have most of the necessary support built in. Our current
support for schema evolution is a patchwork of fixes targeted at specific
readers or engines.
Avro:
HoodieAvroParquetReader and HoodieAvroDataBlock do the following:
# Get the schema that the file was written with.
# Call HoodieAvroUtils#recordNeedsRewriteForExtendedAvroTypePromotion.
# If that returns false, we can use the reader schema directly to read the records.
# If it returns true, we call HoodieAvroUtils#rewriteRecordWithNewSchema for
each record to perform any type promotions that are not supported natively by Avro.
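The decision in steps 2-4 above can be sketched as follows. This is a minimal, self-contained illustration, not the real Hudi code: the class name, the single-field check, and the exact promotion table are assumptions for the example (Avro's own schema resolution handles promotions like int to long; anything beyond that forces the per-record rewrite path).

```java
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the Avro read-path decision: promotions that Avro's
// schema resolution supports can be read directly with the reader schema;
// "extended" promotions require rewriting each record.
public class AvroEvolutionSketch {

    // Promotions Avro schema resolution handles natively (writer -> readers).
    // This table is a simplified stand-in for the real compatibility rules.
    private static final Map<String, Set<String>> NATIVE_PROMOTIONS = Map.of(
        "int", Set.of("long", "float", "double"),
        "long", Set.of("float", "double"),
        "float", Set.of("double"),
        "string", Set.of("bytes"),
        "bytes", Set.of("string"));

    /** Single-field analogue of recordNeedsRewriteForExtendedAvroTypePromotion. */
    public static boolean needsRewrite(String writerType, String readerType) {
        if (writerType.equals(readerType)) {
            return false; // no evolution at all
        }
        // If Avro can resolve the promotion itself, reading with the reader
        // schema is enough; otherwise the record must be rewritten.
        return !NATIVE_PROMOTIONS.getOrDefault(writerType, Set.of()).contains(readerType);
    }

    public static void main(String[] args) {
        System.out.println(needsRewrite("int", "long"));   // false: Avro handles it
        System.out.println(needsRewrite("int", "string")); // true: extended promotion
    }
}
```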
Spark:
We have two parquet readers.
HoodieSparkParquetReader:
This uses the parquet-hadoop reader with a ParquetReadSupport from spark-sql.
We use it to read parquet log files, and it is also used in the
non-fg-reader-based merge handle.
The evolution here is similar to Avro. We have a simple util class,
SparkBasicSchemaEvolution, that invokes
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(). This returns an
object that is essentially a mapping of the columns that need to be cast from
one type to another. Then, for each record, we invoke
HoodieParquetFileFormatHelper.generateUnsafeProjection to generate a lambda
function that converts each record into the desired output.
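The change-info-plus-projection flow above can be sketched like this. Rows are modeled as plain Object[] instead of Spark's InternalRow, and the class and method names are hypothetical; only the shape of the flow (a column-to-type map driving one generated projection function applied per record) mirrors the description.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Illustrative sketch: build a map of column positions needing a cast
// (analogous to the implicit schema change info), then generate a single
// projection applied to every record (analogous to the unsafe projection).
public class CastProjectionSketch {

    /** typeChanges: column index -> target type the column must be cast to. */
    public static UnaryOperator<Object[]> generateProjection(Map<Integer, Class<?>> typeChanges) {
        return row -> {
            Object[] out = row.clone();
            for (Map.Entry<Integer, Class<?>> change : typeChanges.entrySet()) {
                int i = change.getKey();
                if (change.getValue() == Long.class && row[i] instanceof Integer) {
                    out[i] = ((Integer) row[i]).longValue(); // int -> long promotion
                } else if (change.getValue() == Double.class && row[i] instanceof Float) {
                    out[i] = ((Float) row[i]).doubleValue(); // float -> double promotion
                }
                // a real implementation would cover the full promotion matrix
            }
            return out;
        };
    }

    public static void main(String[] args) {
        UnaryOperator<Object[]> project = generateProjection(Map.of(0, Long.class));
        Object[] projected = project.apply(new Object[]{42, "a"});
        System.out.println(projected[0] instanceof Long); // column 0 cast to long
    }
}
```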
The other Spark reader is more complex.
*Overview of the SparkParquetReader; skip this if you are already familiar*
We have an interface:
{code:java}
trait SparkParquetReader extends Serializable {
  /**
   * Read an individual parquet file
   *
   * @param file              parquet file to read
   * @param requiredSchema    desired output schema of the data
   * @param partitionSchema   schema of the partition columns. Partition values
   *                          will be appended to the end of every row
   * @param internalSchemaOpt option of internal schema for schema.on.read
   * @param filters           filters for data skipping. Not guaranteed to be
   *                          used; the spark plan will also apply the filters.
   * @param storageConf       the hadoop conf
   * @return iterator of rows read from the file; output type says
   *         [[InternalRow]] but could be [[ColumnarBatch]]
   */
  def read(file: PartitionedFile,
           requiredSchema: StructType,
           partitionSchema: StructType,
           internalSchemaOpt: util.Option[InternalSchema],
           filters: Seq[Filter],
           storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
} {code}
That interface is implemented for each Spark version. The implementations
mostly match the implementation in OSS Spark's ParquetFileFormat. We have some
compatibility fixes for Spark minor versions, and we also changed it so we can
read individual parquet files instead of creating a lambda function that is
used to read a series of files. We can tune the filters and schemas at a
per-file level, which was not possible with the default Spark implementation.
We have Spark35ParquetReader, Spark34ParquetReader, and Spark33ParquetReader
implemented, and we use the Spark adapter to fetch those implementations.
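The per-version wiring can be sketched as below. This is a hypothetical stand-in for the adapter mechanism: the interface, factory method, and version-prefix matching are illustrative, not Hudi's actual SparkAdapter API; only the listed reader class names come from the text above.

```java
// Illustrative sketch of per-Spark-version dispatch: pick the reader
// implementation matching the running Spark minor version, as the adapter
// layer does per release line.
public class ReaderDispatchSketch {

    // Stand-in for the versioned reader implementations named above.
    interface VersionedParquetReader {
        String describe();
    }

    public static VersionedParquetReader forSparkVersion(String sparkVersion) {
        // Match on the major.minor prefix; each minor version gets its own impl.
        if (sparkVersion.startsWith("3.5")) {
            return () -> "Spark35ParquetReader";
        } else if (sparkVersion.startsWith("3.4")) {
            return () -> "Spark34ParquetReader";
        } else if (sparkVersion.startsWith("3.3")) {
            return () -> "Spark33ParquetReader";
        }
        throw new IllegalArgumentException("Unsupported Spark version: " + sparkVersion);
    }
}
```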
*Done with the Spark reader explanation*
Schema evolution was consolidated into a single class,
Spark3ParquetSchemaEvolutionUtils, to reduce duplicated code and to keep
SparkParquetReader as close to the OSS implementation as possible for better
maintainability.
We generate the type change infos and cast the records as in the other Spark
reader, but schema.on.read is also handled here. Any pushed-down filters need
to be rebuilt, and if the vectorized reader is used, we have a specialized
vectorized reader that casts promoted columns.
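To illustrate why pushed-down filters must be rebuilt: a filter built against the evolved table schema may carry a literal of the promoted type, while the file physically stores the old type. A minimal sketch, using a simplified stand-in for Spark's data source Filter classes (the record type and method here are hypothetical):

```java
// Illustrative sketch of rebuilding a pushed-down filter for a file whose
// physical column type is narrower than the table schema's promoted type.
public class FilterRebuildSketch {

    // Simplified stand-in for Spark's EqualTo data source filter.
    record EqualTo(String column, Object value) {}

    /** Rewrite the filter literal to the file's physical type where needed. */
    public static EqualTo rebuildForFile(EqualTo filter, Class<?> fileType) {
        if (fileType == Integer.class && filter.value() instanceof Long l) {
            // Table schema promoted int -> long, but this file stores int:
            // narrow the literal so the pushdown matches the physical values.
            return new EqualTo(filter.column(), (int) (long) l);
        }
        return filter; // no type change for this column in this file
    }
}
```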
*How do we make this universal?*
We need to move the schema evolution handling into the fg reader, remove the
specialized implementations, and make it generic. We will need to add
interfaces for reading the write schema from the file metadata and for
projecting the record before merging with log files. Additionally, we will
need to modify pushdown filters as well.
Complications include:
# Extra reads of metadata might be costly; we might consider changing reader
implementations to take the metadata as a param so we only need a single read.
# For Hive, only reordering top-level cols has been implemented. We still need
to implement type promotions and nested cols.
# WriteHandles have their own ways of dealing with schema evolution; if we
remove the evolution from the Spark and Avro readers, we need to ensure the
non-fg-reader-based write paths are still functional.
# Performance cost of projection. Readers have built-in mechanisms for most
cases of basic schema evolution. Do they have optimizations that our universal
solution won't be able to take advantage of?
# Non-fg reader paths. For Spark, we avoid using the fg reader for file slices
that are just a base file. This avoids the overhead of the fg reader and the
overhead of sending the file slice to the executor.
# Universal schema? Avro is somewhat difficult to work with. We have
InternalSchema, but it is Scala and not widely used in the codebase.
# Vectorized reading. The fg reader does not support vectorized reading. To
maintain schema evolution support for the Spark vectorized reader, we will
need to support vectorized reading in the fg reader, and implementations such
as projection might require nontrivial work.
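The generic hooks the proposal calls for could look roughly like the sketch below. Every name here is hypothetical; none of these interfaces exist yet. The toy implementation models schemas as field-name lists and records as maps, just to show the two responsibilities: reading the writer schema once per file, and projecting each record to the required schema before merging.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the generic hooks the fg reader would gain:
// one to read the writer schema from file metadata, one to project a record
// to the query schema before merging with log records.
public class FgEvolutionSketch {

    /** Engine-agnostic hooks; T is the engine's record type, S its schema type. */
    interface SchemaEvolutionSupport<T, S> {
        S getWriterSchema(String filePath);
        T projectRecord(T record, S writerSchema, S requiredSchema);
    }

    /** Toy implementation: schemas are field-name lists, records are maps. */
    static class MapBased implements SchemaEvolutionSupport<Map<String, Object>, List<String>> {
        private final Map<String, List<String>> fileSchemas;

        MapBased(Map<String, List<String>> fileSchemas) {
            this.fileSchemas = fileSchemas;
        }

        @Override
        public List<String> getWriterSchema(String filePath) {
            // In reality this would come from the parquet footer or file metadata,
            // ideally metadata the caller has already read (complication #1).
            return fileSchemas.get(filePath);
        }

        @Override
        public Map<String, Object> projectRecord(Map<String, Object> record,
                                                 List<String> writerSchema,
                                                 List<String> requiredSchema) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (String field : requiredSchema) {
                out.put(field, record.get(field)); // fields added by evolution become null
            }
            return out;
        }
    }
}
```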
> Proposal: Unify Schema Evolution in the Reader
> ----------------------------------------------
>
> Key: HUDI-9565
> URL: https://issues.apache.org/jira/browse/HUDI-9565
> Project: Apache Hudi
> Issue Type: New Feature
> Components: reader-core
> Reporter: Jonathan Vexler
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)