martin-g commented on code in PR #1726:
URL: 
https://github.com/apache/datafusion-ballista/pull/1726#discussion_r3265027132


##########
ballista/scheduler/src/state/mod.rs:
##########
@@ -380,120 +366,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         job_id: &str,
         job_name: &str,
         session_ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
+        logical_plan: &LogicalPlan,
         queued_at: u64,
         subscriber: Option<JobStatusSubscriber>,
     ) -> Result<()> {
         let start = Instant::now();
-        let session_config = Arc::new(session_ctx.copied_config());
-        if log::max_level() >= log::Level::Debug {
-            // optimizing the plan here is redundant because the physical 
planner will do this again
-            // but it is helpful to see what the optimized plan will be
-            let optimized_plan = session_ctx.state().optimize(plan)?;
-            debug!("Optimized plan: {}", optimized_plan.display_indent());
-        }
-
-        let mut explain_inner_logical_plan: Option<Arc<LogicalPlan>> = None;
-        plan.apply(&mut |plan: &LogicalPlan| {
-            if let LogicalPlan::TableScan(scan) = plan {
-                let provider = source_as_provider(&scan.source)?;
-                if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>() {
-                    let local_paths: Vec<&ListingTableUrl> = table
-                        .table_paths()
-                        .iter()
-                        .filter(|url| url.as_str().starts_with("file:///"))
-                        .collect();
-                    if !local_paths.is_empty() {
-                        // These are local files rather than remote object 
stores, so we

Review Comment:
   Is it intentional that this check for local files is no longer performed?



##########
ballista/scheduler/src/state/aqe/mod.rs:
##########
@@ -129,20 +132,33 @@ impl AdaptiveExecutionGraph {
     /// This will use the `DistributedPlanner` to break the plan into stages
     /// and build the DAG structure needed for distributed execution.
     #[allow(clippy::too_many_arguments)]
-    pub fn try_new(
+    pub async fn try_new(
         scheduler_id: &str,
         job_id: &str,
         job_name: &str,
-        session_id: &str,
-        plan: Arc<dyn ExecutionPlan>,
+        ctx: &SessionContext,
+        logical_plan: &LogicalPlan,
         queued_at: u64,
         session_config: Arc<SessionConfig>,

Review Comment:
   The `session_config` parameter could be removed, since it is available via 
`ctx.copied_config()`.
   
https://github.com/apache/datafusion-ballista/pull/1726/changes#diff-b96609436c205ce10f64d745f73eb8043c695665b84679d2d606432603f4884cR285
   



##########
ballista/scheduler/src/state/mod.rs:
##########
@@ -380,120 +366,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         job_id: &str,
         job_name: &str,
         session_ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
+        logical_plan: &LogicalPlan,
         queued_at: u64,
         subscriber: Option<JobStatusSubscriber>,
     ) -> Result<()> {
         let start = Instant::now();
-        let session_config = Arc::new(session_ctx.copied_config());
-        if log::max_level() >= log::Level::Debug {
-            // optimizing the plan here is redundant because the physical 
planner will do this again
-            // but it is helpful to see what the optimized plan will be
-            let optimized_plan = session_ctx.state().optimize(plan)?;
-            debug!("Optimized plan: {}", optimized_plan.display_indent());

Review Comment:
   It would be good to still log the optimized plan for better diagnostics.



##########
ballista/scheduler/src/state/mod.rs:
##########
@@ -380,120 +366,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
         job_id: &str,
         job_name: &str,
         session_ctx: Arc<SessionContext>,
-        plan: &LogicalPlan,
+        logical_plan: &LogicalPlan,
         queued_at: u64,
         subscriber: Option<JobStatusSubscriber>,
     ) -> Result<()> {
         let start = Instant::now();
-        let session_config = Arc::new(session_ctx.copied_config());
-        if log::max_level() >= log::Level::Debug {
-            // optimizing the plan here is redundant because the physical 
planner will do this again
-            // but it is helpful to see what the optimized plan will be
-            let optimized_plan = session_ctx.state().optimize(plan)?;
-            debug!("Optimized plan: {}", optimized_plan.display_indent());
-        }
-
-        let mut explain_inner_logical_plan: Option<Arc<LogicalPlan>> = None;
-        plan.apply(&mut |plan: &LogicalPlan| {
-            if let LogicalPlan::TableScan(scan) = plan {
-                let provider = source_as_provider(&scan.source)?;
-                if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>() {
-                    let local_paths: Vec<&ListingTableUrl> = table
-                        .table_paths()
-                        .iter()
-                        .filter(|url| url.as_str().starts_with("file:///"))
-                        .collect();
-                    if !local_paths.is_empty() {
-                        // These are local files rather than remote object 
stores, so we
-                        // need to check that they are accessible on the 
scheduler (the client
-                        // may not be on the same host, or the data path may 
not be correctly
-                        // mounted in the container). There could be thousands 
of files so we
-                        // just check the first one.
-                        let url = &local_paths[0].as_str();
-                        // the unwraps are safe here because we checked that 
the url starts with file:///
-                        // we need to check both versions here to support 
Linux & Windows
-                        
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
-                            .or_else(|_| {
-                                ListingTableUrl::parse(
-                                    url.strip_prefix("file:///").unwrap(),
-                                )
-                            })
-                            .map_err(|e| {
-                                DataFusionError::External(
-                                    format!(
-                                        "logical plan refers to path on local 
file system \
-                                that is not accessible in the scheduler: 
{url}: {e:?}"
-                                    )
-                                        .into(),
-                                )
-                            })?;
-                    }
-                }
-            } else if let LogicalPlan::Explain(explain_plan) = plan {
-                explain_inner_logical_plan = Some(explain_plan.plan.clone());
-            }
-            Ok(TreeNodeRecursion::Continue)
-        })?;
-
-        let explain_distributed_plan = if let Some(inner_lp) = 
explain_inner_logical_plan
-        {
-            Some(
-                generate_distributed_explain_plan(job_id, session_ctx.clone(), 
inner_lp)
-                    .await?,
-            )
-        } else {
-            None
-        };
-        let logical_plan_str = plan.display_indent().to_string();
-
-        let plan = session_ctx.state().create_physical_plan(plan).await?;
-        debug!(
-            "Physical plan: {}",
-            DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
-        );
-
-        let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
-            if node.output_partitioning().partition_count() == 0 {
-                let empty: Arc<dyn ExecutionPlan> =
-                    Arc::new(EmptyExec::new(node.schema()));

Review Comment:
   Should this logic (replacing nodes that have zero output partitions with an `EmptyExec`) be preserved somewhere in the new implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to