This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5ff07886f fix: remove broken directBuffer feature for parquet reads (#3814)
5ff07886f is described below
commit 5ff07886f04bcd7fbd4fb8b28793adea20ef482e
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 27 16:00:53 2026 -0600
fix: remove broken directBuffer feature for parquet reads (#3814)
---
.../org/apache/comet/parquet/ColumnReader.java | 16 ++------
.../main/java/org/apache/comet/parquet/Native.java | 12 ------
.../main/scala/org/apache/comet/CometConf.scala | 7 ----
native/core/src/parquet/mod.rs | 48 ++--------------------
.../comet/parquet/CometParquetFileFormat.scala | 3 --
5 files changed, 6 insertions(+), 80 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 70c2532f1..6ef3bbf73 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -20,7 +20,6 @@
package org.apache.comet.parquet;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.spark.sql.types.DataType;
-import org.apache.comet.CometConf;
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.IcebergApi;
import org.apache.comet.vector.CometDecodedVector;
@@ -271,17 +269,9 @@ public class ColumnReader extends AbstractColumnReader {
"Unsupported value encoding: " +
dataPageV1.getValueEncoding());
}
try {
- boolean useDirectBuffer =
- (Boolean) CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER().get();
- if (useDirectBuffer) {
- ByteBuffer buffer = dataPageV1.getBytes().toByteBuffer();
- Native.setPageBufferV1(
- nativeHandle, pageValueCount, buffer, dataPageV1.getValueEncoding().ordinal());
- } else {
- byte[] array = dataPageV1.getBytes().toByteArray();
- Native.setPageV1(
- nativeHandle, pageValueCount, array, dataPageV1.getValueEncoding().ordinal());
- }
+ byte[] array = dataPageV1.getBytes().toByteArray();
+ Native.setPageV1(
+ nativeHandle, pageValueCount, array, dataPageV1.getValueEncoding().ordinal());
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index fb9cedc1e..ec375402c 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -19,7 +19,6 @@
package org.apache.comet.parquet;
-import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.comet.IcebergApi;
@@ -109,17 +108,6 @@ public final class Native extends NativeBase {
public static native void setPageV1(
long handle, int pageValueCount, byte[] pageData, int valueEncoding);
- /**
- * Passes a Parquet data page V1 to the native column reader.
- *
- * @param handle the handle to the native Parquet column reader
- * @param pageValueCount the number of values in this data page
- * @param buffer the actual page data, represented by a DirectByteBuffer.
- * @param valueEncoding the encoding used by the values
- */
- public static native void setPageBufferV1(
- long handle, int pageValueCount, ByteBuffer buffer, int valueEncoding);
-
/**
* Passes a Parquet data page V2 to the native column reader.
*
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index bfe90181f..ab9d87e77 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -635,13 +635,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
- val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
- conf("spark.comet.parquet.enable.directBuffer")
- .category(CATEGORY_PARQUET)
- .doc("Whether to use Java direct byte buffer when reading Parquet.")
- .booleanConf
- .createWithDefault(false)
-
val COMET_ONHEAP_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.onHeap.enabled")
.category(CATEGORY_TESTING)
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index a59a349bf..c1ff725c0 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -33,16 +33,14 @@ mod objectstore;
use std::collections::HashMap;
use std::task::Poll;
-use std::{boxed::Box, ptr::NonNull, sync::Arc};
+use std::{boxed::Box, sync::Arc};
use crate::errors::{try_unwrap_or_throw, CometError};
-use arrow::ffi::FFI_ArrowArray;
-
/// JNI exposed methods
use jni::JNIEnv;
use jni::{
- objects::{GlobalRef, JByteBuffer, JClass},
+ objects::{GlobalRef, JClass},
sys::{jboolean, jint, jlong},
};
@@ -60,7 +58,7 @@ use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACT
use crate::parquet::parquet_exec::init_datasource_exec;
use crate::parquet::parquet_support::prepare_object_store_with_configs;
use arrow::array::{Array, RecordBatch};
-use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::buffer::MutableBuffer;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::ExecutionPlan;
@@ -75,7 +73,6 @@ use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema}
/// Parquet read context maintained across multiple JNI calls.
struct Context {
pub column_reader: ColumnReader,
- last_data_page: Option<GlobalRef>,
}
#[no_mangle]
@@ -132,7 +129,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
use_decimal_128 != 0,
use_legacy_date_timestamp != 0,
),
- last_data_page: None,
};
let res = Box::new(ctx);
Ok(Box::into_raw(res) as i64)
@@ -193,44 +189,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setPageV1(
})
}
-/// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setPageBufferV1(
- e: JNIEnv,
- _jclass: JClass,
- handle: jlong,
- page_value_count: jint,
- buffer: JByteBuffer,
- value_encoding: jint,
-) {
- try_unwrap_or_throw(&e, |env| {
- let ctx = get_context(handle)?;
- let reader = &mut ctx.column_reader;
-
- // convert value encoding ordinal to the native encoding definition
- let encoding = convert_encoding(value_encoding);
-
- // Convert the page to global reference so it won't get GC'd by Java. Also free the last
- // page if there is any.
- ctx.last_data_page = Some(env.new_global_ref(&buffer)?);
-
- let buf_slice = env.get_direct_buffer_address(&buffer)?;
- let buf_capacity = env.get_direct_buffer_capacity(&buffer)?;
-
- unsafe {
- let page_ptr = NonNull::new_unchecked(buf_slice);
- let buffer = Buffer::from_custom_allocation(
- page_ptr,
- buf_capacity,
- Arc::new(FFI_ArrowArray::empty()),
- );
- reader.set_page_v1(page_value_count as usize, buffer, encoding);
- }
- Ok(())
- })
-}
-
/// # Safety
/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
#[no_mangle]
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 7874f3774..c2728fcab 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -201,9 +201,6 @@ object CometParquetFileFormat extends Logging with ShimSQLConf {
sqlConf.isParquetINT96AsTimestamp)
// Comet specific configs
- hadoopConf.setBoolean(
- CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER.key,
- CometConf.COMET_PARQUET_ENABLE_DIRECT_BUFFER.get())
hadoopConf.setBoolean(
CometConf.COMET_USE_DECIMAL_128.key,
CometConf.COMET_USE_DECIMAL_128.get())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]