zhuqi-lucas commented on PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#issuecomment-2922745280

   > After my previous comment I started thinking about how our Java code 
handles this kind of thing. Typically when we do a `Future#cancel(mayInterrupt: 
true)` we have `Thread#isInterrupted` checks in the async code and throw 
`InterruptedException` in response. Then we let the exception bubble up the 
stack and we're done.
   > 
   > Would that maybe be an option? Have a cancellation token that rather than 
triggering `return Pending` triggers a `return 
Ready(Err(DataFusionError::ExecutionJoin))` or something similar. Error 
handling logic is already in place throughout the codebase and errors propagate 
nicely up the call chain. That might be a way to avoid having to sprinkle these 
checks all over the place in the codebase.
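To make the Java analogy concrete, here is a minimal, dependency-free sketch. A shared flag plays the role of the interrupt flag. A `check` turns it into an error, which plays the role of throwing `InterruptedException`. The `?` operator then lets that error bubble up through ordinary error handling. `CancellationToken`, `check`, and `sum_batches` are hypothetical names invented for illustration, and `String` stands in for `DataFusionError`. A real implementation might reach for `tokio_util::sync::CancellationToken` instead of a hand-rolled flag.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical token, analogous to Java's interrupt flag: every clone shares
/// the same flag, so any holder can request cancellation.
#[derive(Clone, Default)]
pub struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Analogous to `Future#cancel(true)` setting the interrupt flag.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Analogous to `Thread#isInterrupted`.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }

    /// Converts the flag into an error, analogous to throwing `InterruptedException`.
    pub fn check(&self) -> Result<(), String> {
        if self.is_cancelled() {
            Err("Query cancelled".to_string())
        } else {
            Ok(())
        }
    }
}

/// Stand-in for an operator doing work in a loop: `?` propagates the
/// cancellation error through the normal error path, like an exception
/// unwinding the stack.
pub fn sum_batches(token: &CancellationToken, batches: &[u64]) -> Result<u64, String> {
    let mut total: u64 = 0;
    for &batch in batches {
        token.check()?;
        total += batch;
    }
    Ok(total)
}
```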
   
   
   @pepijnve I agree this is a possible solution, and it could be combined with making the cancellation low-level, something like:
   
   
   ### Option A: Wrap in `DataSourceExec.open()`  
   
   ```rust
   impl DataSource for FileScanConfig {
       fn open(
           &self,
           partition: usize,
           context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           // 1. build your raw stream
           let raw_stream = Box::pin(FileStream::new(…)?);
   
           // 2. wrap it in a generic cancellation stream
           let cancellable =
               CancellationStream::new(context.clone(), raw_stream);
   
           // 3. return the wrapped stream
           Ok(Box::pin(cancellable))
       }
   }
   
   ```
   
   ```rust
   ...
   if ctx.is_cancelled() {
       return Poll::Ready(Some(Err(DataFusionError::Execution("Query cancelled".to_string().into()))));
   }
   ...
   ```
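Below is a self-contained sketch of the wrapper idea from Option A. It checks a shared flag before delegating to the inner stream and reports `Poll::Ready(Some(Err(..)))` instead of `Poll::Pending` once cancelled. To avoid external crates, `BatchStream` is a simplified local stand-in for `futures::Stream` (it takes `&mut self` rather than `Pin<&mut Self>`), and a plain `Arc<AtomicBool>` stands in for whatever flag `TaskContext` might expose. `CancellationStream`, `VecStream`, and `noop_waker` are hypothetical helpers, not existing DataFusion APIs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for `futures::Stream::poll_next`, using `&mut self`
/// instead of `Pin<&mut Self>` so the sketch has no dependencies.
pub trait BatchStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<u64, String>>>;
}

/// Hypothetical wrapper: one generic layer instead of checks in every operator.
pub struct CancellationStream<S> {
    cancelled: Arc<AtomicBool>,
    inner: S,
}

impl<S: BatchStream> CancellationStream<S> {
    pub fn new(cancelled: Arc<AtomicBool>, inner: S) -> Self {
        Self { cancelled, inner }
    }
}

impl<S: BatchStream> BatchStream for CancellationStream<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<u64, String>>> {
        // On cancellation, surface an error rather than Poll::Pending so it
        // travels up the call chain like any other execution failure.
        if self.cancelled.load(Ordering::SeqCst) {
            return Poll::Ready(Some(Err("Query cancelled".to_string())));
        }
        self.inner.poll_next(cx)
    }
}

/// Toy inner stream that is always ready and yields the Vec's items in order.
pub struct VecStream(pub Vec<u64>);

impl BatchStream for VecStream {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<u64, String>>> {
        Poll::Ready(if self.0.is_empty() { None } else { Some(Ok(self.0.remove(0))) })
    }
}

/// A waker that does nothing, enough to build a `Context` for manual polling.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

One design consequence: the flag is only consulted when the wrapper is polled. If the inner stream returns `Pending` and nothing wakes the task, the cancellation is not observed until the next poll.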
   
   
   But updating the cancellation logic this way seems to make things more complex. Our current context looks like this:
   ```rust
   #[derive(Clone)]
   pub struct SessionContext {
       /// UUID for the session
       session_id: String,
       /// Session start time
       session_start_time: DateTime<Utc>,
       /// Shared session state for the session
       state: Arc<RwLock<SessionState>>,
   }
   ```
   
   I can try this. I think the benefit is that when the consumer is consuming several different types of streaming execs (all built on data source streams), as soon as one of them receives the cancel signal it returns a "Query cancelled" error. So the consumer will then stop polling the other streaming execs?
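For a consumer that drives its inputs one after another and returns on the first error, the answer appears to be yes. The sketch below counts polls to show that once one input yields the cancellation error, the remaining inputs are never polled. `PollNext`, `ToyStream`, and `drain_all` are hypothetical names. The trait drops the `Context` argument for brevity. Operators that drive their inputs concurrently (for example on spawned tasks) are not modeled here.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Simplified polling interface (no Context/waker) for illustration only.
pub trait PollNext {
    fn poll_next(&mut self) -> Poll<Option<Result<u64, String>>>;
}

/// Toy input that records how many times it was polled.
pub struct ToyStream {
    pub items: Vec<Result<u64, String>>,
    pub polls: Rc<Cell<usize>>,
}

impl PollNext for ToyStream {
    fn poll_next(&mut self) -> Poll<Option<Result<u64, String>>> {
        self.polls.set(self.polls.get() + 1);
        Poll::Ready(if self.items.is_empty() { None } else { Some(self.items.remove(0)) })
    }
}

/// Sequential consumer: drains each input in turn and bails out on the first error.
pub fn drain_all(streams: &mut [Box<dyn PollNext>]) -> Result<u64, String> {
    let mut total: u64 = 0;
    for stream in streams.iter_mut() {
        loop {
            match stream.poll_next() {
                Poll::Ready(Some(Ok(v))) => total += v,
                // Returning here means later inputs are never polled.
                Poll::Ready(Some(Err(e))) => return Err(e),
                Poll::Ready(None) => break,
                Poll::Pending => unreachable!("toy streams are always ready"),
            }
        }
    }
    Ok(total)
}
```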
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

