westhide commented on PR #1212: URL: https://github.com/apache/datafusion-ballista/pull/1212#issuecomment-2741958494
> apparently you found another bug:
>
> https://github.com/apache/datafusion-ballista/blob/bb10a1bebd52ebb91515efa7a2a977df740c2d7a/ballista/scheduler/src/scheduler_server/grpc.rs#L127
>
> maybe if it is changed to:
>
> ```rust
> for (_, task) in schedulable_tasks {
>     match self
>         .state
>         .task_manager
>         .prepare_task_definition(task.clone())
>     {
>         Ok(task_definition) => tasks.push(task_definition),
>         Err(e) => {
>             let job_id = task.partition.job_id;
>             error!(
>                 "Error preparing task for job_id: {} error: {:?} ",
>                 job_id,
>                 e.to_string(),
>             );
>             let _ = self
>                 .state
>                 .task_manager
>                 .abort_job(&job_id, e.to_string())
>                 .await;
>         }
>     }
> }
> ```
>
> whole job gets cancelled in case of error, wdyt?

Yes, I will try to fix this bug by sending the scheduler_server error to the client side.

```text
ballista_scheduler::scheduler_server::grpc: Error preparing task definition: DataFusionError(Internal("Unsupported plan and extension codec failed with [Internal error: unsupported plan type: NdJsonExec { base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]}, projection=[a], projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, file_compression_type: FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, output_ordering: None } }.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker].
Plan: NdJsonExec { base_config: object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: None, path: \"/\", query: None, fragment: None } }, statistics=Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, file_groups={1 group: [[home/westhide/Code/apache/datafusion-ballista/examples/testdata/simple.json]]}, projection=[a], projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent, column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent, sum_value: Absent, distinct_count: Absent }] }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, file_compression_type: FileCompressionType { variant: UNCOMPRESSED }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], constraints: Constraints { inner: [] }, schema: Schema { fields: [Field { name: \"a\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, output_ordering: None } }"))
```
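As a rough idea of what "sending the scheduler_server error to the client side" could look like, here is a minimal sketch that maps a task-preparation failure to a `tonic::Status` so the gRPC caller sees the root cause (the "unsupported plan type" codec error above) instead of the task being silently dropped. Apart from `tonic::Status`, the names here (`PrepareError`, `prepare_error_to_status`) are hypothetical stand-ins for illustration, not Ballista's actual API:

```rust
// Minimal sketch (not the actual Ballista change): surface a task-preparation
// failure to the gRPC client as a tonic::Status instead of only logging it.
// `PrepareError` and `prepare_error_to_status` are hypothetical stand-ins.
use tonic::Status;

/// Hypothetical stand-in for the error returned by task preparation.
#[derive(Debug)]
struct PrepareError(String);

impl std::fmt::Display for PrepareError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Map the failure to a gRPC `Internal` status that carries the job id and
/// the original error message back to the caller.
fn prepare_error_to_status(job_id: &str, err: &PrepareError) -> Status {
    Status::internal(format!(
        "failed to prepare task definition for job {job_id}: {err}"
    ))
}

fn main() {
    let err = PrepareError("unsupported plan type: NdJsonExec".into());
    let status = prepare_error_to_status("job-42", &err);
    // Roughly what a tonic client would observe on its side.
    println!("code = {:?}, message = {}", status.code(), status.message());
}
```

In the real handler the status would be returned from the gRPC method (or the job aborted as suggested in the quoted snippet) rather than printed.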