Hi Peter, 

Thanks for reviewing the SPIP doc and PR. I've updated sections B.3.B and B.3.C
of the SPIP to clarify.

When I traced through the optimizer rule ordering for MOR vs. CoW, I observed
the following (experts here, please correct me if I'm wrong):

For MOR (WriteDelta), the DataSourceV2Relation stays in the plan through the
normal optimizer batches, and V2ScanRelationPushDown handles it like any other
DSv2 scan: it looks at which columns the operators above it reference and
narrows the scan accordingly. Since my implementation produces a Project that
references only the connector-declared columns, ColumnPruning propagates that
narrower column set down to the relation, and V2ScanRelationPushDown picks it up
without any rule changes.

For CoW (ReplaceData), I found that GroupBasedRowLevelOperationScanPlanning
fires in preOptimizationBatches, i.e., before ColumnPruning and
V2ScanRelationPushDown run. This rule pattern-matches only on ReplaceData nodes
(never WriteDelta) and converts the DataSourceV2Relation into a physical scan
that reads relation.output directly, ignoring any Project above it. By the time
the normal optimizer batches run, there is no DataSourceV2Relation left to
narrow.

For CoW, the implementation therefore narrows DataSourceV2Relation.output at
analysis time (in buildRelationWithAttrs).
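To make the ordering argument concrete, here is a toy Python model. It is not
Spark code: the plan classes and both rule functions are hypothetical stand-ins
(prune_and_push_down for ColumnPruning plus V2ScanRelationPushDown,
plan_group_scan for GroupBasedRowLevelOperationScanPlanning), and the column
names are made up. It only illustrates why a narrow Project is enough when
pruning runs before the scan is planned, but not when the scan is planned first
from relation.output.

```python
# Toy model only: these classes and functions are hypothetical stand-ins,
# not Spark's Catalyst API.
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Relation:  # stands in for DataSourceV2Relation
    output: Tuple[str, ...]


@dataclass(frozen=True)
class Scan:  # stands in for the scan produced by scan planning
    read_schema: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: "Plan"


Plan = Union[Relation, Scan, Project]


def prune_and_push_down(plan: Plan) -> Plan:
    """Stand-in for ColumnPruning + V2ScanRelationPushDown: a Relation directly
    under a Project is scanned with only the projected columns."""
    if isinstance(plan, Project):
        if isinstance(plan.child, Relation):
            kept = tuple(c for c in plan.child.output if c in plan.columns)
            return Project(plan.columns, Scan(kept))
        return Project(plan.columns, prune_and_push_down(plan.child))
    if isinstance(plan, Relation):
        return Scan(plan.output)
    return plan  # an already-planned Scan has no relation left to narrow


def plan_group_scan(plan: Plan) -> Plan:
    """Stand-in for GroupBasedRowLevelOperationScanPlanning: the Relation
    becomes a Scan of relation.output, ignoring any Project above it."""
    if isinstance(plan, Project):
        return Project(plan.columns, plan_group_scan(plan.child))
    if isinstance(plan, Relation):
        return Scan(plan.output)
    return plan


def read_schema(plan: Plan) -> Tuple[str, ...]:
    """Walk down through Projects and return the leaf Scan's read schema."""
    while isinstance(plan, Project):
        plan = plan.child
    assert isinstance(plan, Scan)
    return plan.read_schema


table = ("id", "data", "wide_col_1", "wide_col_2")
needed = ("id", "data")  # hypothetical connector-declared columns

# MOR: only the Project is narrowed; pruning runs before the scan is planned.
mor = prune_and_push_down(Project(needed, Relation(table)))
# CoW: scan planning runs first, so the narrow Project no longer helps.
cow = prune_and_push_down(plan_group_scan(Project(needed, Relation(table))))
# CoW with the relation's output narrowed at analysis time.
cow_narrowed = prune_and_push_down(
    plan_group_scan(Project(needed, Relation(needed))))

print(read_schema(mor))           # ('id', 'data')
print(read_schema(cow))           # ('id', 'data', 'wide_col_1', 'wide_col_2')
print(read_schema(cow_narrowed))  # ('id', 'data')
```

In the sketch, a Scan is opaque to the pruning step, which is why the CoW case
only reads fewer columns when the narrowing is already in Relation.output before
plan_group_scan runs.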
                                                                                
                                                                          
In summary:

  - MOR: narrow Project → standard optimizer pipeline handles it (no rule
    changes)
  - CoW: narrow DataSourceV2Relation.output at analysis time →
    GroupBasedRowLevelOperationScanPlanning sees it already narrowed →
    RowLevelOperationRuntimeGroupFiltering tolerates missing columns

I'm open to ideas for making this cleaner, so please let me know.

Thanks, 
Anurag            



> On Apr 28, 2026, at 2:36 AM, Peter Toth <[email protected]> wrote:
> 
> Thank you, Anurag, for working on this!
> Let's focus on the SPIP first.
> The schema resolution flow makes sense to me, but I found the differences
> between the "Merge-on-Read" and "Copy-on-Write" implementations a bit
> challenging to grasp at first. Could you clarify the purpose of the mentioned 
> rules and how they are applied/affected in your implementation? I left some 
> comments in the doc.
> 
> Thanks,
> Peter
> 
> On Thu, Apr 23, 2026 at 8:39 PM Anurag Mantripragada <[email protected]>
> wrote:
>> Hi everyone,
>> 
>> I would like to start a discussion regarding an enhancement to the DSv2 API. 
>> This proposal allows connectors to declare which columns they need to 
>> receive during an update, significantly improving performance and reducing 
>> write amplification. This is particularly beneficial for connectors like 
>> Iceberg on wide tables, which are increasingly common in AI/ML use cases.
>> 
>> I have included a PR with this SPIP that demonstrates the changes. It has 
>> been tested on the Iceberg connector and is working well end-to-end. 
>> 
>> Huaxin Gao has agreed to serve as the shepherd for this SPIP.
>> 
>> SPARK-56599 <https://issues.apache.org/jira/browse/SPARK-56599>
>> SPIP Doc 
>> <https://docs.google.com/document/d/1-Wiw9U54ESpbLakb9Cn_mO4AviM4nrk4TF7rNhI3JZg/edit?tab=t.0#heading=h.yoitjxhaitk8>
>> PR <https://github.com/apache/spark/pull/55518>
>> 
>> Please take a look and provide feedback! 
>> 
>> Thanks,
>> Anurag Mantripragada
