vinothchandar commented on code in PR #18276:
URL: https://github.com/apache/hudi/pull/18276#discussion_r2943550828


##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for 
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or 
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future 
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the 
following schema:
+
+![Proposed approach with hybrid V1 write and V2 
read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the 
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be 
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write 
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+        v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister + 
CreatableRelationProvider)
+        v
+Spark treats as V1 source for writes
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+        v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+        v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+        v
+Spark treats as V1 DataSource
+        v
+DefaultSource.createRelation(...)
+        v
+MergeOnReadSnapshotRelation / BaseRelation
+        v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+        v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+        v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+        v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+        v
+HoodieDataSourceV2.getTable(...)
+        v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+        v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter 
`hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+        v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+        v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...);   -- table created with USING hudi
+        v
+Spark Analyzer resolves table via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+isHoodieTable => true, v2ReadEnabled = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+        v
+V1Write -> InsertableRelation.insert(data, overwrite)
+        v
+Align columns (rename + cast to table's user schema)
+        v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+        v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+        v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+        v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table;   -- table created with USING hudi
+        v
+Spark Analyzer resolves table name via catalog
+        v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+        v
+super.loadTable(ident)
+        v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+        v
+isHoodieTable(catalogTable) => true
+        v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+        v
+RETURNS: HoodieSparkV2Table(...)
+        v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+        v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
 
 ### Read
 
-<!-- main part -->
+All new classes go into package `org.apache.spark.sql.hudi.v2` inside 
`hudi-spark-common`.
+
+| Class | Spark Interface | Responsibility |
+|-------|-----------------|----------------|
+| `HoodieDataSourceV2` | `TableProvider`, `DataSourceRegister`, 
`CreatableRelationProvider` | SPI entry point for `format("hudi_v2")`. 
`CreatableRelationProvider` enables DataFrame API writes via 
`df.write.format("hudi_v2")`. |
+| `HoodieSparkV2Table` | `Table`, `SupportsRead`, `SupportsWrite`, 
`V2TableWithV1Fallback` | Routes reads to DSv2, writes to DSv1 fallback via 
`HoodieV1WriteBuilder`. |
+| `HoodieScanBuilder` | `ScanBuilder`, `SupportsPushDownFilters`, 
`SupportsPushDownRequiredColumns` | Collects filter and column pruning 
pushdowns. |
+| `HoodieBatchScan` | `Scan`, `Batch` | Plans input partitions using existing 
`HoodieFileIndex`. |
+| `HoodieInputPartition` | `InputPartition` | Serializable descriptor for file 
slices. |
+| `HoodiePartitionReaderFactory` | `PartitionReaderFactory` | Creates readers 
on executors. |
+| `HoodiePartitionReader` | `PartitionReader[InternalRow]` | Delegates to 
existing file-reading code from `HoodieBaseRelation`. |
+| `HoodieV1WriteBuilder` (reused) | `SupportsTruncate`, `SupportsOverwrite`, 
`ProvidesHoodieConfig` | Existing V1 write fallback builder from 
`HoodieInternalV2Table`, reused by `HoodieSparkV2Table`. |
 
 ### Table services
 
-<!-- with read substages -->
+Table services (compaction, clustering, cleaning) are not affected by this 
change.
+They operate via the write client and are triggered independently of the read 
path.
+
+### Implementation phases
+
+1. **Coexistence POC.** All new classes return empty read results, SPI 
registration, reuse of `HoodieV1WriteBuilder` for V1 write fallback, 
`hoodie.datasource.read.use.v2` config, 
+`HoodieV1OrV2Table` extractor update in `HoodieSparkBaseAnalysis` to recognize 
`HoodieSparkV2Table` for DDL operations.
+2. **COW snapshot read.** Wire `HoodieBatchScan.planInputPartitions()` to 
`HoodieFileIndex`, implement base file reading in `HoodiePartitionReader`. 
Column pruning support.
+3. **Filter pushdown.** Implement `HoodieScanBuilder.pushFilters()` for 
partition pruning and data skipping via `HoodieFileIndex`.
+4. **MOR snapshot read.** Extend `HoodiePartitionReader` with base + log merge 
logic, reusing `HoodieFileGroupReader`.
+5. **Incremental and CDC queries.** Route based on query type option in 
`HoodieScanBuilder`.
+6. **Advanced pushdowns.** `SupportsPushDownAggregates`, 
`SupportsPushDownLimit`, `SupportsPushDownTopN`.
 
 ## Rollout/Adoption Plan
 
-<!-- 
-    - rollback of some changes in HUDI-4178
-    - check performance before and after, find what actually degrade when we 
use V1 workaround
-    - implement absent V2 API functionality for read
-    - benchmark again
--->
+- The existing `format("hudi")` path is completely untouched, so there is no 
regression risk.
+- For DataFrame API, users opt in by using `format("hudi_v2")`. No config 
needed.
+- For SQL queries, users set `hoodie.datasource.read.use.v2=true` to route 
reads through DSv2.
+- Rollback: switch back to `format("hudi")` or set the config to `false`.
 
 ## Test Plan
 
-<!-- It's important to agree on consistent benchmarks to evaluate changes step 
by step -->
+- Verify that `EXPLAIN` plans show `BatchScanExec` (DSv2) instead of 
`FileSourceScanExec` (DSv1) when DSv2 is enabled.
+- Existing unit and functional tests must pass unchanged (no regressions in 
DSv1 path).
+- New tests for DSv2 read path: COW snapshot, MOR snapshot, filter pushdown, 
column pruning.
+- TPC-H benchmark to compare DSv1 vs DSv2 read performance at each 
implementation phase.

Review Comment:
   can we get a sense for this early, without taking on all of the 
implementation work?



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for 
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or 
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future 
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the 
following schema:
+
+![Proposed approach with hybrid V1 write and V2 
read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the 
Spark DataFrame API.

Review Comment:
   we could later switch `hudi` to `hudi_v2` transparently?



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +53,240 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The approach is hybrid: DSv2 for reads, DSv1 fallback for writes 
(`V2TableWithV1Fallback`).
+
+Overall proposed architecture for this hybrid approach is shown in the 
following schema:
+
+![Proposed approach with hybrid V1 write and V2 
read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name `"hudi_v2"` activates the DSv2 path for reading using 
Spark DataFrame API. 
+The existing `"hudi"` path remains unchanged.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+        v
+BaseDefaultSource (V1) -> DefaultSource
+        v
+CreatableRelationProvider.createRelation(...)
+        v
+HoodieSparkSqlWriter.write(...)
+        v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)

Review Comment:
   I guess the issue is : the datasource is picked up by the SPI or short name? 
   
   IMO: this is acceptable in a first release, where we keep `hudi_v2` 
experimental. This is a large change, we cannot switch to v2 in one go. 



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -37,7 +39,7 @@ Adopting the V2 API is essential for enhanced control over 
the data source, deep
 
 First steps towards integrating of Spark Datasource V2 were taken in 
[RFC-38](../rfc-38/rfc-38.md). 
 However, there are multiple issues with advertising Hudi table as V2 without 
actual implementing certain API, and with using custom relation rule to fall 
back to V1 API.
-As a result, the current implementation of `HoodieCatalog` and 
`Spark3DefaultSource` returns a `V1Table` instead of `HoodieInternalV2Table`, 
+As a result, the current implementation of `HoodieCatalog` and 
`BaseDefaultSource` returns a `V1Table` instead of `HoodieInternalV2Table`,

Review Comment:
   Can we move beyond general arguments like v2 > v1, more modern etc.. and 
clearly spell out for users, what exactly moving to v2 on reads buys them?



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be high risk.

Review Comment:
   ```suggestion
   DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic 
for this, and moving to this entirely would be a non-starter. Also, due to the 
flexibility of the V1 API in terms of allowing the writes to shuffle data after 
the `df.write.format....save` is invoked, Hudi supports a streaming DF write 
for its upsert operation. A good majority of Hudi jobs work this way today, and 
we cannot break all of these at once
   ```



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for 
writes (`V2TableWithV1Fallback`) in the current state.

Review Comment:
   I love this



##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +54,260 @@ The current implementation of Spark Datasource V2 
integration is presented in th
 
 ## Implementation
 
-<!--  -->
+The main problem is that Hudi's write path involves indexing, precombining, 
upsert/insert routing, file sizing, and table services 
(compaction/clustering/cleaning). 
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition 
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for 
this, and moving to this entirely would be high risk.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for 
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or 
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future 
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the 
following schema:
+
+![Proposed approach with hybrid V1 write and V2 
read](integration_with_DSv2_read.jpg)
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the 
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be 
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write 
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>

Review Comment:
   can we please markdown format this cleanly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to