yihua commented on code in PR #18276:
URL: https://github.com/apache/hudi/pull/18276#discussion_r3042937052
##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +49,294 @@ The current implementation of Spark Datasource V2
integration is presented in th
## Implementation
-<!-- -->
+Hudi's write path is mature, and involves indexing, precombining,
upsert/insert routing, file sizing, and table services
(compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for
this, and moving to this entirely would be a non-starter. Also, due to the
flexibility of the V1 API in terms of allowing the writes to shuffle data after
the `df.write.format....save` is invoked, Hudi supports a streaming DF write
for its upsert operation. A good majority of Hudi jobs work this way today, and
we cannot break all of these at once.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the
following schema:
+
+
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+ v
+BaseDefaultSource (V1) -> DefaultSource
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+ v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister +
CreatableRelationProvider)
+ v
+Spark treats as V1 source for writes
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+ v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+ v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+ v
+Spark treats as V1 DataSource
+ v
+DefaultSource.createRelation(...)
+ v
+MergeOnReadSnapshotRelation / BaseRelation
+ v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+ v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+ v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+ v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+ v
+HoodieDataSourceV2.getTable(...)
+ v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+ v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter
`hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+ v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+ v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+ v
+V1Write -> InsertableRelation.insert(data, overwrite)
+ v
+Align columns (rename + cast to table's user schema)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+ v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+ v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+ v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+ v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
### Read
-<!-- main part -->
+All new classes go into package `org.apache.spark.sql.hudi.v2` inside
`hudi-spark-common`.
+
+| Class | Spark Interface
| Responsibility
|
+|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `HoodieDataSourceV2` | `TableProvider`, `DataSourceRegister`,
`CreatableRelationProvider`
| SPI entry point for `format("hudi_v2")`.
`CreatableRelationProvider` enables DataFrame API writes via
`df.write.format("hudi_v2")`.
|
+| `HoodieSparkV2Table` | `Table`, `SupportsRead`, `SupportsWrite`,
`V2TableWithV1Fallback`
| Routes reads to DSv2, writes to DSv1 fallback via
`HoodieV1WriteBuilder`.
|
+| `HoodieScanBuilder` | `ScanBuilder`, `SupportsPushDownFilters`,
`SupportsPushDownRequiredColumns`, `PartialLimitPushDown`,
`SupportsPushDownAggregates` | Collects filter, column pruning, limit, and
aggregate pushdowns.
|
+| `PartialLimitPushDown` | extends `SupportsPushDownLimit`
| Custom Hudi Java interface providing `isPartiallyPushed() = true` as a
default method. Avoids Scala `override` incompatibility between Spark 3.3
(method absent) and 3.4+ (default method present).
|
+| `HoodieBatchScan` | `Scan`, `Batch`
| Plans input partitions using existing `HoodieFileIndex`.
|
+| `HoodieInputPartition` | `InputPartition`
| Serializable descriptor for file slices.
|
+| `HoodiePartitionReaderFactory` | `PartitionReaderFactory`
| Creates readers on executors. Overrides `supportColumnarReads()` and
`createColumnarReader()` for COW vectorized reads.
|
+| `HoodiePartitionReader` | `PartitionReader[InternalRow]`
| Row-based reader for MOR, incremental, CDC, and COW fallback
(unsupported schema).
|
+| `HoodieColumnarPartitionReader` | `PartitionReader[ColumnarBatch]`
| Columnar reader for COW base files. Returns vectorized Parquet batches
directly to Spark.
|
+| `HoodieV1WriteBuilder` (reused) | `SupportsTruncate`, `SupportsOverwrite`,
`ProvidesHoodieConfig`
| Existing V1 write fallback builder, defined as `private[hudi]` in
`HoodieInternalV2Table.scala`. `HoodieSparkV2Table` directly instantiates it
(sibling class, not a subclass of `HoodieInternalV2Table`).
`HoodieInternalV2Table` is retained for the schema-evolution code path. |
### Table services
-<!-- with read substages -->
+Table services (compaction, clustering, cleaning) are not affected by this
change.
+They operate via the write client and are triggered independently of the read
path.
+
+### Implementation phases
+
+The phases below describe the logical design ordering.
+In practice, `HoodieScanBuilder` declares all pushdown interfaces from the
outset with working implementations, and the PRs may ship multiple phases
together.
+
+1. **Coexistence POC.** All new classes return empty read results, SPI
registration, reuse of `HoodieV1WriteBuilder` for V1 write fallback,
`hoodie.datasource.read.use.v2` config,
+`HoodieV1OrV2Table` extractor update in `HoodieSparkBaseAnalysis` to recognize
`HoodieSparkV2Table` for DDL operations.
+2. **COW snapshot read.** Wire `HoodieBatchScan.planInputPartitions()` to
`HoodieFileIndex`, implement base file reading in `HoodiePartitionReader`.
Column pruning support.
+3. **Filter pushdown.** Implement `HoodieScanBuilder.pushFilters()` for
partition pruning and data skipping via `HoodieFileIndex`.
+4. **Vectorized COW reads.** Enable columnar batch output for COW snapshot
reads to match V1 performance.
+5. **MOR snapshot read.** Extend `HoodiePartitionReader` with base + log merge
logic, reusing `HoodieFileGroupReader`.
+6. **Incremental and CDC queries.** Route based on query type option in
`HoodieScanBuilder`.
+7. **Advanced pushdowns.** `SupportsPushDownAggregates`,
`SupportsPushDownLimit`, `SupportsPushDownTopN`.
## Rollout/Adoption Plan
-<!--
- - rollback of some changes in HUDI-4178
- - check performance before and after, find what actually degrade when we
use V1 workaround
- - implement absent V2 API functionality for read
- - benchmark again
--->
+- The existing `format("hudi")` path is completely untouched, so regression
risk is minimized.
+- For DataFrame API, users opt in by using `format("hudi_v2")`. No config
needed.
+- For SQL queries, users set `hoodie.datasource.read.use.v2=true` to route
reads through DSv2.
+- Rollback: switch back to `format("hudi")` or set the config to `false`.
+
+### Config interaction: `hoodie.datasource.read.use.v2` vs
`hoodie.schema.on.read.enable`
+
+In `HoodieCatalog.loadTable()`, `v2ReadEnabled` is evaluated first and takes
strict precedence:
+
+| `hoodie.datasource.read.use.v2` | `hoodie.schema.on.read.enable` | Table
returned |
+|---------------------------------|--------------------------------|----------------------------------------------------------|
+| `true` | any |
`HoodieSparkV2Table` (DSv2 read) |
+| `false` | `true` |
`HoodieInternalV2Table` (existing schema-evolution path) |
+| `false` | `false` | `V1Table`
wrapper (existing default) |
+
+The two configs are independent. When both are `true`, `v2ReadEnabled` wins.
## Test Plan
-<!-- It's important to agree on consistent benchmarks to evaluate changes step
by step -->
+- Verify that `EXPLAIN` plans show `BatchScanExec` (DSv2) instead of
`FileSourceScanExec` (DSv1) when DSv2 is enabled.
+- Existing unit and functional tests must pass unchanged (no regressions in
DSv1 path).
+- New tests for DSv2 read path: COW snapshot, MOR snapshot, filter pushdown,
column pruning.
+- TPC-H benchmark to compare DSv1 vs DSv2 read performance at each
implementation phase.
+ Success criteria:
+ - DSv2 COW snapshot full data read should show no regression versus DSv1.
+ - DSv2 COW snapshot read with projections and filter pushdowns should show
10% faster wall-clock time.
+ - DSv2 COW snapshot read with limit and aggregate pushdowns should show
20% faster wall-clock time.
+ - MOR benchmarks should show no regression versus DSv1's row-based MOR
path.
+
+## Future Work
+
+1. DSv2 read support using `hudi_v2` for the DataFrame API, and
`hoodie.datasource.read.use.v2` for the SQL API (`false` by default).
Review Comment:
[P2] **Grammar: "These means" → "This means"**
"These" is a plural demonstrative, but the subject is a singular completion
milestone.
```suggestion
This means that all stages from "Implementation phases" chapter are
completed.
```
— *Greptile*
([original](https://github.com/yihua/hudi/pull/22#discussion_r3040256798))
(source:comment#3040256798)
##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +49,294 @@ The current implementation of Spark Datasource V2
integration is presented in th
## Implementation
-<!-- -->
+Hudi's write path is mature, and involves indexing, precombining,
upsert/insert routing, file sizing, and table services
(compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for
this, and moving to this entirely would be a non-starter. Also, due to the
flexibility of the V1 API in terms of allowing the writes to shuffle data after
the `df.write.format....save` is invoked, Hudi supports a streaming DF write
for its upsert operation. A good majority of Hudi jobs work this way today, and
we cannot break all of these at once.
Review Comment:
[P2] **Missing terminal period**
The sentence ending with "...we cannot break all of these at once" is
missing a full stop.
```suggestion
DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic
for this, and moving to this entirely would be a non-starter. Also, due to the
flexibility of the V1 API in terms of allowing the writes to shuffle data after
the `df.write.format....save` is invoked, Hudi supports a streaming DF write
for its upsert operation. A good majority of Hudi jobs work this way today, and
we cannot break all of these at once.
```
— *Greptile*
([original](https://github.com/yihua/hudi/pull/22#discussion_r3040256875))
(source:comment#3040256875)
##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +49,294 @@ The current implementation of Spark Datasource V2
integration is presented in th
## Implementation
-<!-- -->
+Hudi's write path is mature, and involves indexing, precombining,
upsert/insert routing, file sizing, and table services
(compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for
this, and moving to this entirely would be a non-starter. Also, due to the
flexibility of the V1 API in terms of allowing the writes to shuffle data after
the `df.write.format....save` is invoked, Hudi supports a streaming DF write
for its upsert operation. A good majority of Hudi jobs work this way today, and
we cannot break all of these at once.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the
following schema:
+
+
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+ v
+BaseDefaultSource (V1) -> DefaultSource
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+ v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister +
CreatableRelationProvider)
+ v
+Spark treats as V1 source for writes
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+ v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+ v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+ v
+Spark treats as V1 DataSource
+ v
+DefaultSource.createRelation(...)
+ v
+MergeOnReadSnapshotRelation / BaseRelation
+ v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+ v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+ v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+ v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+ v
+HoodieDataSourceV2.getTable(...)
+ v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+ v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter
`hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+ v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+ v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+ v
+V1Write -> InsertableRelation.insert(data, overwrite)
+ v
+Align columns (rename + cast to table's user schema)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+ v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+ v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+ v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+ v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
### Read
-<!-- main part -->
+All new classes go into package `org.apache.spark.sql.hudi.v2` inside
`hudi-spark-common`.
+
+| Class | Spark Interface
| Responsibility
|
+|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `HoodieDataSourceV2` | `TableProvider`, `DataSourceRegister`,
`CreatableRelationProvider`
| SPI entry point for `format("hudi_v2")`.
`CreatableRelationProvider` enables DataFrame API writes via
`df.write.format("hudi_v2")`.
|
+| `HoodieSparkV2Table` | `Table`, `SupportsRead`, `SupportsWrite`,
`V2TableWithV1Fallback`
| Routes reads to DSv2, writes to DSv1 fallback via
`HoodieV1WriteBuilder`.
|
+| `HoodieScanBuilder` | `ScanBuilder`, `SupportsPushDownFilters`,
`SupportsPushDownRequiredColumns`, `PartialLimitPushDown`,
`SupportsPushDownAggregates` | Collects filter, column pruning, limit, and
aggregate pushdowns.
|
+| `PartialLimitPushDown` | extends `SupportsPushDownLimit`
| Custom Hudi Java interface providing `isPartiallyPushed() = true` as a
default method. Avoids Scala `override` incompatibility between Spark 3.3
(method absent) and 3.4+ (default method present).
|
+| `HoodieBatchScan` | `Scan`, `Batch`
| Plans input partitions using existing `HoodieFileIndex`.
|
+| `HoodieInputPartition` | `InputPartition`
| Serializable descriptor for file slices.
|
+| `HoodiePartitionReaderFactory` | `PartitionReaderFactory`
| Creates readers on executors. Overrides `supportColumnarReads()` and
`createColumnarReader()` for COW vectorized reads.
|
+| `HoodiePartitionReader` | `PartitionReader[InternalRow]`
| Row-based reader for MOR, incremental, CDC, and COW fallback
(unsupported schema).
|
+| `HoodieColumnarPartitionReader` | `PartitionReader[ColumnarBatch]`
| Columnar reader for COW base files. Returns vectorized Parquet batches
directly to Spark.
|
+| `HoodieV1WriteBuilder` (reused) | `SupportsTruncate`, `SupportsOverwrite`,
`ProvidesHoodieConfig`
| Existing V1 write fallback builder, defined as `private[hudi]` in
`HoodieInternalV2Table.scala`. `HoodieSparkV2Table` directly instantiates it
(sibling class, not a subclass of `HoodieInternalV2Table`).
`HoodieInternalV2Table` is retained for the schema-evolution code path. |
### Table services
-<!-- with read substages -->
+Table services (compaction, clustering, cleaning) are not affected by this
change.
+They operate via the write client and are triggered independently of the read
path.
+
+### Implementation phases
+
+The phases below describe the logical design ordering.
+In practice, `HoodieScanBuilder` declares all pushdown interfaces from the
outset with working implementations, and the PRs may ship multiple phases
together.
+
+1. **Coexistence POC.** All new classes return empty read results, SPI
registration, reuse of `HoodieV1WriteBuilder` for V1 write fallback,
`hoodie.datasource.read.use.v2` config,
+`HoodieV1OrV2Table` extractor update in `HoodieSparkBaseAnalysis` to recognize
`HoodieSparkV2Table` for DDL operations.
+2. **COW snapshot read.** Wire `HoodieBatchScan.planInputPartitions()` to
`HoodieFileIndex`, implement base file reading in `HoodiePartitionReader`.
Column pruning support.
+3. **Filter pushdown.** Implement `HoodieScanBuilder.pushFilters()` for
partition pruning and data skipping via `HoodieFileIndex`.
+4. **Vectorized COW reads.** Enable columnar batch output for COW snapshot
reads to match V1 performance.
+5. **MOR snapshot read.** Extend `HoodiePartitionReader` with base + log merge
logic, reusing `HoodieFileGroupReader`.
+6. **Incremental and CDC queries.** Route based on query type option in
`HoodieScanBuilder`.
+7. **Advanced pushdowns.** `SupportsPushDownAggregates`,
`SupportsPushDownLimit`, `SupportsPushDownTopN`.
## Rollout/Adoption Plan
-<!--
- - rollback of some changes in HUDI-4178
- - check performance before and after, find what actually degrade when we
use V1 workaround
- - implement absent V2 API functionality for read
- - benchmark again
--->
+- The existing `format("hudi")` path is completely untouched, so regression
risk is minimized.
+- For DataFrame API, users opt in by using `format("hudi_v2")`. No config
needed.
+- For SQL queries, users set `hoodie.datasource.read.use.v2=true` to route
reads through DSv2.
+- Rollback: switch back to `format("hudi")` or set the config to `false`.
+
+### Config interaction: `hoodie.datasource.read.use.v2` vs
`hoodie.schema.on.read.enable`
+
+In `HoodieCatalog.loadTable()`, `v2ReadEnabled` is evaluated first and takes
strict precedence:
+
+| `hoodie.datasource.read.use.v2` | `hoodie.schema.on.read.enable` | Table
returned |
+|---------------------------------|--------------------------------|----------------------------------------------------------|
Review Comment:
[P2] **Spark ≥ 3.5 version floor not explained**
The config table states that `hoodie.datasource.read.use.v2 = true` requires
Spark ≥ 3.5, but the document never explains *why* this version floor exists.
Readers need to know which specific DSv2 interface or API contract was
introduced or stabilised in Spark 3.5 that makes earlier versions unsupported.
Consider adding a short note in the **Implementation** or **Rollout/Adoption
Plan** section. Which specific Spark 3.5 API or behavioral change necessitates
the Spark ≥ 3.5 requirement for DSv2 read support?
— *Greptile*
([original](https://github.com/yihua/hudi/pull/22#discussion_r3040256967))
(source:comment#3040256967)
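For illustration, the precedence in the config-interaction table could be sketched as below. This is a minimal sketch, not the code in `HoodieCatalog.loadTable()`: `TableSelectionSketch`, `TableKind`, and `select` are hypothetical names, the configuration is modeled as a plain `Map`, and both keys are assumed to default to `false` as the diagrams in the RFC state. The Spark version floor raised in this comment is not modeled, since the quoted hunk does not show how it is checked.

```java
import java.util.Map;

// Hypothetical sketch of the table-selection precedence; not Hudi's actual code.
class TableSelectionSketch {
    // Stand-ins for the three table types named in the RFC's config table.
    enum TableKind { HOODIE_SPARK_V2_TABLE, HOODIE_INTERNAL_V2_TABLE, V1_TABLE_WRAPPER }

    static final String USE_V2 = "hoodie.datasource.read.use.v2";
    static final String SCHEMA_ON_READ = "hoodie.schema.on.read.enable";

    static TableKind select(Map<String, String> conf) {
        // Both flags are assumed to default to false when unset.
        boolean v2ReadEnabled =
            Boolean.parseBoolean(conf.getOrDefault(USE_V2, "false"));
        boolean schemaEvolutionEnabled =
            Boolean.parseBoolean(conf.getOrDefault(SCHEMA_ON_READ, "false"));

        // v2ReadEnabled is evaluated first and wins regardless of the other flag.
        if (v2ReadEnabled) {
            return TableKind.HOODIE_SPARK_V2_TABLE;
        }
        if (schemaEvolutionEnabled) {
            return TableKind.HOODIE_INTERNAL_V2_TABLE;
        }
        return TableKind.V1_TABLE_WRAPPER;
    }

    public static void main(String[] args) {
        // One call per row of the table, plus the all-defaults case.
        System.out.println(select(Map.of(USE_V2, "true", SCHEMA_ON_READ, "true")));
        System.out.println(select(Map.of(USE_V2, "false", SCHEMA_ON_READ, "true")));
        System.out.println(select(Map.of()));
    }
}
```

Writing the check as an ordered chain makes the "strict precedence" wording concrete: the second flag is only consulted when the first is `false`.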
##########
rfc/rfc-98/rfc-98.md:
##########
@@ -52,25 +49,294 @@ The current implementation of Spark Datasource V2
integration is presented in th
## Implementation
-<!-- -->
+Hudi's write path is mature, and involves indexing, precombining,
upsert/insert routing, file sizing, and table services
(compaction/clustering/cleaning).
+Also `HoodieSparkSqlWriter::write` handles schema evolution, partition
encoding, metadata updates, and multi-writer concurrency.
+DSv2's `WriteBuilder` >> `BatchWrite` >> DataWriter API is too simplistic for
this, and moving to this entirely would be a non-starter. Also, due to the
flexibility of the V1 API in terms of allowing the writes to shuffle data after
the `df.write.format....save` is invoked, Hudi supports a streaming DF write
for its upsert operation. A good majority of Hudi jobs work this way today, and
we cannot break all of these at once.
+
+The proposed approach is hybrid: DSv2 for reads, with a DSv1 fallback for
writes (`V2TableWithV1Fallback`) in the current state.
+Later, if a DSv2 write path can be implemented without loss of performance or
functionality, it may become possible to move to full DSv2 support.
+However, this migration should still be incremental, please check the "Future
Work" chapter for details.
+
+Overall proposed architecture for the hybrid approach is shown in the
following schema:
+
+
+
+### DataFrame API
+
+A new SPI short name, `"hudi_v2"`, activates the DSv2 read path when using the
Spark DataFrame API.
+The existing `"hudi"` path remains unchanged.
+This is done to unblock incremental development of the DSv2 path and will be
removed in the long term, please check the "Future Work" chapter for details.
+It also allows switching later from the current DSv1 fallback to a DSv2 write
path, if an implementation without performance degradation is found.
+The DSv2 write path is currently under research.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+df.write.format("hudi").mode(...).save(path)
+ v
+BaseDefaultSource (V1) -> DefaultSource
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+<td>
+<pre>
+df.write.format("hudi_v2").mode(...).save(path)
+ v
+HoodieDataSourceV2 (TableProvider + DataSourceRegister +
CreatableRelationProvider)
+ v
+Spark treats as V1 source for writes
+ v
+CreatableRelationProvider.createRelation(...)
+ v
+HoodieSparkSqlWriter.write(...)
+ v
+SparkRDDWriteClient -> upsert/insert/bulk_insert
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+spark.read.format("hudi").load(path)
+ v
+V1 DataSource resolution (via ServiceLoader + DataSourceRegister)
+ v
+BaseDefaultSource found
+(extends DefaultSource with DataSourceRegister)
+(not a TableProvider)
+ v
+Spark treats as V1 DataSource
+ v
+DefaultSource.createRelation(...)
+ v
+MergeOnReadSnapshotRelation / BaseRelation
+ v
+LogicalRelation -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+spark.read.format("hudi_v2").load(path)
+ v
+DataSourceV2Utils.lookupProvider("hudi_v2")
+ v
+HoodieDataSourceV2 found
+(extends TableProvider with DataSourceRegister)
+(does not extend SupportsCatalogOptions)
+ v
+Spark uses TableProvider.getTable() directly
+(no catalog routing since no SupportsCatalogOptions)
+ v
+HoodieDataSourceV2.getTable(...)
+ v
+HoodieSparkV2Table(...)
+(no catalogTable, no tableIdentifier)
+ v
+HoodieScanBuilder -> HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
+
+### SQL Queries
+
+Spark SQL API is managed by new configuration parameter
`hoodie.datasource.read.use.v2`, which controls the returned table type.
+
+<table>
+<tr>
+<th>Operation</th>
+<th>Current implementation</th>
+<th>Additional functionality proposed in this RFC</th>
+</tr>
+<tr>
+<td>Write</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = false, schemaEvol = false
+ v
+RETURNS: V1Table(catalogTable) via v1TableWrapper
+ v
+Spark V1 write path -> InsertIntoHoodieTableCommand (analysis rule)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+<td>
+<pre>
+INSERT INTO hudi_table VALUES (...); -- table created with USING hudi
+ v
+Spark Analyzer resolves table via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+isHoodieTable => true, v2ReadEnabled = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsWrite.newWriteBuilder() -> HoodieV1WriteBuilder
+ v
+V1Write -> InsertableRelation.insert(data, overwrite)
+ v
+Align columns (rename + cast to table's user schema)
+ v
+HoodieSparkSqlWriter.write(...)
+</pre>
+</td>
+</tr>
+<tr>
+<td>Read</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = false, schemaEvolutionEnabled = false (defaults)
+ v
+RETURNS: HoodieInternalV2Table(...).v1TableWrapper = V1Table(catalogTable)
+ v
+Spark uses V1 fallback -> DefaultSource.createRelation()
+ v
+HoodieFileIndex -> FileScan -> ...
+</pre>
+</td>
+<td>
+<pre>
+SELECT * FROM hudi_table; -- table created with USING hudi
+ v
+Spark Analyzer resolves table name via catalog
+ v
+HoodieCatalog.loadTable(Identifier("hudi_table"))
+ v
+super.loadTable(ident)
+ v
+V1Table(catalogTable) where catalogTable.provider = "hudi"
+ v
+isHoodieTable(catalogTable) => true
+ v
+v2ReadEnabled = conf("hoodie.datasource.read.use.v2") = true
+ v
+RETURNS: HoodieSparkV2Table(...)
+ v
+SupportsRead.newScanBuilder() -> HoodieScanBuilder
+ v
+HoodieBatchScan -> ...
+</pre>
+</td>
+</tr>
+</table>
### Read
-<!-- main part -->
+All new classes go into package `org.apache.spark.sql.hudi.v2` inside
`hudi-spark-common`.
+
+| Class | Spark Interface
| Responsibility
|
+|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `HoodieDataSourceV2` | `TableProvider`, `DataSourceRegister`,
`CreatableRelationProvider`
| SPI entry point for `format("hudi_v2")`.
`CreatableRelationProvider` enables DataFrame API writes via
`df.write.format("hudi_v2")`.
|
+| `HoodieSparkV2Table` | `Table`, `SupportsRead`, `SupportsWrite`,
`V2TableWithV1Fallback`
| Routes reads to DSv2, writes to DSv1 fallback via
`HoodieV1WriteBuilder`.
|
+| `HoodieScanBuilder` | `ScanBuilder`, `SupportsPushDownFilters`,
`SupportsPushDownRequiredColumns`, `PartialLimitPushDown`,
`SupportsPushDownAggregates` | Collects filter, column pruning, limit, and
aggregate pushdowns.
|
Review Comment:
[P2] **Interface naming inconsistency: `PartialLimitPushDown` vs
`SupportsPushDownLimit`**
The class-responsibility table lists `PartialLimitPushDown` among the
interfaces implemented by `HoodieScanBuilder`, but implementation phase 7 (line
289) references the standard Spark DSv2 interface `SupportsPushDownLimit`.
`SupportsPushDownLimit` exists in `org.apache.spark.sql.connector.read`;
`PartialLimitPushDown` is not a standard Spark interface.
If `PartialLimitPushDown` is a Hudi-internal interface that extends
`SupportsPushDownLimit` to express partial-limit semantics, please add a
clarifying note. Otherwise, update the table entry to match the standard Spark
interface name used in phase 7.
— *Greptile*
([original](https://github.com/yihua/hudi/pull/22#discussion_r3040257056))
(source:comment#3040257056)
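For reference, the RFC's own row for `PartialLimitPushDown` describes it as a Java interface that extends `SupportsPushDownLimit` and supplies `isPartiallyPushed() = true` as a default method. A minimal sketch of that relationship follows. It is an assumption-laden illustration, not the Hudi source: `SupportsPushDownLimitStandIn` is a simplified stand-in for Spark's interface as it looks without `isPartiallyPushed()`, and `ScanBuilderSketch` is a hypothetical implementer.

```java
// Simplified stand-in for org.apache.spark.sql.connector.read.SupportsPushDownLimit
// in a Spark version where isPartiallyPushed() is absent (assumption for this sketch).
interface SupportsPushDownLimitStandIn {
    boolean pushLimit(int limit);
}

// Sketch of the Hudi-side interface: it adds the default method itself, so
// implementers never have to write an `override` for isPartiallyPushed().
interface PartialLimitPushDown extends SupportsPushDownLimitStandIn {
    default boolean isPartiallyPushed() {
        // "Partially pushed" tells the engine to still apply the limit on its side.
        return true;
    }
}

// Hypothetical scan builder that records the pushed limit.
class ScanBuilderSketch implements PartialLimitPushDown {
    private int pushedLimit = -1; // -1 means no limit has been pushed

    @Override
    public boolean pushLimit(int limit) {
        this.pushedLimit = limit;
        return true; // the limit was accepted
    }

    int pushedLimit() {
        return pushedLimit;
    }

    public static void main(String[] args) {
        ScanBuilderSketch builder = new ScanBuilderSketch();
        builder.pushLimit(10);
        System.out.println(builder.pushedLimit() + " " + builder.isPartiallyPushed());
    }
}
```

Because Java does not require an explicit override marker, the same interface source should compile whether or not the parent already declares a default `isPartiallyPushed()`, which is the incompatibility the RFC says a Scala `override` would hit.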