yihua commented on code in PR #12005: URL: https://github.com/apache/hudi/pull/12005#discussion_r1794353376
##########
rfc/rfc-82/rfc-82.md:
##########
@@ -0,0 +1,254 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-82: Concurrency control of schema evolution
+
+## Proposers
+
+- @jonvex
+- @Davis-Zhang-Onehouse
+
+## Approvers
+- @yihua
+- @sivabalan
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-8221
+
+## Abstract
+
+This RFC proposes to enhance Hudi's concurrency control mechanism to handle concurrent schema evolution scenarios. The current implementation does not adequately address conflicts that may arise when multiple transactions attempt to alter the table schema simultaneously. This enhancement aims to detect and prevent such conflicts, ensuring data consistency and integrity in multi-writer environments.
+
+## Background
+
+### Hudi Schema
+In Apache Hudi, schema management is a critical component that ensures data consistency and facilitates efficient data processing, especially in environments where data structures may evolve over time.
+
+The schema of the input data represents the structure of incoming records being ingested into Hudi from various sources.
During a write operation, the Hudi write client utilizes a write schema, which is typically derived from the input data schema or specified by a schema provider. This write schema is applied throughout the transaction to maintain consistency.
+
+Additionally, Hudi stores the table schema within the commit metadata on storage, capturing the authoritative schema of the Hudi table at the time of each commit. This stored schema is crucial for readers to correctly interpret the data and for managing schema evolution across different data versions. These schemas are determined through a reconciliation process that considers both the incoming data schema and the existing table schema, allowing Hudi to handle schema changes gracefully. Currently, Hudi uses this schema management approach to enable seamless read and write operations, support upserts and deletes, and manage schema evolution, ensuring that the system remains robust even as the underlying data structures change.
+
+### Concurrency control over concurrent schema evolution
+Hudi supports concurrency control to handle concurrent write operations. However, the existing implementation does not specifically account for schema evolution conflicts. In a multi-writer environment, it's possible for different clients to attempt schema changes concurrently, which can lead to inconsistencies if not properly managed.
+
+Schema evolution is a critical operation that can significantly impact data compatibility and query results. Uncontrolled concurrent schema changes can result in data inconsistencies, failed reads, or incorrect query results. By extending the concurrency control mechanism to handle schema evolution, we can prevent these issues and ensure a more robust and reliable data management system.
+
+Depending on whether there are concurrent schema evolution transactions, an inflight transaction may see different table schemas when it checks at different times, because other transactions may commit in between and change the schema.
+
+We use "txn" as an abbreviation of transaction.
+
+| Scenario | Table Schema when Txn Starts | Table Schema when Txn Validates | Schema Used by Current Txn | Should Conflict? | Write Schema used by Commit Metadata of Current Txn | Notes |
+|----------|---------------------|--------------------------|-------------------|------------------|---------------------------------------|-------|
+| 1 | Not exists | Not exists | S1 | No | S1 | Current txn is the first commit ever, conflict is impossible |
+| 2 | Not exists | S1 | S1 | No | S1 | Second commit, no schema evolution |
+| 3 | Not exists | S1 | S2 | Yes | N/A (throws exception) | No predefined schema, effectively concurrent schema definition. The design decision made here is to identify this as a schema conflict for simplicity. It can also be broken down into cases where (1) S2 is evolvable from S1, (2) S2 is not evolvable from S1, for further optimization. |
+| 4 | S1 | S1 | S1 | No | S1 | No schema evolution |
+| 5 | S1 | S1 | S2 | No | S2 | Schema evolution in current transaction |
+| 6 | S1 | S2 | S1 | No | S2 | Backwards compatibility handles it |
+| 7 | S1 | S2 | S2 | No | S2 | Concurrent evolution to same schema |
+| 8 | S1 | S2 | S3 | Yes | N/A (throws exception) | Concurrent evolution to different schemas. Same as scenario 3; for future optimization we should consider evolvability from S2 to S3 |
+
+For a timeline graph of each case, please refer to the appendix.
+
+Notes:
+- S1, S2, S3 represent different schemas. The proposed solution does not assume compatibility/evolvability among those schemas.
+- 3 schemas to consider:
+  + The table schema from the last committed txn when the current txn starts.
+  + The table schema from the last committed txn when the current txn validates.
+  + The table schema that the current txn uses.
+- "Not exists" means the table is empty and the table schema is not set.
+
+## Implementation
+
+The proposed implementation involves the following key changes:
+
+1. Enhance the `TransactionUtils` class to include schema conflict detection:
+   - Add a new method `abortTxnOnConcurrentSchemaEvolution` to check for schema conflicts.
+
+2. Schema conflict detection logic:
+   - Follow the scenario table in the Background section.
+
+## Rollout/Adoption Plan
+
+1. Impact on existing users:
+   - Users leveraging multi-writer capabilities with schema evolution will benefit from improved consistency.
+   - No breaking changes for users not using concurrent schema evolution.
+
+2. Phasing out older behavior:
+   - The code is not protected by a feature flag; once it is merged, the behavior is adopted in the next release.
+
+3. Migration:
+   - No special migration tools are required.
+   - Users should update to the latest Hudi version to benefit from this enhancement.
+
+
+## Test Plan
+
+The implementation includes comprehensive test cases in `TestHoodieClientMultiWriter`:
+
+1. Test scenarios:
+   - Covered by the scenario table in the Background section.
+
+2. Test variations:
+   - Both COPY_ON_WRITE and MERGE_ON_READ table types
+
+# Appendix
+
+## Timeline of concurrent schema evolution cases
+
+**Scenario 1**
+
+
+Should Conflict?: No
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]--------------------[ Commit Txn1 ]
+      Uses Schema: S1                                                             Writes Schema: S1
+
+Table Schema:
+      [ Not Exists ]----------------------------[ Not Exists ]--------------------[ S1 ]
+```
+Notes:
+- Txn1 is the first transaction; it creates the table with schema S1.
+- No conflicts occur since there are no other transactions.
+
+
+**Scenario 2**
+
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn1: [ Start Txn1 ]------------------------[ Commit Txn1 ]
+                                            Creates Schema: S1
+
+Txn2:           [ Start Txn2 ]------------------------[ Validate ]--------------------[ Commit Txn2 ]
+                Uses Schema: S1                                                       Writes Schema: S1
+
+Table Schema:
+      [ Not Exists ]-----------------------[ S1 ]----------------------------------[ S1 ]
+```
+Notes:
+- Txn2 starts when the table does not exist.
+- Txn1 commits before Txn2 validates, creating schema S1.
+- Txn2 validates against schema S1; no conflict occurs.
+
+**Scenario 3**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn1: [ Start Txn1 ]------------------------[ Commit Txn1 ]
+                                            Creates Schema: S1
+
+Txn2:           [ Start Txn2 ]------------------------[ Validate ]----X
+                Uses Schema: S2
+
+Table Schema:
+      [ Not Exists ]-----------------------[ S1 ]----------------------------------------
+```
+Notes:
+- Txn2 starts when the table does not exist, intending to use schema S2.
+- Txn1 commits before Txn2 validates, creating schema S1.
+- At validation, Txn2 detects schema conflict (S1 vs. S2); transaction fails.
+
+A future improvement is to check the compatibility of S1 and S2 and reconcile them properly.
+
+**Scenario 4**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]--------------------[ Commit Txn1 ]
+      Uses Schema: S1                                                             Writes Schema: S1
+
+Table Schema:
+      [ S1 ]------------------------------------[ S1 ]----------------------------[ S1 ]
+```
+Notes:
+- Txn1 operates entirely under schema S1.
+- No schema changes occur; no conflicts arise.
+
+**Scenario 5**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]--------------------[ Commit Txn1 ]
+      Uses Schema: S2                                                             Writes Schema: S2
+
+Table Schema:
+      [ S1 ]------------------------------------[ S1 ]----------------------------[ S2 ]
+```
+Notes:
+- Txn1 starts with schema S1 and evolves it to S2 within the transaction.
+- No other transactions interfere; no conflicts occur.
+
+**Scenario 6**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn2: [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                                            Evolves Schema: S1 to S2
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]--------------------[ Commit Txn1 ]
+      Uses Schema: S1                                                             Writes Schema: S2
+
+Table Schema:
+      [ S1 ]---------------------------------------[ S2 ]---------------------------------[ S2 ]
+```
+Notes:
+- Txn1 starts with schema S1.
+- Txn2 commits before Txn1 validates, evolving the schema to S2.
+- Txn1 validates against schema S2; backward compatibility allows it to proceed.
+- Txn1 writes data compatible with S2; commits successfully.
+
+**Scenario 7**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn2: [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                                            Evolves Schema: S1 to S2
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]--------------------[ Commit Txn1 ]
+      Uses Schema: S2                                                             Writes Schema: S2
+
+Table Schema:
+      [ S1 ]--------------------------[ S2 ]----------------------------------------[ S2 ]
+```
+Notes:
+- Both Txn1 and Txn2 aim to evolve schema from S1 to S2.
+- Txn2 commits before Txn1 validates, updating schema to S2.
+- Txn1 detects that schema is already S2; no conflict occurs.
+- Txn1 commits successfully.
+
+**Scenario 8**
+```
+Time ---------------------------------------------------------------------------------->
+
+Txn2: [ Start Txn2 ]------------------------[ Commit Txn2 ]
+                                            Evolves Schema: S1 to S2
+
+Txn1: [ Start Txn1 ]------------------------------[ Validate ]----X
+      Uses Schema: S3
+
+Table Schema:
+      [ S1 ]--------------------------------------[ S2 ]-------------------------------------[ S2 ]
+```
+Notes:
+- Txn1 intends to evolve schema from S1 to S3.
+- Txn2 commits before Txn1 validates, updating schema to S2.
+- At validation, Txn1 detects schema conflict (S2 vs. S3); transaction fails.

Review Comment:
   @Davis-Zhang-Onehouse could you create a follow-up Jira ticket if needed?
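
For anyone following the scenario table in the quoted RFC, the eight cases can be collapsed into one decision function. The following is a minimal Java sketch under stated assumptions, not the PR's implementation: schemas are modeled as plain strings compared by equality, and `SchemaConflictSketch`, `resolveWriteSchema`, and `SchemaConflictException` are hypothetical names that do not come from Hudi. It does not attempt to reproduce the signature of `abortTxnOnConcurrentSchemaEvolution`, which the RFC text does not specify.

```java
import java.util.Optional;

/**
 * Illustrative sketch only. Schemas are plain strings compared by equality,
 * and every name in this class is hypothetical (not a Hudi API).
 */
final class SchemaConflictSketch {

    /** Signals a concurrent schema change the table marks as a conflict (scenarios 3 and 8). */
    static final class SchemaConflictException extends RuntimeException {
        SchemaConflictException(String message) {
            super(message);
        }
    }

    /**
     * Returns the schema to record in the commit metadata of the current txn,
     * or throws when the scenario table says the txn should conflict.
     * An empty Optional stands for "Not exists".
     */
    static String resolveWriteSchema(Optional<String> tableSchemaAtStart,
                                     Optional<String> tableSchemaAtValidation,
                                     String writerSchema) {
        // Scenario 1: no committed txn has defined a table schema yet.
        if (!tableSchemaAtValidation.isPresent()) {
            return writerSchema;
        }
        String latest = tableSchemaAtValidation.get();

        // Scenarios 4 and 5: the table schema did not change while this txn ran,
        // so the txn may keep or evolve the schema on its own.
        if (tableSchemaAtStart.equals(tableSchemaAtValidation)) {
            return writerSchema;
        }

        // Below this point another txn created or changed the table schema concurrently.

        // Scenarios 2 and 7: the concurrent txn arrived at the same schema.
        if (writerSchema.equals(latest)) {
            return latest;
        }

        // Scenario 6: this txn still uses the schema it saw at start; adopt the newer one.
        if (tableSchemaAtStart.isPresent() && writerSchema.equals(tableSchemaAtStart.get())) {
            return latest;
        }

        // Scenarios 3 and 8: two different schemas were introduced concurrently.
        throw new SchemaConflictException(
            "Table schema is now " + latest + " but the current txn uses " + writerSchema);
    }

    public static void main(String[] args) {
        // Scenario 6: the newer table schema is recorded.
        System.out.println(resolveWriteSchema(Optional.of("S1"), Optional.of("S2"), "S1"));
        // Scenario 8: concurrent evolution to different schemas is rejected.
        try {
            resolveWriteSchema(Optional.of("S1"), Optional.of("S2"), "S3");
        } catch (SchemaConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

The equality-only comparison mirrors the RFC's stated choice to treat any differing concurrent schema as a conflict for simplicity. The compatibility check that the RFC lists as a future optimization would take the place of the final `throw`.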
