Kontinuation commented on PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985113515

   This implementation of RangePartitioning may be incorrect. RangePartitioning should split the input DataFrame into partitions that cover consecutive, non-overlapping ranges, which requires scanning the entire DataFrame to obtain the range boundaries of each partition before performing the actual shuffle write.
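   
   For illustration, here is a minimal Python sketch (not the actual Spark or Comet implementation, just the general two-phase technique) of what that means: first derive ordered, non-overlapping boundaries from a sample of the key column, then route each row to the partition whose range contains its key.
   
   ```python
   import bisect
   import random
   
   def compute_range_bounds(keys, num_partitions, sample_size=1000):
       """Derive num_partitions - 1 ordered boundaries from a sample of the keys."""
       sample = sorted(random.sample(keys, min(sample_size, len(keys))))
       step = len(sample) / num_partitions
       return [sample[int(step * i)] for i in range(1, num_partitions)]
   
   def assign_partition(key, bounds):
       """Route a key to the partition whose range contains it (index of first bound > key)."""
       return bisect.bisect_right(bounds, key)
   
   # Keys 0..99999 end up in 10 consecutive, non-overlapping ranges.
   keys = list(range(100000))
   bounds = compute_range_bounds(keys, 10)
   ranges = {}
   for k in keys:
       p = assign_partition(k, bounds)
       lo, hi = ranges.get(p, (k, k))
       ranges[p] = (min(lo, k), max(hi, k))
   for p in sorted(ranges):
       print(f"Partition {p}: min_id={ranges[p][0]}, max_id={ranges[p][1]}")
   ```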
   
   Here is PySpark code that illustrates the difference in behavior between Comet and vanilla Spark.
   
   ```python
   spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")
   
   df = spark.read.parquet("range-partitioning")
   df_range_partitioned = df.repartitionByRange(10, "id")
   
   df_range_partitioned.explain()
   
   # Show the min and max of each range
   def get_partition_bounds(idx, iterator):
       min_id = None
       max_id = None
       for row in iterator:
           if min_id is None or row.id < min_id:
               min_id = row.id
           if max_id is None or row.id > max_id:
               max_id = row.id
       yield idx, min_id, max_id
   
   partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()
   
   # Print the results
   for partition_id, min_id, max_id in sorted(partition_bounds):
       print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
   ```
   
   **Comet**:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
      +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   
   Partition 0: min_id=0, max_id=90799
   Partition 1: min_id=753, max_id=91680
   Partition 2: min_id=1527, max_id=92520
   Partition 3: min_id=2399, max_id=93284
   Partition 4: min_id=3274, max_id=94123
   Partition 5: min_id=4053, max_id=94844
   Partition 6: min_id=4851, max_id=95671
   Partition 7: min_id=5738, max_id=96522
   Partition 8: min_id=6571, max_id=97335
   Partition 9: min_id=7408, max_id=99999
   ```
   
   **Spark**:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
      +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
   
   Partition 0: min_id=0, max_id=9974
   Partition 1: min_id=9975, max_id=19981
   Partition 2: min_id=19982, max_id=29993
   Partition 3: min_id=29994, max_id=39997
   Partition 4: min_id=39998, max_id=49959
   Partition 5: min_id=49960, max_id=59995
   Partition 6: min_id=59996, max_id=69898
   Partition 7: min_id=69899, max_id=79970
   Partition 8: min_id=79971, max_id=89976
   Partition 9: min_id=89977, max_id=99999
   ```
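   
   Note that the Comet partitions above overlap heavily, while the Spark partitions cover consecutive, non-overlapping ranges. Building on the `partition_bounds` collected by the snippet above, a quick check makes the overlap explicit:
   
   ```python
   # Range partitioning is correct only if, after sorting by partition id,
   # each partition's min is greater than the previous partition's max.
   bounds_sorted = sorted(partition_bounds)
   for (_, _, prev_max), (idx, cur_min, _) in zip(bounds_sorted, bounds_sorted[1:]):
       if prev_max is not None and cur_min is not None and cur_min <= prev_max:
           print(f"Partition {idx} overlaps with the previous partition")
   ```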

