zhuqi-lucas commented on PR #16196: URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2922745280
> After my previous comment I started thinking about how our Java code handles this kind of thing. Typically when we do a `Future#cancel(mayInterrupt: true)` we have `Thread#isInterrupted` checks in the async code and throw `InterruptedException` in response. Then we let the exception bubble up the stack and we're done.
>
> Would that maybe be an option? Have a cancellation token that rather than triggering `return Pending` triggers a `return Ready(Err(DataFusionError::ExecutionJoin))` or something similar. Error handling logic is already in place throughout the codebase and errors propagate nicely up the call chain. That might be a way to avoid having to sprinkle these checks all over the place in the codebase.

@pepijnve I agree that this is a possible solution. It could also be combined with handling cancellation at a low level, something like:

### Option A: Wrap in `DataSourceExec.open()`

```rust
impl DataSource for FileScanConfig {
    fn open(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // 1. build your raw stream
        let raw_stream = Box::pin(FileStream::new(…)?);
        // 2. wrap it in a generic cancellation stream
        let cancellable = CancellationStream::new(context.clone(), raw_stream);
        // 3. return the wrapped stream
        Ok(Box::pin(cancellable))
    }
}
```

```rust
...
// inside the wrapper's poll_next: check for cancellation before polling the inner stream
if ctx.is_cancelled() {
    return Poll::Ready(Some(Err(DataFusionError::Execution("Query cancelled".to_string()))));
}
...
```

However, adding the logic to update the cancellation state seems to make things more complex. Our current context looks like this:

```rust
#[derive(Clone)]
pub struct SessionContext {
    /// UUID for the session
    session_id: String,
    /// Session start time
    session_start_time: DateTime<Utc>,
    /// Shared session state for the session
    state: Arc<RwLock<SessionState>>,
}
```

I can try this. I think the benefit shows up when the consumer is consuming several different kinds of streaming execs (all based on data source streams): if one of them receives the cancel signal, it returns the `Query cancelled` error.
So the other streaming execs would no longer be polled?
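
To make the wrapper idea above a bit more concrete, here is a minimal sketch using only the standard library. It is not DataFusion code: `BatchStream` is a simplified stand-in for `futures::Stream`, `Batch` stands in for `RecordBatch`, errors are plain `String`s, and the shared `AtomicBool` flag is an assumption about how a cancellation signal might be carried, since the context shown above has no such field. `CountingSource` and `poll_once` are hypothetical helpers that exist only for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for a record batch (hypothetical).
type Batch = u64;

/// Simplified stand-in for `futures::Stream<Item = Result<RecordBatch>>`.
trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Batch, String>>>;
}

/// Hypothetical source: yields 0, 1, 2, ... below `limit` and counts its polls.
struct CountingSource {
    next: Batch,
    limit: Batch,
    polls: Arc<AtomicUsize>,
}

impl BatchStream for CountingSource {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<Batch, String>>> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        if self.next >= self.limit {
            return Poll::Ready(None);
        }
        let batch = self.next;
        self.next += 1;
        Poll::Ready(Some(Ok(batch)))
    }
}

/// Hypothetical wrapper: checks a shared flag before delegating to the inner stream.
struct CancellationStream<S> {
    cancelled: Arc<AtomicBool>,
    inner: S,
    finished: bool,
}

impl<S: BatchStream> CancellationStream<S> {
    fn new(cancelled: Arc<AtomicBool>, inner: S) -> Self {
        Self { cancelled, inner, finished: false }
    }
}

impl<S: BatchStream> BatchStream for CancellationStream<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Batch, String>>> {
        // After the cancellation error has been reported, the stream is over.
        if self.finished {
            return Poll::Ready(None);
        }
        // Report cancellation as an error instead of polling the inner stream.
        if self.cancelled.load(Ordering::SeqCst) {
            self.finished = true;
            return Poll::Ready(Some(Err("Query cancelled".to_string())));
        }
        self.inner.poll_next(cx)
    }
}

/// Waker that does nothing; enough to drive the streams by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `stream` once with a no-op waker.
fn poll_once<S: BatchStream>(stream: &mut S) -> Poll<Option<Result<Batch, String>>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    stream.poll_next(&mut cx)
}
```

In this sketch, two `CancellationStream`s built over the same flag both stop delegating to their sources as soon as the flag is set: the next poll of each wrapper returns the `Query cancelled` error, and the inner sources are not polled again. Whether the real operators above such a wrapper stop polling their other inputs depends on how they propagate the error, which this sketch does not model.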