gabotechs commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2851878804
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,87 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
+ /// Helper to describe partitioning scheme for error messages
+ fn partitioning_scheme_name(is_repartitioned: bool) -> &'static str {
+ if is_repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ }
+ }
+
+ // For partitioned hash joins, decide dynamic filter routing mode.
+ //
+ // Dynamic filtering requires matching partitioning schemes on both sides:
+ // - PartitionIndex: Both sides use file-grouped partitioning (value-based).
+ // Partition i on build corresponds to partition i on probe by partition value.
+ // - CaseHash: Both sides use hash repartitioning (hash-based).
+ // Uses CASE expression with hash(row) % N to route to correct partition filter.
+ //
+ // NOTE: If partitioning schemes are misaligned (one file-grouped, one hash-repartitioned),
+ // the partitioned join itself is incorrect.
+ // Partition assignments don't match:
+ // - File-grouped: partition 0 = all rows where column="A" (value-based)
+ // - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
+ // These are incompatible, so the join will miss matching rows.
+ plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()
Review Comment:
Pretty smart and clean way of propagating whether data is getting
repartitioned across steps :+1:
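
For readers following along, the routing rule described in the diff comment can be sketched in isolation. This is only an illustration under stated assumptions, not the PR's implementation: `RoutingMode`, `choose_routing_mode`, `hash_partition`, and `file_group_partition` are hypothetical names, and `DefaultHasher` merely stands in for whatever hash function the repartitioning actually uses. Only `partitioning_scheme_name` and the mode names `PartitionIndex` and `CaseHash` come from the diff.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the two routing modes named in the diff comment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RoutingMode {
    /// Both sides file-grouped: partition i on build matches partition i on probe.
    PartitionIndex,
    /// Both sides hash-repartitioned: route with hash(row) % N.
    CaseHash,
}

/// Same helper as in the diff: names the scheme for error messages.
fn partitioning_scheme_name(is_repartitioned: bool) -> &'static str {
    if is_repartitioned {
        "hash-repartitioned"
    } else {
        "file-grouped"
    }
}

/// Hypothetical decision function: a mode is chosen only when both sides
/// agree on the partitioning scheme; a mismatch is reported as an error.
fn choose_routing_mode(
    build_repartitioned: bool,
    probe_repartitioned: bool,
) -> Result<RoutingMode, String> {
    match (build_repartitioned, probe_repartitioned) {
        (false, false) => Ok(RoutingMode::PartitionIndex),
        (true, true) => Ok(RoutingMode::CaseHash),
        (build, probe) => Err(format!(
            "misaligned partitioning: build is {}, probe is {}",
            partitioning_scheme_name(build),
            partitioning_scheme_name(probe)
        )),
    }
}

/// Hash-based assignment: partition = hash(value) % n.
/// `DefaultHasher` is an assumption made for this sketch only.
fn hash_partition(value: &str, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish() % n
}

/// Value-based assignment: partition = position of the value's file group.
fn file_group_partition(value: &str, groups: &[&str]) -> Option<usize> {
    groups.iter().position(|g| *g == value)
}
```

With file groups `["A", "B"]`, `file_group_partition("A", ..)` is always partition 0, while `hash_partition("A", n)` lands wherever the hash happens to fall. The two assignments agree only by coincidence, which is why the mixed case is treated as an error rather than given a routing mode.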
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]