parthchandra commented on code in PR #2346:
URL: https://github.com/apache/datafusion-comet/pull/2346#discussion_r2331698398


##########
native/core/src/execution/operators/scan.rs:
##########
@@ -239,6 +239,87 @@ impl ScanExec {
 
         let mut timer = arrow_ffi_time.timer();
 
+        // Check for selection vectors and get selection indices if needed from
+        // JVM via FFI
+        // Selection vectors can be provided by, for instance, Iceberg to
+        // remove rows that have been deleted.
+        let selection_indices_arrays = Self::get_selection_indices(&mut env, 
iter, num_cols)?;
+
+        // fetch batch data from JVMi via FFI
+        let (num_rows, array_addrs, schema_addrs) =
+            Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
+
+        let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);
+
+        // Process each column
+        for i in 0..num_cols {
+            let array_ptr = array_addrs[i];
+            let schema_ptr = schema_addrs[i];
+            let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
+
+            // TODO: validate array input data
+            // array_data.validate_full()?;
+
+            let array = make_array(array_data);
+
+            // Apply selection if selection vectors exist (applies to all 
columns)
+            let array = if let Some(ref selection_arrays) = 
selection_indices_arrays {
+                let indices = &selection_arrays[i];
+                // Apply the selection using Arrow's take kernel
+                match take(&*array, &**indices, None) {
+                    Ok(selected_array) => selected_array,
+                    Err(e) => {
+                        return 
Err(CometError::from(ExecutionError::ArrowError(format!(
+                            "Failed to apply selection for column {i}: {e}",
+                        ))));
+                    }
+                }
+            } else {
+                array
+            };
+
+            let array = if arrow_ffi_safe {
+                // ownership of this array has been transferred to native
+                array
+            } else {
+                // it is necessary to copy the array because the contents may 
be
+                // overwritten on the JVM side in the future
+                copy_array(&array)
+            };
+
+            inputs.push(array);
+
+            // Drop the Arcs to avoid memory leak
+            unsafe {
+                Rc::from_raw(array_ptr as *const FFI_ArrowArray);
+                Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
+            }
+        }
+
+        timer.stop();

Review Comment:
   This is where the timer is stopped originally (it is supposed to be 
measuring the ffi time though it looks like it is doing more than that) . But I 
realized this was now messed up due to the selection vector changes. Updated 
that now. 
   @andygrove, can you take a second look to see if I've done this correctly?



