hsiang-c commented on code in PR #2346: URL: https://github.com/apache/datafusion-comet/pull/2346#discussion_r2331586302
########## native/core/src/execution/operators/scan.rs: ########## @@ -239,6 +239,87 @@ impl ScanExec { let mut timer = arrow_ffi_time.timer(); + // Check for selection vectors and get selection indices if needed from + // JVM via FFI + // Selection vectors can be provided by, for instance, Iceberg to + // remove rows that have been deleted. + let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?; + + // fetch batch data from JVM via FFI + let (num_rows, array_addrs, schema_addrs) = + Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?; + + let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols); + + // Process each column + for i in 0..num_cols { + let array_ptr = array_addrs[i]; + let schema_ptr = schema_addrs[i]; + let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; + + // TODO: validate array input data + // array_data.validate_full()?; + + let array = make_array(array_data); + + // Apply selection if selection vectors exist (applies to all columns) + let array = if let Some(ref selection_arrays) = selection_indices_arrays { + let indices = &selection_arrays[i]; + // Apply the selection using Arrow's take kernel + match take(&*array, &**indices, None) { + Ok(selected_array) => selected_array, + Err(e) => { + return Err(CometError::from(ExecutionError::ArrowError(format!( + "Failed to apply selection for column {i}: {e}", + )))); + } + } + } else { + array + }; + + let array = if arrow_ffi_safe { + // ownership of this array has been transferred to native + array + } else { + // it is necessary to copy the array because the contents may be + // overwritten on the JVM side in the future + copy_array(&array) + }; + + inputs.push(array); + + // Drop the Arcs to avoid memory leak + unsafe { + Rc::from_raw(array_ptr as *const FFI_ArrowArray); + Rc::from_raw(schema_ptr as *const FFI_ArrowSchema); + } + } + + timer.stop(); Review Comment: Why do we stop the timer here? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org