This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ff07886f fix: remove broken directBuffer feature for parquet reads 
(#3814)
5ff07886f is described below

commit 5ff07886f04bcd7fbd4fb8b28793adea20ef482e
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 27 16:00:53 2026 -0600

    fix: remove broken directBuffer feature for parquet reads (#3814)
---
 .../org/apache/comet/parquet/ColumnReader.java     | 16 ++------
 .../main/java/org/apache/comet/parquet/Native.java | 12 ------
 .../main/scala/org/apache/comet/CometConf.scala    |  7 ----
 native/core/src/parquet/mod.rs                     | 48 ++--------------------
 .../comet/parquet/CometParquetFileFormat.scala     |  3 --
 5 files changed, 6 insertions(+), 80 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 70c2532f1..6ef3bbf73 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -20,7 +20,6 @@
 package org.apache.comet.parquet;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.spark.sql.types.DataType;
 
-import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
 import org.apache.comet.IcebergApi;
 import org.apache.comet.vector.CometDecodedVector;
@@ -271,17 +269,9 @@ public class ColumnReader extends AbstractColumnReader {
                   "Unsupported value encoding: " + 
dataPageV1.getValueEncoding());
             }
             try {
-              boolean useDirectBuffer =
-                  (Boolean) 
CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER().get();
-              if (useDirectBuffer) {
-                ByteBuffer buffer = dataPageV1.getBytes().toByteBuffer();
-                Native.setPageBufferV1(
-                    nativeHandle, pageValueCount, buffer, 
dataPageV1.getValueEncoding().ordinal());
-              } else {
-                byte[] array = dataPageV1.getBytes().toByteArray();
-                Native.setPageV1(
-                    nativeHandle, pageValueCount, array, 
dataPageV1.getValueEncoding().ordinal());
-              }
+              byte[] array = dataPageV1.getBytes().toByteArray();
+              Native.setPageV1(
+                  nativeHandle, pageValueCount, array, 
dataPageV1.getValueEncoding().ordinal());
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java 
b/common/src/main/java/org/apache/comet/parquet/Native.java
index fb9cedc1e..ec375402c 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -19,7 +19,6 @@
 
 package org.apache.comet.parquet;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.comet.IcebergApi;
@@ -109,17 +108,6 @@ public final class Native extends NativeBase {
   public static native void setPageV1(
       long handle, int pageValueCount, byte[] pageData, int valueEncoding);
 
-  /**
-   * Passes a Parquet data page V1 to the native column reader.
-   *
-   * @param handle the handle to the native Parquet column reader
-   * @param pageValueCount the number of values in this data page
-   * @param buffer the actual page data, represented by a DirectByteBuffer.
-   * @param valueEncoding the encoding used by the values
-   */
-  public static native void setPageBufferV1(
-      long handle, int pageValueCount, ByteBuffer buffer, int valueEncoding);
-
   /**
    * Passes a Parquet data page V2 to the native column reader.
    *
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index bfe90181f..ab9d87e77 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -635,13 +635,6 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
-    conf("spark.comet.parquet.enable.directBuffer")
-      .category(CATEGORY_PARQUET)
-      .doc("Whether to use Java direct byte buffer when reading Parquet.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_ONHEAP_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.onHeap.enabled")
       .category(CATEGORY_TESTING)
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index a59a349bf..c1ff725c0 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -33,16 +33,14 @@ mod objectstore;
 
 use std::collections::HashMap;
 use std::task::Poll;
-use std::{boxed::Box, ptr::NonNull, sync::Arc};
+use std::{boxed::Box, sync::Arc};
 
 use crate::errors::{try_unwrap_or_throw, CometError};
 
-use arrow::ffi::FFI_ArrowArray;
-
 /// JNI exposed methods
 use jni::JNIEnv;
 use jni::{
-    objects::{GlobalRef, JByteBuffer, JClass},
+    objects::{GlobalRef, JClass},
     sys::{jboolean, jint, jlong},
 };
 
@@ -60,7 +58,7 @@ use 
crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACT
 use crate::parquet::parquet_exec::init_datasource_exec;
 use crate::parquet::parquet_support::prepare_object_store_with_configs;
 use arrow::array::{Array, RecordBatch};
-use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::buffer::MutableBuffer;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
@@ -75,7 +73,6 @@ use util::jni::{convert_column_descriptor, convert_encoding, 
deserialize_schema}
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
-    last_data_page: Option<GlobalRef>,
 }
 
 #[no_mangle]
@@ -132,7 +129,6 @@ pub extern "system" fn 
Java_org_apache_comet_parquet_Native_initColumnReader(
                 use_decimal_128 != 0,
                 use_legacy_date_timestamp != 0,
             ),
-            last_data_page: None,
         };
         let res = Box::new(ctx);
         Ok(Box::into_raw(res) as i64)
@@ -193,44 +189,6 @@ pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_setPageV1(
     })
 }
 
-/// # Safety
-/// This function is inherently unsafe since it deals with raw pointers passed 
from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn 
Java_org_apache_comet_parquet_Native_setPageBufferV1(
-    e: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    page_value_count: jint,
-    buffer: JByteBuffer,
-    value_encoding: jint,
-) {
-    try_unwrap_or_throw(&e, |env| {
-        let ctx = get_context(handle)?;
-        let reader = &mut ctx.column_reader;
-
-        // convert value encoding ordinal to the native encoding definition
-        let encoding = convert_encoding(value_encoding);
-
-        // Convert the page to global reference so it won't get GC'd by Java. 
Also free the last
-        // page if there is any.
-        ctx.last_data_page = Some(env.new_global_ref(&buffer)?);
-
-        let buf_slice = env.get_direct_buffer_address(&buffer)?;
-        let buf_capacity = env.get_direct_buffer_capacity(&buffer)?;
-
-        unsafe {
-            let page_ptr = NonNull::new_unchecked(buf_slice);
-            let buffer = Buffer::from_custom_allocation(
-                page_ptr,
-                buf_capacity,
-                Arc::new(FFI_ArrowArray::empty()),
-            );
-            reader.set_page_v1(page_value_count as usize, buffer, encoding);
-        }
-        Ok(())
-    })
-}
-
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed 
 from JNI.
 #[no_mangle]
diff --git 
a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala 
b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 7874f3774..c2728fcab 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -201,9 +201,6 @@ object CometParquetFileFormat extends Logging with 
ShimSQLConf {
       sqlConf.isParquetINT96AsTimestamp)
 
     // Comet specific configs
-    hadoopConf.setBoolean(
-      CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER.key,
-      CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER.get())
     hadoopConf.setBoolean(
       CometConf.COMET_USE_DECIMAL_128.key,
       CometConf.COMET_USE_DECIMAL_128.get())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to