tobixdev commented on issue #18566:
URL: https://github.com/apache/datafusion/issues/18566#issuecomment-3728690204

   Also looks good from our side (RDF Fusion): 
https://github.com/tobixdev/rdf-fusion/pull/166
   
   The only issue we ran into was with the "lazy dynamic filter pushdown" in 
`HashJoin`. 
   
   For context, the `DynamicFilterPhysicalExpr` has an outer `Arc` (`Arc<dyn 
PhysicalExpr>`) and an inner `Arc` that holds the inner state 
(`DynamicFilterPhysicalExpr::inner`). The check for whether the dynamic filter 
is "used" is, I believe, based on the inner `Arc`, as this is important when 
rewriting the expression.
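
To illustrate what I think is going on, here is a minimal sketch using only `std`. It is a simplified model, not the actual DataFusion types: `ModelFilter`, `is_used`, `rewrite`, and `demo` are hypothetical names, and I am assuming the "used" check is a strong-count check on the inner `Arc`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`; NOT the real type.
struct ModelFilter {
    /// Shared state; the `u64` is a placeholder for the real inner state.
    inner: Arc<RwLock<u64>>,
}

impl ModelFilter {
    fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(0)) }
    }

    /// Assumed check: the filter counts as "used" once a second handle
    /// to the inner state exists.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }

    /// Models a rewrite of the expression: a new outer value that shares
    /// the same inner state, which bumps the inner strong count.
    fn rewrite(&self) -> Self {
        Self { inner: Arc::clone(&self.inner) }
    }
}

/// Returns (used after cloning the outer Arc, used after a rewrite).
fn demo() -> (bool, bool) {
    let producer = Arc::new(ModelFilter::new());

    // Cloning the outer Arc only bumps the outer strong count.
    let outer_clone = Arc::clone(&producer);
    let after_outer_clone = producer.is_used();

    // A rewrite creates a second handle to the inner state.
    let rewritten = producer.rewrite();
    let after_rewrite = producer.is_used();

    drop((outer_clone, rewritten));
    (after_outer_clone, after_rewrite)
}
```

In this model, `demo()` returns `(false, true)`: holding a clone of the outer `Arc` does not make the filter look "used", while a rewritten copy that shares the inner state does.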
   
   We cloned the "outer Arc", so the "inner Arc" was still marked as "unused" 
and, as a result, the `HashJoin` did not compute the dynamic filters.
   
   Could also be that our implementation does something weird with the filter. 
Basically we want to keep it around until our query operator starts running. 
This is where we snapshot it. Maybe we should include this change as a note in 
the upgrade guide? I think this is related to 
https://github.com/apache/datafusion/issues/17527 . Any thoughts, @adriangb 
@LiaCastaneda? 
   
   Maybe I am also doing something wrong, or there is an error in my reasoning. 
With the second version below it seems to work again. 
   
   Also is there a reason why `DynamicFilterPhysicalExpr` does not implement 
`Clone`? This would avoid the awkward `with_new_children` call below.
   
   I am not sure whether this is a problem, because I do not know whether this 
is expected behavior and I am simply doing something wrong. I'd appreciate it 
if someone has further information on that. Thanks!!
   
   Our earlier code:
   
   ```rust
   impl MyPredicateExpr {
       pub fn try_from(expr: &Arc<dyn PhysicalExpr>) -> Option<Self> {
           // <=== Directly clone the outer Arc. Inner Arc remains at one ref count.
           let any_expr = Arc::clone(expr) as Arc<dyn Any + Send + Sync>;
           if let Ok(expr) = any_expr.downcast::<DynamicFilterPhysicalExpr>() {
               return Some(MyPredicateExpr::Dynamic(expr));
           }
           // ...
       }
   }
   ```
   
   Now we have a workaround that copies the inner `Arc`, but I am not sure 
whether this is the best way.
   
   ```rust
   impl MyPredicateExpr {
       pub fn try_from(expr: &Arc<dyn PhysicalExpr>) -> Option<Self> {
           // We explicitly do not just clone the outer `Arc`, as the filter
           // might then be considered unused.
           if expr.as_any().downcast_ref::<DynamicFilterPhysicalExpr>().is_some() {
               let cloned = expr
                   .clone()
                   // <=== Causes cloning of the inner `Arc`.
                   .with_new_children(expr.children().iter().map(|c| (*c).clone()).collect())
                   .expect("Old children are valid");
               let cast = (cloned as Arc<dyn Any + Send + Sync>)
                   .downcast::<DynamicFilterPhysicalExpr>()
                   .expect("Type checked above");
   
               return Some(MyPredicateExpr::Dynamic(cast));
           }
           // ...
       }
   }
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

