gabotechs commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2851555892


##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
         ///
         /// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
         /// partitions is less than the target_partitions.
+        ///
+        /// Note for partitioned hash join dynamic filtering:
+        /// preserving file partitions can allow partition-index routing (`i -> i`) instead of
+        /// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
+        /// partition hash join / dynamic filter consumers.
+        ///

Review Comment:
   This assumption is not specific to dynamic filters. If partition indices are not aligned, the data returned by the join would be wrong whether or not dynamic filters are present.
   
   Since this advice is not specific to dynamic filters, I'd keep this doc comment more minimal. Note that it is rendered not only as docs but also as part of the output of a `SHOW ALL` query, which might not display it nicely, so ASCII graphics are probably not the most render-friendly choice for this specific case.
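To illustrate that misalignment breaks the join itself, independently of any dynamic filter, here is a minimal sketch. It assumes a toy layout where the build side is file-grouped by key value and the probe side is hash-partitioned; the byte-sum `toy_hash_partition` is a stand-in for the real row hash, and all function names are hypothetical.

```rust
/// Value-based ("file-grouped") assignment: in this hypothetical layout each
/// distinct key lives in the file group at its position in `groups`.
fn file_grouped_partition(key: &str, groups: &[&str]) -> usize {
    groups
        .iter()
        .position(|g| *g == key)
        .expect("every key belongs to exactly one file group")
}

/// Hash-based assignment: byte_sum(key) % n. The byte sum is a toy stand-in
/// for the real row hash used by a hash RepartitionExec.
fn toy_hash_partition(key: &str, n: usize) -> usize {
    key.bytes().map(|b| b as usize).sum::<usize>() % n
}

/// Split keys into `n` partitions using the given assignment function.
fn split<F: Fn(&str) -> usize>(
    keys: &[&'static str],
    n: usize,
    assign: F,
) -> Vec<Vec<&'static str>> {
    let mut parts = vec![Vec::new(); n];
    for &key in keys {
        parts[assign(key)].push(key);
    }
    parts
}

/// Partition-wise inner join: build partition i is only ever compared with
/// probe partition i, as in a partitioned hash join.
fn partitioned_join_matches<'a>(build: &[Vec<&'a str>], probe: &[Vec<&'a str>]) -> usize {
    build
        .iter()
        .zip(probe.iter())
        .map(|(b, p)| p.iter().filter(|key| b.contains(*key)).count())
        .sum()
}
```

With keys A, B, C, D split into four partitions, an aligned file-grouped probe side finds all 4 matches, while the hash-partitioned probe side (A to 1, B to 2, C to 3, D to 0 under the toy hash) finds 0, with no dynamic filter involved.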



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1478,23 +1571,46 @@ fn update_children(mut dist_context: DistributionContext) -> Result<Distribution
                 child_plan_any.is::<SortPreservingMergeExec>()
                     || child_plan_any.is::<CoalescePartitionsExec>()
                     || child_context.plan.children().is_empty()
-                    || child_context.children[0].data
+                    || child_context.children[0].data.dist_changing
                     || child_context
                         .plan
                         .required_input_distribution()
                         .iter()
                         .zip(child_context.children.iter())
                         .any(|(required_dist, child_context)| {
-                            child_context.data
+                            child_context.data.dist_changing
                                 && matches!(
                                     required_dist,
                                     Distribution::UnspecifiedDistribution
                                 )
                         })
-            }
+            };
+
+        // Track whether partitioning originates from a RepartitionExec, following the partition
+        // determining path through the context tree.
+        child_context.data.repartitioned =
+            if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
+                !matches!(
+                    repartition.partitioning(),
+                    Partitioning::UnknownPartitioning(_)
+                )
+            } else if child_context.plan.children().is_empty() {

Review Comment:
   It might make sense not to use the `matches!` macro here and instead match each variant explicitly. That way, if a new variant is added to `Partitioning` in the future, the compiler will force the developer to choose the right value here, making them think about it.
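A minimal sketch of the suggestion, using a simplified local copy of `Partitioning` (the real `Hash` variant holds `Vec<Arc<dyn PhysicalExpr>>` expressions, not column names) and a hypothetical helper name `is_repartitioned`. It returns the same values as the `!matches!(..., Partitioning::UnknownPartitioning(_))` version in the diff.

```rust
/// Simplified stand-in for DataFusion's `Partitioning` enum; the real `Hash`
/// variant holds physical expressions rather than column names.
#[derive(Debug, Clone)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

/// Hypothetical helper: does a RepartitionExec with this scheme count as
/// "repartitioned"? There is deliberately no `_ =>` arm, so adding a new
/// variant makes this function fail to compile until someone decides.
fn is_repartitioned(partitioning: &Partitioning) -> bool {
    match partitioning {
        Partitioning::Hash(_, _) => true,
        Partitioning::RoundRobinBatch(_) => true,
        Partitioning::UnknownPartitioning(_) => false,
    }
}
```

With the `matches!` form, a newly added variant would silently be treated as repartitioned; the exhaustive `match` turns that into a compile error instead.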



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,87 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
+    /// Helper to describe partitioning scheme for error messages
+    fn partitioning_scheme_name(is_repartitioned: bool) -> &'static str {
+        if is_repartitioned {
+            "hash-repartitioned"
+        } else {
+            "file-grouped"
+        }
+    }
+
+    // For partitioned hash joins, decide dynamic filter routing mode.
+    //
+    // Dynamic filtering requires matching partitioning schemes on both sides:
+    // - PartitionIndex: Both sides use file-grouped partitioning (value-based).
+    //   Partition i on build corresponds to partition i on probe by partition value.
+    // - CaseHash: Both sides use hash repartitioning (hash-based).
+    //   Uses CASE expression with hash(row) % N to route to correct partition filter.
+    //
+    // NOTE: If partitioning schemes are misaligned (one file-grouped, one hash-repartitioned),
+    // the partitioned join itself is incorrect.
+    // Partition assignments don't match:
+    // - File-grouped: partition 0 = all rows where column="A" (value-based)
+    // - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
+    // These are incompatible, so the join will miss matching rows.
+    plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()

Review Comment:
   Pretty smart and clean way of propagating whether data is getting repartitioned across steps :+1:
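For readers following the routing modes described in the code comment above, a minimal sketch of how each mode picks the per-partition filter for a probe row. `Routing`, `Bounds`, `passes_filter`, and the identity `toy_hash` are hypothetical; the PR expresses CaseHash routing as a CASE physical expression rather than a Rust function.

```rust
/// Hypothetical routing modes mirroring PartitionIndex / CaseHash.
#[derive(Debug, Clone, Copy)]
enum Routing {
    /// Both sides file-grouped: probe partition i pairs with build partition i.
    PartitionIndex,
    /// Both sides hash-repartitioned: recompute the partition from the row,
    /// like `CASE hash(row) % N WHEN 0 THEN filter_0 ... END`.
    CaseHash,
}

/// (min, max) bounds collected from one build partition.
#[derive(Debug, Clone, Copy)]
struct Bounds {
    min: u64,
    max: u64,
}

/// Toy row hash (identity), standing in for the real repartitioning hash.
fn toy_hash(key: u64) -> u64 {
    key
}

/// Decide whether a probe row survives the dynamic filter under a routing mode.
fn passes_filter(routing: Routing, bounds: &[Bounds], probe_partition: usize, key: u64) -> bool {
    let idx = match routing {
        Routing::PartitionIndex => probe_partition,
        Routing::CaseHash => (toy_hash(key) % bounds.len() as u64) as usize,
    };
    let b = bounds[idx];
    b.min <= key && key <= b.max
}
```

PartitionIndex trusts the partition a row arrived in, which is only sound when both sides share the same file grouping; CaseHash ignores it and recomputes the target from the row's hash.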



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -86,6 +108,11 @@ struct Inner {
     /// This is redundant with the watch channel state, but allows us to return immediately
     /// from `wait_complete()` without subscribing if already complete.
     is_complete: bool,
+    /// Per-partition filter expressions for partition-index routing.
+    /// When both sides of a hash join preserve their file partitioning (no RepartitionExec(Hash)),
+    /// build-partition i corresponds to probe-partition i. This allows storing per-partition
+    /// filters so that each partition only sees its own bounds, giving tighter filtering.
+    partitioned_exprs: PartitionedFilters,
 }

Review Comment:
   I wonder if there is a better alternative to leaking the concept of plan partitioning into dynamic filters.
   
   I agree with @adriangb's concern about leaking the concept of partitioning into expressions. I'd say:
   - If we want expressions to be aware of partitions, let's do a proper API design that is as clean and future-proof as possible, and use it here.
   - If we want expressions to be agnostic of partitions, let's try to find a way to implement this so that DynamicFilterExpressions remain agnostic of partitions.
   
   It would be nice to avoid middle-ground solutions that exist just for the sake of shipping PRs faster.
   
   Some ideas that come to mind for keeping the dynamic filter structs agnostic of plan partitioning:
   
   ### Introduce a new `__partition_index` REE column or something similar
   
   In the record batch, right before calling `evaluate()` on the dynamic filter expression, we could introduce a new column holding the partition index. I think you already explored this without success, but I would like to better understand which blockers you specifically faced, as I can see this approach yielding a good result.
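A minimal sketch of the `__partition_index` idea, assuming a simplified batch type (a `HashMap` of `u64` columns) instead of Arrow's `RecordBatch`. Here the column is materialized as a plain vector; in Arrow it could be run-end encoded so that a constant value costs a single run. `Batch`, `with_partition_index`, and `evaluate` are hypothetical names.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a record batch: named u64 columns of equal length.
#[derive(Debug)]
struct Batch {
    columns: HashMap<String, Vec<u64>>,
    num_rows: usize,
}

/// Hypothetical name for the injected column.
const PARTITION_INDEX_COL: &str = "__partition_index";

/// Append a constant partition-index column right before filter evaluation.
fn with_partition_index(mut batch: Batch, partition: usize) -> Batch {
    let n = batch.num_rows;
    batch
        .columns
        .insert(PARTITION_INDEX_COL.to_string(), vec![partition as u64; n]);
    batch
}

/// The filter reads the partition from the data itself (like a CASE on the
/// injected column), so the filter struct needs no partition-aware fields.
fn evaluate(bounds: &[(u64, u64)], key_col: &str, batch: &Batch) -> Vec<bool> {
    let keys = &batch.columns[key_col];
    let parts = &batch.columns[PARTITION_INDEX_COL];
    keys.iter()
        .zip(parts)
        .map(|(k, p)| {
            let (lo, hi) = bounds[*p as usize];
            lo <= *k && *k <= hi
        })
        .collect()
}
```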
   
   ### Use Arrow's Schema metadata for threading the partition index
   
   Arrow's schemas have the concept of [metadata](https://github.com/apache/arrow-rs/blob/main/arrow-schema/src/schema.rs#L191) that can be used for threading arbitrary information. I think this should also be considered "leaking partitioning details", but I wonder if it brings an opportunity to make things cleaner?
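A minimal sketch of the metadata idea. arrow-rs's `Schema` carries a `metadata: HashMap<String, String>` alongside its fields; the struct below is a simplified stand-in for it, and the `__partition_index` key plus both helpers are hypothetical.

```rust
use std::collections::HashMap;

/// Simplified stand-in for arrow-rs `Schema`: field names plus string metadata.
#[derive(Debug, Clone, Default, PartialEq)]
struct Schema {
    fields: Vec<String>,
    metadata: HashMap<String, String>,
}

/// Hypothetical metadata key; not an existing DataFusion convention.
const PARTITION_KEY: &str = "__partition_index";

/// Return a copy of the schema tagged with the partition index.
fn tag_partition(schema: &Schema, partition: usize) -> Schema {
    let mut tagged = schema.clone();
    tagged
        .metadata
        .insert(PARTITION_KEY.to_string(), partition.to_string());
    tagged
}

/// Read the partition index back, if the schema was tagged.
fn partition_of(schema: &Schema) -> Option<usize> {
    schema.metadata.get(PARTITION_KEY)?.parse().ok()
}
```

One trade-off to weigh: each partition's batches would carry a different schema, so code that compares schemas for equality could start treating them as mismatched unless it ignores this key.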
   
   ### Letting the hash join hold a `Vec<HashJoinExecDynamicFilter>` instead of a single `HashJoinExecDynamicFilter`
   
   That way, there are as many `HashJoinExecDynamicFilter` instances as partitions, and each behaves exactly as before, as if there were only one partition.
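A minimal sketch of this third option, assuming a simple min/max filter per partition. `PartitionDynamicFilter` and `JoinFilters` are hypothetical simplifications standing in for `HashJoinExecDynamicFilter` and the hash join, not the real types.

```rust
/// Hypothetical single-partition dynamic filter: min/max bounds filled in once
/// the build side of its partition finishes. Each instance behaves like the
/// filter of a join with only one partition.
#[derive(Debug, Default, Clone)]
struct PartitionDynamicFilter {
    bounds: Option<(i64, i64)>,
}

impl PartitionDynamicFilter {
    /// Record bounds from this partition's build-side keys. An empty build
    /// side leaves the bounds unset in this sketch.
    fn update(&mut self, build_keys: &[i64]) {
        let min = build_keys.iter().copied().min();
        let max = build_keys.iter().copied().max();
        self.bounds = min.zip(max);
    }

    /// Before bounds are known, every row passes.
    fn passes(&self, key: i64) -> bool {
        match self.bounds {
            Some((lo, hi)) => lo <= key && key <= hi,
            None => true,
        }
    }
}

/// The join owns one independent filter per partition instead of one shared,
/// partition-aware filter.
struct JoinFilters {
    per_partition: Vec<PartitionDynamicFilter>,
}

impl JoinFilters {
    fn new(num_partitions: usize) -> Self {
        Self {
            per_partition: vec![PartitionDynamicFilter::default(); num_partitions],
        }
    }
}
```

Each probe partition would only be handed `per_partition[i]`, so the filter type itself never needs to know about partition indices.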
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
