danny0405 commented on code in PR #12005:
URL: https://github.com/apache/hudi/pull/12005#discussion_r1792630989


##########
rfc/rfc-82/rfc-82.md:
##########
@@ -0,0 +1,254 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-82: Concurrency control of schema evolution
+
+## Proposers
+
+- @jonvex
+- @Davis-Zhang-Onehouse
+
+## Approvers
+- @yihua
+- @sivabalan
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-8221
+
+## Abstract
+
+This RFC proposes to enhance Hudi's concurrency control mechanism to handle 
concurrent schema evolution scenarios. The current implementation does not 
adequately address conflicts that may arise when multiple transactions attempt 
to alter the table schema simultaneously. This enhancement aims to detect and 
prevent such conflicts, ensuring data consistency and integrity in multi-writer 
environments.
+
+## Background
+
+### Hudi Schema
+In Apache Hudi, schema management is a critical component that ensures data 
consistency and facilitates efficient data processing, especially in 
environments where data structures may evolve over time.
+
+The schema of the input data represents the structure of incoming records 
being ingested into Hudi from various sources. During a write operation, the 
Hudi write client utilizes a write schema, which is typically derived from the 
input data schema or specified by a schema provider. This write schema is 
applied throughout the transaction to maintain consistency.
+
+Additionally, Hudi stores the table schema within the commit metadata on 
storage, capturing the authoritative schema of the Hudi table at the time 
of each commit. This stored schema is crucial for readers to correctly 
interpret the data and for managing schema evolution across different data 
versions. These schemas are determined through a reconciliation process that 
considers both the incoming data schema and the existing table schema, allowing 
Hudi to handle schema changes gracefully. Currently, Hudi uses this schema 
management approach to enable seamless read and write operations, support 
upserts and deletes, and manage schema evolution, ensuring that the system 
remains robust even as the underlying data structures change.
+
+### Concurrency control over concurrent schema evolution
+Hudi supports concurrency control to handle concurrent write operations. 
However, the existing implementation does not specifically account for schema 
evolution conflicts. In a multi-writer environment, it's possible for different 
clients to attempt schema changes concurrently, which can lead to 
inconsistencies if not properly managed.
+
+Schema evolution is a critical operation that can significantly impact data 
compatibility and query results. Uncontrolled concurrent schema changes can 
result in data inconsistencies, failed reads, or incorrect query results. By 
extending the concurrency control mechanism to handle schema evolution, we can 
prevent these issues and ensure a more robust and reliable data management 
system.
+
+Other transactions can commit, and possibly change the table schema, while a 
transaction is in flight. As a result, an inflight transaction may see a 
different table schema depending on when it checks.
+
+We use "txn" as an abbreviation of transaction.
+
+| Scenario | Table Schema when Txn Starts | Table Schema when Txn Validates | Schema Used by Current Txn | Should Conflict? | Write Schema in Commit Metadata of Current Txn | Notes |
+|----------|---------------------|--------------------------|-------------------|------------------|---------------------------------------|-------|
+| 1 | Not exists | Not exists | S1 | No | S1 | Current txn is the first commit ever, conflict is impossible |
+| 2 | Not exists | S1 | S1 | No | S1 | Second commit, no schema evolution |
+| 3 | Not exists | S1 | S2 | Yes | N/A (throws exception) | No predefined schema, effectively concurrent schema definition. The design decision made here is to identify this as a schema conflict for simplicity. It can also be broken down into cases where (1) S2 is evolvable from S1, (2) S2 is not evolvable from S1, for further optimization. |
+| 4 | S1 | S1 | S1 | No | S1 | No schema evolution |
+| 5 | S1 | S1 | S2 | No | S2 | Schema evolution in current transaction |
+| 6 | S1 | S2 | S1 | No | S2 | Backwards compatibility handles it |
+| 7 | S1 | S2 | S2 | No | S2 | Concurrent evolution to same schema |
+| 8 | S1 | S2 | S3 | Yes | N/A (throws exception) | Concurrent evolution to different schemas. Same as case 3; as a future optimization we should consider evolvability from S2 to S3 |
+
+For a timeline diagram of each case, please refer to the appendix.
+
+Notes:
+- S1, S2, S3 represent different schemas. The proposed solution does not 
assume compatibility/evolvability among those schemas.
+- 3 schemas to consider:
+  + The table schema from the last committed txn when the current txn starts.
+  + The table schema from the last committed txn when the current txn 
validates.
+  + The table schema that the current txn uses.
+- "Not exists" means the table is empty and table schema is not set.
+
+## Implementation
+
+The proposed implementation involves the following key changes:
+
+1. Enhance the `TransactionUtils` class to include schema conflict detection:
+   - Add a new method `abortTxnOnConcurrentSchemaEvolution` to check for 
schema conflicts.
+
+2. Schema conflict detection logic:
+   - Follow the scenario table in the Background section above.
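
The eight scenarios in the table can be read as a small decision function. The following is only a minimal sketch under stated assumptions, not the actual `abortTxnOnConcurrentSchemaEvolution` implementation. Schemas are modeled as opaque strings compared by equality (matching the note that no compatibility is assumed among S1, S2, S3). The class `SchemaConflictSketch`, the method `resolveWriteSchema`, and the exception type are hypothetical names introduced for illustration.

```java
import java.util.Optional;

// Hypothetical sketch of the scenario table; not Hudi's TransactionUtils.
final class SchemaConflictSketch {

    /** Hypothetical exception signalling that the current txn must abort. */
    static final class SchemaConflictException extends RuntimeException {
        SchemaConflictException(String message) {
            super(message);
        }
    }

    /**
     * Returns the schema to record in the commit metadata of the current txn,
     * or throws if the table says the txn should conflict.
     * An empty Optional stands for "Not exists" in the table.
     */
    static String resolveWriteSchema(Optional<String> schemaAtStart,
                                     Optional<String> schemaAtValidation,
                                     String writerSchema) {
        // Case 1: the table still has no schema, so this is the first commit.
        if (!schemaAtValidation.isPresent()) {
            return writerSchema;
        }
        String latest = schemaAtValidation.get();
        // Cases 2, 4, 7: the writer already uses the latest table schema.
        if (latest.equals(writerSchema)) {
            return writerSchema;
        }
        // Case 5: nobody else changed the schema; only this txn evolves it.
        if (schemaAtStart.equals(schemaAtValidation)) {
            return writerSchema;
        }
        // Case 6: the table evolved concurrently, the writer kept the old schema.
        if (schemaAtStart.isPresent() && schemaAtStart.get().equals(writerSchema)) {
            return latest;
        }
        // Cases 3, 8: concurrent evolution or definition to a different schema.
        throw new SchemaConflictException(
            "table schema is " + latest + " but txn uses " + writerSchema);
    }

    public static void main(String[] args) {
        // Case 6 from the table: commit metadata records the evolved schema.
        System.out.println(
            resolveWriteSchema(Optional.of("S1"), Optional.of("S2"), "S1"));
        // Case 8 from the table: the txn is aborted.
        try {
            resolveWriteSchema(Optional.of("S1"), Optional.of("S2"), "S3");
        } catch (SchemaConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

In this sketch the schema at txn start is what separates case 5 (no concurrent change, evolution allowed) from case 8 (concurrent change to a different schema, abort). A real implementation would compare actual schemas and could replace the equality checks with a compatibility check, as the table notes suggest for cases 3 and 8.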
+
+## Rollout/Adoption Plan
+
+1. Impact on existing users:
+   - Users leveraging multi-writer capabilities with schema evolution will 
benefit from improved consistency.
+   - No breaking changes for users not using concurrent schema evolution.
+
+2. Phasing out older behavior:
+   - The code is not protected by a feature flag; once it is merged, the 
behavior is adopted in the next release.
+
+3. Migration:
+   - No special migration tools are required.
+   - Users should update to the latest Hudi version to benefit from this 
enhancement.
+
+
+## Test Plan
+
+The implementation includes comprehensive test cases in the 
`TestHoodieClientMultiWriter`:
+
+1. Test scenarios:
+   - Covered by the table in the previous section.
+
+2. Test variations:
+   - Both COPY_ON_WRITE and MERGE_ON_READ table types
+
+# Appendix
+
+## Timeline of concurrent schema evolution cases
+
+**Scenario 1**
+
+
+Should Conflict?: No
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate 
]--------------------[ Commit Txn1 ]
+            Uses Schema: S1                                                    
 Writes Schema: S1
+
+Table Schema:
+          [ Not Exists ]----------------------------[ Not Exists 
]--------------------[ S1 ]
+```
+Notes:
+- Txn1 is the first transaction; it creates the table with schema S1.
+- No conflicts occur since there are no other transactions.
+
+
+**Scenario 2**
+
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn1:    [ Start Txn1 ]------------------------[ Commit Txn1 ]
+            Creates Schema: S1
+
+Txn2:            [ Start Txn2 ]------------------------[ Validate 
]--------------------[ Commit Txn2 ]
+                    Uses Schema: S1                                            
     Writes Schema: S1
+
+Table Schema:
+          [ Not Exists ]-----------------------[ S1 
]----------------------------------[ S1 ]
+```
+Notes:
+- Txn2 starts when the table does not exist.
+- Txn1 commits before Txn2 validates, creating schema S1.
+- Txn2 validates against schema S1; no conflict occurs.
+
+**Scenario 3**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn1:    [ Start Txn1 ]------------------------[ Commit Txn1 ]
+            Creates Schema: S1
+
+Txn2:            [ Start Txn2 ]------------------------[ Validate ]----X
+                    Uses Schema: S2
+
+Table Schema:
+          [ Not Exists ]-----------------------[ S1 
]----------------------------------------
+```
+Notes:
+- Txn2 starts when the table does not exist, intending to use schema S2.
+- Txn1 commits before Txn2 validates, creating schema S1.
+- At validation, Txn2 detects schema conflict (S1 vs. S2); transaction fails.
+
+A future improvement is to check the compatibility of S1 and S2 and reconcile 
them properly.
+
+**Scenario 4**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate 
]--------------------[ Commit Txn1 ]
+            Uses Schema: S1                                                    
 Writes Schema: S1
+
+Table Schema:
+          [ S1 ]------------------------------------[ S1 
]----------------------------[ S1 ]
+```
+Notes:
+- Txn1 operates entirely under schema S1.
+- No schema changes occur; no conflicts arise.
+
+**Scenario 5**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate 
]--------------------[ Commit Txn1 ]
+            Uses Schema: S2                                                    
 Writes Schema: S2
+
+Table Schema:
+          [ S1 ]------------------------------------[ S1 
]----------------------------[ S2 ]
+```
+Notes:
+- Txn1 starts with schema S1 and evolves it to S2 within the transaction.
+- No other transactions interfere; no conflicts occur.
+
+**Scenario 6**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn2:           [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                   Evolves Schema: S1 to S2
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate 
]--------------------[ Commit Txn1 ]
+            Uses Schema: S1                                                    
 Writes Schema: S2
+
+Table Schema:
+          [ S1 ]---------------------------------------[ S2 
]---------------------------------[ S2 ]
+```
+Notes:
+- Txn1 starts with schema S1.
+- Txn2 commits before Txn1 validates, evolving the schema to S2.
+- Txn1 validates against schema S2; backward compatibility allows it to 
proceed.
+- Txn1 writes data compatible with S2; commits successfully.
+
+**Scenario 7**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn2:           [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                   Evolves Schema: S1 to S2
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate 
]--------------------[ Commit Txn1 ]
+            Uses Schema: S2                                                    
 Writes Schema: S2
+
+Table Schema:
+          [ S1 ]--------------------------[ S2 
]----------------------------------------[ S2 ]
+```
+Notes:
+- Both Txn1 and Txn2 aim to evolve schema from S1 to S2.
+- Txn2 commits before Txn1 validates, updating schema to S2.
+- Txn1 detects that schema is already S2; no conflict occurs.
+- Txn1 commits successfully.
+
+**Scenario 8**
+```
+Time 
---------------------------------------------------------------------------------->
+
+Txn2:           [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                   Evolves Schema: S1 to S2
+
+Txn1:    [ Start Txn1 ]------------------------------[ Validate ]----X
+            Uses Schema: S3
+
+Table Schema:
+          [ S1 ]--------------------------------------[ S2 
]-------------------------------------[S2]
+```
+Notes:
+- Txn1 intends to evolve schema from S1 to S3.
+- Txn2 commits before Txn1 validates, updating schema to S2.
+- At validation, Txn1 detects schema conflict (S2 vs. S3); transaction fails.

Review Comment:
   > for your first comment - we need to validate start schema to differentiate 
case 2 and case 6. case 6 implicitly relying on the existing schema evolution 
check.
   
   Both case 2 and case 6 just validate the write schema against the latest 
table schema, regardless of whether the table schema evolved during the write 
process. 
   
   Still, we should clarify this question: how do we manage the granularity of 
base schemas during a write? It looks like we could end up with a unified 
write schema but variegated base schemas within a single commit.


