andygrove commented on code in PR #3731:
URL: https://github.com/apache/datafusion-comet/pull/3731#discussion_r2968475909


##########
native/core/src/execution/operators/shuffle_scan.rs:
##########
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    errors::CometError,
+    execution::{
+        operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID,
+        shuffle::codec::read_ipc_compressed,
+    },
+    jvm_bridge::{jni_call, JVMClasses},
+};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
+};
+use datafusion::{
+    execution::TaskContext,
+    physical_expr::*,
+    physical_plan::{ExecutionPlan, *},
+};
+use futures::Stream;
+use jni::objects::{GlobalRef, JByteBuffer, JObject};
+use std::{
+    any::Any,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+use super::scan::InputBatch;
+
+/// ShuffleScanExec reads compressed shuffle blocks from JVM via JNI and decodes them natively.
+/// Unlike ScanExec which receives Arrow arrays via FFI, ShuffleScanExec receives raw compressed
+/// bytes from CometShuffleBlockIterator and decodes them using read_ipc_compressed().
+#[derive(Debug, Clone)]
+pub struct ShuffleScanExec {
+    /// The ID of the execution context that owns this subquery.
+    pub exec_context_id: i64,
+    /// The input source: a global reference to a JVM CometShuffleBlockIterator object.
+    pub input_source: Option<Arc<GlobalRef>>,
+    /// The data types of columns in the shuffle output.
+    pub data_types: Vec<DataType>,
+    /// Schema of the shuffle output.
+    pub schema: SchemaRef,
+    /// The current input batch, populated by get_next_batch() before poll_next().
+    pub batch: Arc<Mutex<Option<InputBatch>>>,
+    /// Cache of plan properties.
+    cache: PlanProperties,
+    /// Metrics collector.
+    metrics: ExecutionPlanMetricsSet,
+    /// Baseline metrics.
+    baseline_metrics: BaselineMetrics,
+    /// Time spent decoding compressed shuffle blocks.
+    decode_time: Time,
+}
+
+impl ShuffleScanExec {
+    pub fn new(
+        exec_context_id: i64,
+        input_source: Option<Arc<GlobalRef>>,
+        data_types: Vec<DataType>,
+    ) -> Result<Self, CometError> {
+        let metrics_set = ExecutionPlanMetricsSet::default();
+        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+        let decode_time = MetricBuilder::new(&metrics_set).subset_time("decode_time", 0);
+
+        let schema = schema_from_data_types(&data_types);
+
+        let cache = PlanProperties::new(
+            EquivalenceProperties::new(Arc::clone(&schema)),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Final,
+            Boundedness::Bounded,
+        );
+
+        Ok(Self {
+            exec_context_id,
+            input_source,
+            data_types,
+            batch: Arc::new(Mutex::new(None)),
+            cache,
+            metrics: metrics_set,
+            baseline_metrics,
+            schema,
+            decode_time,
+        })
+    }
+
+    /// Feeds input batch into this scan. Only used in unit tests.
+    pub fn set_input_batch(&mut self, input: InputBatch) {
+        *self.batch.try_lock().unwrap() = Some(input);
+    }
+
+    /// Pull next input batch from JVM. Called externally before poll_next()
+    /// because JNI calls cannot happen from within poll_next on tokio threads.
+    pub fn get_next_batch(&mut self) -> Result<(), CometError> {
+        if self.input_source.is_none() {
+            // Unit test mode - no JNI calls needed.
+            return Ok(());
+        }
+        let mut timer = self.baseline_metrics.elapsed_compute().timer();
+
+        let mut current_batch = self.batch.try_lock().unwrap();
+        if current_batch.is_none() {
+            let next_batch = Self::get_next(
+                self.exec_context_id,
+                self.input_source.as_ref().unwrap().as_obj(),
+                &self.data_types,
+                &self.decode_time,
+            )?;
+            *current_batch = Some(next_batch);
+        }
+
+        timer.stop();
+
+        Ok(())
+    }
+
+    /// Invokes JNI calls to get the next compressed shuffle block and decode it.
+    fn get_next(
+        exec_context_id: i64,
+        iter: &JObject,
+        data_types: &[DataType],
+        decode_time: &Time,
+    ) -> Result<InputBatch, CometError> {
+        if exec_context_id == TEST_EXEC_CONTEXT_ID {
+            return Ok(InputBatch::EOF);
+        }
+
+        if iter.is_null() {
+            return Err(CometError::from(ExecutionError::GeneralError(format!(
+                "Null shuffle block iterator object. Plan id: {exec_context_id}"
+            ))));
+        }
+
+        let mut env = JVMClasses::get_env()?;

Review Comment:
   `get_env` calls `attach_current_thread`, which is a no-op for threads that are already attached, so the overhead should be minimal, AFAIK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to