##########
common/src/main/java/org/apache/comet/vector/CometSelectionVector.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A zero-copy selection vector that extends CometVector. This implementation 
stores the original
+ * data vector and selection indices as separate CometVectors, providing zero 
copy access to the the
+ * underlying data.
+ *
+ * <p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and 
the selection indices
+ * are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent 
[v0, v1, v3, v4, v5,
+ * v7] without actually copying the data.
+ *
+ * <p>Most of the implementations of CometVector methods are implemented for 
completeness. We don't
+ * use this class except to transfer the original data and the selection 
indices to the native code.
+ */
+public class CometSelectionVector extends CometVector {
+  /** The original vector containing all values */
+  private final CometVector values;
+
+  /**
+   * The valid indices in the values vector. This array is converted into an 
Arrow vector so we can
+   * transfer the data to native in one JNI call. This is used to represent 
the rowid mapping used
+   * by Iceberg
+   */
+  private final int[] selectionIndices;
+
+  /**
+   * The indices vector containing selection indices. This is currently 
allocated by the JVM side
+   * unlike the values vector which is allocated on the native side
+   */
+  private final CometVector indices;
+
+  /**
+   * Number of selected elements. The indices array may have a length greater 
than this but only
+   * numValues elements in the array are valid
+   */
+  private final int numValues;
+
+  /**
+   * Creates a new selection vector from the given vector and indices.
+   *
+   * @param values The original vector to select from
+   * @param indices The indices to select from the original vector
+   * @param numValues The number of valid values in the indices array
+   * @throws IllegalArgumentException if any index is out of bounds
+   */
+  public CometSelectionVector(CometVector values, int[] indices, int 
numValues) {
+    // Use the values vector's datatype, useDecimal128, and dictionary provider
+    super(values.dataType(), values.useDecimal128);
+
+    this.values = values;
+    this.selectionIndices = indices;
+    this.numValues = numValues;
+
+    // Validate indices
+    int originalLength = values.numValues();
+    for (int idx : indices) {
+      if (idx < 0 || idx >= originalLength) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Index %d is out of bounds for vector of length %d", idx, 
originalLength));
+      }
+    }
+
+    // Create indices vector
+    BufferAllocator allocator = values.getValueVector().getAllocator();
+    IntVector indicesVector = new IntVector("selection_indices", allocator);
+    indicesVector.allocateNew(numValues);
+    for (int i = 0; i < numValues; i++) {
+      indicesVector.set(i, indices[i]);
+    }
+    indicesVector.setValueCount(numValues);
+
+    this.indices =
+        CometVector.getVector(indicesVector, values.useDecimal128, 
values.getDictionaryProvider());
+  }
+
+  /**
+   * Returns the index in the values vector for the given selection vector 
index.
+   *
+   * @param selectionIndex Index in the selection vector
+   * @return The corresponding index in the original vector
+   * @throws IndexOutOfBoundsException if selectionIndex is out of bounds
+   */
+  public int getValuesIndex(int selectionIndex) {
+    if (selectionIndex < 0 || selectionIndex >= numValues) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Selection index %d is out of bounds for selection vector of 
length %d",
+              selectionIndex, numValues));
+    }
+    return indices.getInt(selectionIndex);
+  }
+
+  /**
+   * Returns a reference to the values vector.
+   *
+   * @return The CometVector containing the values
+   */
+  public CometVector getValues() {
+    return values;
+  }
+
+  /**
+   * Returns the indices vector.
+   *
+   * @return The CometVector containing the indices
+   */
+  public CometVector getIndices() {
+    return indices;
+  }
+
+  /**
+   * Returns the selected indices.
+   *
+   * @return Array of selected indices
+   */
+  int[] getSelectedIndices() {

Review Comment:
   done



##########
common/src/main/java/org/apache/comet/vector/CometSelectionVector.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A zero-copy selection vector that extends CometVector. This implementation 
stores the original
+ * data vector and selection indices as separate CometVectors, providing zero 
copy access to the the
+ * underlying data.
+ *
+ * <p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and 
the selection indices
+ * are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent 
[v0, v1, v3, v4, v5,
+ * v7] without actually copying the data.
+ *
+ * <p>Most of the implementations of CometVector methods are implemented for 
completeness. We don't
+ * use this class except to transfer the original data and the selection 
indices to the native code.
+ */
+public class CometSelectionVector extends CometVector {
+  /** The original vector containing all values */
+  private final CometVector values;
+
+  /**
+   * The valid indices in the values vector. This array is converted into an 
Arrow vector so we can
+   * transfer the data to native in one JNI call. This is used to represent 
the rowid mapping used
+   * by Iceberg
+   */
+  private final int[] selectionIndices;
+
+  /**
+   * The indices vector containing selection indices. This is currently 
allocated by the JVM side
+   * unlike the values vector which is allocated on the native side
+   */
+  private final CometVector indices;
+
+  /**
+   * Number of selected elements. The indices array may have a length greater 
than this but only
+   * numValues elements in the array are valid
+   */
+  private final int numValues;
+
+  /**
+   * Creates a new selection vector from the given vector and indices.
+   *
+   * @param values The original vector to select from
+   * @param indices The indices to select from the original vector
+   * @param numValues The number of valid values in the indices array
+   * @throws IllegalArgumentException if any index is out of bounds
+   */
+  public CometSelectionVector(CometVector values, int[] indices, int 
numValues) {
+    // Use the values vector's datatype, useDecimal128, and dictionary provider
+    super(values.dataType(), values.useDecimal128);
+
+    this.values = values;
+    this.selectionIndices = indices;
+    this.numValues = numValues;
+
+    // Validate indices
+    int originalLength = values.numValues();
+    for (int idx : indices) {
+      if (idx < 0 || idx >= originalLength) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Index %d is out of bounds for vector of length %d", idx, 
originalLength));
+      }
+    }
+
+    // Create indices vector
+    BufferAllocator allocator = values.getValueVector().getAllocator();
+    IntVector indicesVector = new IntVector("selection_indices", allocator);
+    indicesVector.allocateNew(numValues);
+    for (int i = 0; i < numValues; i++) {
+      indicesVector.set(i, indices[i]);
+    }
+    indicesVector.setValueCount(numValues);
+
+    this.indices =
+        CometVector.getVector(indicesVector, values.useDecimal128, 
values.getDictionaryProvider());
+  }
+
+  /**
+   * Returns the index in the values vector for the given selection vector 
index.
+   *
+   * @param selectionIndex Index in the selection vector
+   * @return The corresponding index in the original vector
+   * @throws IndexOutOfBoundsException if selectionIndex is out of bounds
+   */
+  public int getValuesIndex(int selectionIndex) {

Review Comment:
   done



##########
native/core/src/execution/operators/scan.rs:
##########
@@ -239,6 +239,87 @@ impl ScanExec {
 
         let mut timer = arrow_ffi_time.timer();
 
+        // Check for selection vectors and get selection indices if needed from
+        // JVM via FFI
+        // Selection vectors can be provided by, for instance, Iceberg to
+        // remove rows that have been deleted.
+        let selection_indices_arrays = Self::get_selection_indices(&mut env, 
iter, num_cols)?;
+
+        // fetch batch data from JVMi via FFI

Review Comment:
   Ah, vi . Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to