andygrove opened a new issue, #833:
URL: https://github.com/apache/datafusion-comet/issues/833

   ### What is the problem the feature request solves?
   
   I recently worked on a few experimental PRs around CBO that I am going to 
close because they did not provide an immediate benefit, but I wanted to 
document the work so am using this issue for that.
   
   ## Using DataFusion's Physical Optimizer
   
   DataFusion provides a physical optimizer and there may be benefit in the 
future from applying DataFusion's rules or custom Comet rules. For example, 
injecting CopyExec into the plan would be ideal for an optimizer rule.
   
   In `jni_api.rs` we would need to add the rules that we want to enable:
   
   ```rust
     let state = SessionStateBuilder::new()
            .with_config(session_config)
            .with_runtime_env(Arc::new(runtime))
            .with_default_features()
            
.with_physical_optimizer_rules(vec![Arc::new(TopKAggregation::new())])
            .build();
   ```
   
   Then in `planner.rs` we could add the call to optimize the plan:
   
   ```rust
      pub fn optimize_plan(
            &self,
            plan: Arc<dyn ExecutionPlan>,
        ) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
            // optimize the physical plan
            let datafusion_planner = DefaultPhysicalPlanner::default();
            datafusion_planner
                .optimize_physical_plan(plan, &self.session_ctx.state(), |_, _| 
{})
                .map_err(|e| e.into())
        }
   ```
   
   Because we receive a plan that is already optimized by Spark, there is no 
immediate benefit in enabling the current rules from DataFusion.
   
   ## Passing statistics down to the native plan
   
   It is possible for use to pass Spark statistics down to the native plan. For 
example, we can add this `QueryPlanSerde.scala`:
   
   ```scala
           op match {
              case qs: QueryStageExec =>
                qs.computeStats() match {
                  case Some(stats) =>
                    val statsBuilder = 
OperatorOuterClass.Statistics.newBuilder()
                    stats.rowCount.foreach(c => 
statsBuilder.setRowCount(c.toFloat))
                    statsBuilder.setSizeInBytes(stats.sizeInBytes.toFloat)
                    scanBuilder.setStatistics(statsBuilder.build())
                  case _ =>
                }
              case _ =>
            }
   ```
   
   It is also possible to get size in bytes from any Hadoop input relations and 
infer row count based on schema.
   
   There is no value in doing this though until we have optimizer rules that 
can make use of this data.
   
   ## Cost-model to determine when to fall back to Spark
   
   We could implement a cost-model with the relative costs of Comet vs Spark 
operators and expressions and fall back to Spark in the case where we estimate 
that Comet would be more expensive.
   
   
   ### Describe the potential solution
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to