Kontinuation commented on PR #1862: URL: https://github.com/apache/datafusion-comet/pull/1862#issuecomment-2985113515
This implementation of RangePartitioning may be incorrect. RangePartitioning should split the input DataFrame into partitions covering consecutive, non-overlapping ranges; this requires scanning (sampling) the entire DataFrame to determine the range bounds of each partition before performing the actual shuffle write. Here is PySpark code that illustrates the difference between the behavior of Comet and vanilla Spark:

```python
spark.range(0, 100000).write.format("parquet").mode("overwrite").save("range-partitioning")

df = spark.read.parquet("range-partitioning")
df_range_partitioned = df.repartitionByRange(10, "id")
df_range_partitioned.explain()

# Show the min and max of each partition
def get_partition_bounds(idx, iterator):
    min_id = None
    max_id = None
    for row in iterator:
        if min_id is None or row.id < min_id:
            min_id = row.id
        if max_id is None or row.id > max_id:
            max_id = row.id
    yield idx, min_id, max_id

partition_bounds = df_range_partitioned.rdd.mapPartitionsWithIndex(get_partition_bounds).collect()

# Print the results
for partition_id, min_id, max_id in sorted(partition_bounds):
    print(f"Partition {partition_id}: min_id={min_id}, max_id={max_id}")
```

**Comet**:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometExchange rangepartitioning(id#17L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=173]
   +- CometScan parquet [id#17L] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=90799
Partition 1: min_id=753, max_id=91680
Partition 2: min_id=1527, max_id=92520
Partition 3: min_id=2399, max_id=93284
Partition 4: min_id=3274, max_id=94123
Partition 5: min_id=4053, max_id=94844
Partition 6: min_id=4851, max_id=95671
Partition 7: min_id=5738, max_id=96522
Partition 8: min_id=6571, max_id=97335
Partition 9: min_id=7408, max_id=99999
```

**Spark**:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange rangepartitioning(id#20L ASC NULLS FIRST, 10), REPARTITION_BY_NUM, [plan_id=197]
   +- FileScan parquet [id#20L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/path/to/range-p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

Partition 0: min_id=0, max_id=9974
Partition 1: min_id=9975, max_id=19981
Partition 2: min_id=19982, max_id=29993
Partition 3: min_id=29994, max_id=39997
Partition 4: min_id=39998, max_id=49959
Partition 5: min_id=49960, max_id=59995
Partition 6: min_id=59996, max_id=69898
Partition 7: min_id=69899, max_id=79970
Partition 8: min_id=79971, max_id=89976
Partition 9: min_id=89977, max_id=99999
```
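For reference, a minimal plain-Python sketch of the bound-computation step that range partitioning conceptually requires (this is not the Spark or Comet implementation; the sampling strategy, helper names, and split-point selection here are simplified assumptions):

```python
import bisect
import random

def compute_range_bounds(values, num_partitions, sample_size=1000):
    # Sample the partitioning column and pick num_partitions - 1 split
    # points, so each partition covers a consecutive, non-overlapping range.
    sample = sorted(random.sample(values, min(sample_size, len(values))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

def partition_for(value, bounds):
    # Assign a value to the partition whose range contains it.
    return bisect.bisect_right(bounds, value)

values = list(range(100000))
bounds = compute_range_bounds(values, 10)

# With correct range partitioning, partition ids are monotonic in the key:
# every value in partition i is <= every value in partition i + 1.
# This is the property the Comet output above violates.
assert all(partition_for(v, bounds) <= partition_for(w, bounds)
           for v, w in zip(values, values[1:]))
```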