gabotechs commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2851555892
##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if the number of distinct
/// partitions is less than the target_partitions.
+ ///
+ /// Note for partitioned hash join dynamic filtering:
+ /// preserving file partitions can allow partition-index routing (`i -> i`) instead of
+ /// CASE-hash routing, but this assumes build/probe partition indices stay aligned for
+ /// partition hash join / dynamic filter consumers.
+ ///
Review Comment:
This assumption is not specific to dynamic filters. If partition indices are
not aligned, data returned in the join would be wrong whether dynamic filters
are there or not.
As I don't think this advice is specific to dynamic filters, I'd try to keep
this doc comment more minimal. Note that this is supposed to be rendered not
only as docs, but also as part of a `SHOW ALL` query, which might not render
nicely, so having ASCII graphics is probably not the most render-friendly thing
for this specific case.
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1478,23 +1571,46 @@ fn update_children(mut dist_context: DistributionContext) -> Result<Distribution
child_plan_any.is::<SortPreservingMergeExec>()
|| child_plan_any.is::<CoalescePartitionsExec>()
|| child_context.plan.children().is_empty()
- || child_context.children[0].data
+ || child_context.children[0].data.dist_changing
|| child_context
.plan
.required_input_distribution()
.iter()
.zip(child_context.children.iter())
.any(|(required_dist, child_context)| {
- child_context.data
+ child_context.data.dist_changing
&& matches!(
required_dist,
Distribution::UnspecifiedDistribution
)
})
- }
+ };
+
+ // Track whether partitioning originates from a RepartitionExec, following the partition
+ // determining path through the context tree.
+ child_context.data.repartitioned =
+ if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
+ !matches!(
+ repartition.partitioning(),
+ Partitioning::UnknownPartitioning(_)
+ )
+ } else if child_context.plan.children().is_empty() {
Review Comment:
It might make sense to not use a `matches!` macro here, and instead match each
possibility manually. The reason is that, if in the future a new enum entry is
added to `Partitioning`, the compiler will force the dev to choose the right
value here manually, forcing them to think about it.
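To make the suggestion concrete, here is a minimal sketch of the exhaustive-match alternative. The `Partitioning` enum below is a simplified stand-in for DataFusion's (the real `Hash` variant holds `Vec<Arc<dyn PhysicalExpr>>`, not strings), and `is_repartitioned` is a hypothetical helper name; the point is only that an exhaustive `match` turns a future variant addition into a compile error instead of silently falling through `matches!`.

```rust
// Simplified stand-in for DataFusion's `Partitioning` enum, for illustration only.
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize), // real variant holds physical expressions
    UnknownPartitioning(usize),
}

/// Exhaustive match: adding a new `Partitioning` variant becomes a compile
/// error here, forcing the author to decide how it should be classified.
fn is_repartitioned(p: &Partitioning) -> bool {
    match p {
        Partitioning::RoundRobinBatch(_) => true,
        Partitioning::Hash(_, _) => true,
        Partitioning::UnknownPartitioning(_) => false,
    }
}

fn main() {
    assert!(is_repartitioned(&Partitioning::Hash(vec!["a".to_string()], 4)));
    assert!(!is_repartitioned(&Partitioning::UnknownPartitioning(4)));
}
```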
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,87 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
+ /// Helper to describe partitioning scheme for error messages
+ fn partitioning_scheme_name(is_repartitioned: bool) -> &'static str {
+ if is_repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ }
+ }
+
+ // For partitioned hash joins, decide dynamic filter routing mode.
+ //
+ // Dynamic filtering requires matching partitioning schemes on both sides:
+ // - PartitionIndex: Both sides use file-grouped partitioning (value-based).
+ // Partition i on build corresponds to partition i on probe by partition value.
+ // - CaseHash: Both sides use hash repartitioning (hash-based).
+ // Uses CASE expression with hash(row) % N to route to correct partition filter.
+ //
+ // NOTE: If partitioning schemes are misaligned (one file-grouped, one hash-repartitioned),
+ // the partitioned join itself is incorrect.
+ // Partition assignments don't match:
+ // - File-grouped: partition 0 = all rows where column="A" (value-based)
+ // - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
+ // These are incompatible, so the join will miss matching rows.
+ plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()
Review Comment:
Pretty smart and clean way of propagating whether data is getting
repartitioned across steps :+1:
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -86,6 +108,11 @@ struct Inner {
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
+ /// Per-partition filter expressions for partition-index routing.
+ /// When both sides of a hash join preserve their file partitioning (no RepartitionExec(Hash)),
+ /// build-partition i corresponds to probe-partition i. This allows storing per-partition
+ /// filters so that each partition only sees its own bounds, giving tighter filtering.
+ partitioned_exprs: PartitionedFilters,
}
Review Comment:
I wonder if there is a better alternative than leaking the concept of plan
partitioning into dynamic filters.
I do agree with @adriangb about leaking the concept of partitioning to
expressions. I'd say:
- If we want expressions to be aware of partitions, let's do a proper API
design that is as clean and future-proof as possible, and use it here.
- If we want expressions to be agnostic of partitions, let's try to find a way
to implement this so that DynamicFilterExpressions stay agnostic of partitions.
It would be nice to avoid middle-ground solutions that exist just for the sake
of shipping PRs faster.
Some ideas that come to mind for keeping dynamic filters agnostic from plan
partitioning in its structs:
### Introduce a new `__partition_index` REE column or something similar
In the record batch right before calling `evaluate()` on the dynamic filter
expression, we could introduce a new column with the partition index. I think
you already explored this without success, but I would like to understand
better which blockers specifically you faced; I do see this approach
potentially yielding a good result.
### Use Arrow's Schema metadata for threading the partition index
Arrow's schemas have the concept of
[metadata](https://github.com/apache/arrow-rs/blob/main/arrow-schema/src/schema.rs#L191)
that can be used for threading arbitrary information. I think this should also
be considered "leaking partitioning details", but I wonder if this brings an
opportunity of making it cleaner?
### Letting the hash join hold a Vec<HashJoinExecDynamicFilter> instead of a
single HashJoinExecDynamicFilter
That way, there are as many HashJoinExecDynamicFilters as partitions, and each
behaves exactly the same as before, as if there were only one partition.
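As a rough sketch of this third idea, the join operator (which already knows its partition index) could own one filter per partition and hand each build/probe pair its own instance. All names below (`DynFilter`, `HashJoinFilters`, `filter_for`) are hypothetical stand-ins, with a string in place of a real filter expression; the point is that the filter type itself never sees a partition index.

```rust
// Hypothetical sketch: one partition-agnostic dynamic filter per partition.
#[derive(Default)]
struct DynFilter {
    // Stand-in for a real filter expression, e.g. bounds on a join key.
    bounds: Option<String>,
}

struct HashJoinFilters {
    // One filter per partition; each behaves as if the plan had a single
    // partition, so DynFilter needs no concept of partitioning.
    per_partition: Vec<DynFilter>,
}

impl HashJoinFilters {
    fn new(n_partitions: usize) -> Self {
        Self {
            per_partition: (0..n_partitions).map(|_| DynFilter::default()).collect(),
        }
    }

    // The join operator, which already knows its partition index, picks the
    // matching filter; the filter itself stays partition-agnostic.
    fn filter_for(&mut self, partition: usize) -> &mut DynFilter {
        &mut self.per_partition[partition]
    }
}

fn main() {
    let mut filters = HashJoinFilters::new(4);
    filters.filter_for(2).bounds = Some("a BETWEEN 1 AND 10".to_string());
    assert!(filters.per_partition[2].bounds.is_some());
    assert!(filters.per_partition[0].bounds.is_none());
}
```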
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]