[ 
https://issues.apache.org/jira/browse/HUDI-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Udit Mehrotra updated HUDI-915:
-------------------------------
    Description: 
This issue happens when the source data is partitioned using _*hive-style 
partitioning*_, which is also Spark's default behavior when it writes data. 
With this partitioning, the partition column/schema is never stored in the 
files; instead it is derived on the fly from the file paths, whose partition 
folders take the form *_partition_key=partition_value_*.
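
As a minimal illustration (the path, column name, and function below are hypothetical, not Hudi/Spark API), partition values can only be recovered by parsing the hive-style path segments:

```python
def partition_values_from_path(path):
    """Extract partition_key=partition_value segments from a hive-style path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values

print(partition_values_from_path(
    "s3://bucket/table/datestr=2020-05-28/part-0001.parquet"))
# {'datestr': '2020-05-28'}
```

If a reader never consults the path, as happens with records copied straight out of a data file, these values are simply absent.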

Now, during metadata bootstrap we store only the metadata columns in the Hudi 
table folder. Also, the *bootstrap schema* we compute reads the schema directly 
from the source data file, which does not contain the *partition column 
schema*. Thus it is incomplete.
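
A sketch of the incompleteness, with hypothetical field names (not the actual Hudi schema API): the schema read from the data file lacks the partition field, so a complete bootstrap schema has to merge the partition schema in before it is written to the commit metadata:

```python
def complete_bootstrap_schema(file_schema, partition_fields):
    """Append any partition field the data file schema does not carry."""
    return file_schema + [f for f in partition_fields if f not in file_schema]

data_file_schema = ["_hoodie_record_key", "id", "name"]  # no partition column
partition_schema = ["datestr"]

print(complete_bootstrap_schema(data_file_schema, partition_schema))
# ['_hoodie_record_key', 'id', 'name', 'datestr']
```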

All this manifests as issues when we ultimately perform *upserts* on these 
bootstrapped files and they become fully bootstrapped. At upsert time the 
schema evolves, because the upsert dataframe needs to contain the partition 
column in order to perform upserts. As a result the *upserted rows* have the 
correct partition column value stored, while the other records, which are 
simply copied over from the metadata bootstrap file, are missing the partition 
column. Thus, we observe different behavior between *bootstrapped* and 
*non-bootstrapped* tables.

While this does not currently cause issues with *Hive*, because it can 
determine the partition columns from all the metadata it stores, it does 
create a problem for other engines like *Spark*, where the partition columns 
show up as *null* when the upserted files are read.

Thus, the proposal is to fix the following issues:
 * When performing bootstrap, figure out the partition schema and store it in 
the *bootstrap schema* in the commit metadata file. This provides the 
following benefits:
 ** From a completeness perspective, there are no behavioral differences 
between bootstrapped and non-bootstrapped tables.
 ** In the Spark bootstrap relation and incremental query relation, where we 
need to figure out the latest schema, one can simply read the accurate schema 
from the commit metadata file, instead of having to check every time whether 
the partition column is present in the schema obtained from the metadata file 
and, if not, figure out the partition schema and merge it in (which can be 
expensive).
 * When doing an upsert on files that are metadata bootstrapped, the partition 
column values should be correctly determined and copied into the upserted file 
to avoid missing and null values.
 ** Again, this is consistent with the behavior of non-bootstrapped tables, 
and even though Hive seems to handle this, we should consider other engines 
like *Spark*, where it cannot be handled automatically.
 ** Without this, it becomes significantly more complicated to provide the 
partition value on the read side in Spark, having to determine every time 
whether the partition value is null and somehow fill it in.
 ** Once the table is fully bootstrapped at some point in the future, and the 
bootstrap commit is, say, cleaned up and Spark queries go through the 
*parquet* datasource instead of the *new bootstrapped datasource*, the 
*parquet datasource* will return null values wherever it finds missing 
partition values. In that case, we have no control over the *parquet* 
datasource, as it is simply reading from the file.
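
The second proposal item can be sketched as follows (a hypothetical illustration, not Hudi's actual merge-handle code): when records are copied out of a metadata-bootstrapped file during an upsert, the partition column derived from the file's hive-style path is filled in on each copied record, so no record is left with a null partition value:

```python
def fill_partition_column(record, file_path):
    """Fill missing partition columns on a copied record, derived from
    the hive-style segments of the source file path."""
    for segment in file_path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            record.setdefault(key, value)  # keep value if already present
    return record

copied = fill_partition_column(
    {"id": 1, "name": "a"},
    "table/datestr=2020-05-28/old-file.parquet")
print(copied)
# {'id': 1, 'name': 'a', 'datestr': '2020-05-28'}
```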



> Partition Columns missing in files upserted after Metadata Bootstrap
> --------------------------------------------------------------------
>
>                 Key: HUDI-915
>                 URL: https://issues.apache.org/jira/browse/HUDI-915
>             Project: Apache Hudi (incubating)
>          Issue Type: Sub-task
>          Components: Common Core
>            Reporter: Udit Mehrotra
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
