westhide commented on PR #1212:
URL: https://github.com/apache/datafusion-ballista/pull/1212#issuecomment-2741958494

   > apparently you found another bug:
   > 
   > https://github.com/apache/datafusion-ballista/blob/bb10a1bebd52ebb91515efa7a2a977df740c2d7a/ballista/scheduler/src/scheduler_server/grpc.rs#L127
   > 
   > maybe if it is changed to:
   > 
   > ```rust
   >             for (_, task) in schedulable_tasks {
   >                 match self
   >                     .state
   >                     .task_manager
   >                     .prepare_task_definition(task.clone())
   >                 {
   >                     Ok(task_definition) => tasks.push(task_definition),
   >                     Err(e) => {
   >                         let job_id = task.partition.job_id;
   >                         error!(
   >                             "Error preparing task for job_id: {} error: {:?} ",
   >                             job_id,
   >                             e.to_string(),
   >                         );
   >                         let _ = self
   >                             .state
   >                             .task_manager
   >                             .abort_job(&job_id, e.to_string())
   >                             .await;
   >                     }
   >                 }
   >             }
   > ```
   > 
   > whole job gets cancelled in case of error, wdyt?
   
   
   
   
   Yes, I will try to fix this bug by sending the scheduler_server error to the client side.
   ```text
   ballista_scheduler::scheduler_server::grpc: Error preparing task definition: 
DataFusionError(Internal("Unsupported plan and extension codec failed with 
[Internal error: unsupported plan type: NdJsonExec { base_config: 
object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", 
cannot_be_a_base: false, username: \"\", password: None, host: None, port: 
None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { 
num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics 
{ null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, 
distinct_count: Absent }] }, file_groups={1 group: 
[[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]},
 projection=[a], projected_statistics: Statistics { num_rows: Absent, 
total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: 
Absent, max_value: Absent, min_value: Absent, sum_value: Absent, 
distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, file_compression_type: 
FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { 
eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] 
}, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], 
constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: 
\"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), 
emission_type: Incremental, boundedness: Bounded, output_ordering: None } 
}.\nThis was likely caused by a bug in DataFusion's code and we would welcome 
that you file an bug report in our issue tracker]. Plan: NdJsonExec { 
base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", 
cannot_be_a_base: false, username: \"\", password: None, host: None, port: 
None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { 
num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: 
Absent, max_value: Absent, min_value: Absent, sum_value: Absent, 
distinct_count: Absent }] }, file_groups={1 group: 
[[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]},
 projection=[a], projected_statistics: Statistics { num_rows: Absent, 
total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: 
Absent, max_value: Absent, min_value: Absent, sum_value: Absent, 
distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { 
data: MetricsSet { metrics: [] } } }, file_compression_type: 
FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { 
eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] 
}, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], 
constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: 
\"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: 
UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, 
output_ordering: None } }"))
   ```
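
The behaviour discussed above (abort the whole job when one task cannot be prepared, and keep the error text where the client can read it) can be sketched in isolation. This is a minimal, self-contained model and not Ballista's code: `Task`, `TaskDefinition`, `JobStatus`, `prepare_task_definition` and `prepare_all` are hypothetical stand-ins, and the "unsupported plan" check simply imitates the `NdJsonExec` failure from the log.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a task waiting to be scheduled.
#[derive(Clone, Debug)]
struct Task {
    job_id: String,
    plan: String,
}

/// Hypothetical stand-in for an encoded task ready to send to an executor.
#[derive(Debug)]
struct TaskDefinition {
    job_id: String,
    encoded_plan: String,
}

/// Hypothetical job status that a client could poll.
#[derive(Debug, PartialEq)]
enum JobStatus {
    Running,
    Failed(String),
}

/// Imitates plan encoding: fails for a plan type the codec does not support.
fn prepare_task_definition(task: &Task) -> Result<TaskDefinition, String> {
    if task.plan == "NdJsonExec" {
        Err(format!("unsupported plan type: {}", task.plan))
    } else {
        Ok(TaskDefinition {
            job_id: task.job_id.clone(),
            encoded_plan: format!("encoded({})", task.plan),
        })
    }
}

/// Prepares every task. When one fails, the whole job is marked as failed
/// with the error message instead of the error only being logged.
fn prepare_all(
    tasks: Vec<Task>,
    jobs: &mut HashMap<String, JobStatus>,
) -> Vec<TaskDefinition> {
    let mut prepared = Vec::new();
    for task in tasks {
        // Skip remaining tasks of a job that has already been aborted.
        if matches!(jobs.get(&task.job_id), Some(JobStatus::Failed(_))) {
            continue;
        }
        match prepare_task_definition(&task) {
            Ok(definition) => prepared.push(definition),
            Err(e) => {
                eprintln!(
                    "Error preparing task for job_id: {} error: {}",
                    task.job_id, e
                );
                // Drop definitions already prepared for the aborted job.
                prepared.retain(|d| d.job_id != task.job_id);
                // Record the reason so the client sees why the job failed.
                jobs.insert(task.job_id.clone(), JobStatus::Failed(e));
            }
        }
    }
    prepared
}
```

In this model a client polling the status map would see `Failed("unsupported plan type: NdJsonExec")` for the affected job, while tasks of other jobs are still scheduled.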

