Copilot commented on code in PR #522:
URL: https://github.com/apache/sedona-db/pull/522#discussion_r2697259827


##########
rust/sedona-spatial-join/src/index/build_side_collector.rs:
##########
@@ -133,39 +130,65 @@ impl BuildSideBatchesCollector {
         streams: Vec<SendableRecordBatchStream>,
         reservations: Vec<MemoryReservation>,
         metrics_vec: Vec<CollectBuildSideMetrics>,
+        concurrent: bool,
     ) -> Result<Vec<BuildPartition>> {
         if streams.is_empty() {
             return Ok(vec![]);
         }
 
-        // Spawn all tasks to scan all build streams concurrently
-        let mut join_set = JoinSet::new();
-        for (partition_id, ((stream, metrics), reservation)) in streams
-            .into_iter()
-            .zip(metrics_vec)
-            .zip(reservations)
-            .enumerate()
-        {
-            let collector = self.clone();
-            join_set.spawn(async move {
-                let result = collector.collect(stream, reservation, 
&metrics).await;
-                (partition_id, result)
-            });
-        }
+        if concurrent {

Review Comment:
   The concurrent parameter is added to collect_all but the logic branches 
between concurrent and sequential collection. Consider splitting this into two 
separate methods (collect_all_concurrent and collect_all_sequential) to make 
the API clearer and avoid the conditional branching.



##########
rust/sedona-spatial-join/src/utils/arrow_utils.rs:
##########
@@ -15,10 +15,149 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Array, ArrayData, RecordBatch};
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch, 
StringViewArray};
+use arrow_array::make_array;
 use arrow_array::ArrayRef;
+use arrow_array::StructArray;
 use arrow_schema::{ArrowError, DataType};
 use datafusion_common::Result;
+use sedona_common::sedona_internal_err;
+
+/// Reconstruct `batch` to organize the payload buffers of each 
`StringViewArray` and
+/// `BinaryViewArray` in sequential order by calling `gc()` on them.
+///
+/// Note this is a workaround until 
<https://github.com/apache/arrow-rs/issues/7185> is
+/// available.
+///
+/// # Rationale
+///
+/// The `interleave` kernel does not reconstruct the inner buffers of view 
arrays by default,
+/// leading to non-sequential payload locations. A single payload buffer might 
be shared by
+/// multiple `RecordBatch`es or multiple rows in the same batch might 
reference scattered
+/// locations in a large buffer.
+///
+/// When writing each batch to disk, the writer has to write all referenced 
buffers. This
+/// causes extra disk reads and writes, and potentially execution failure 
(e.g. No space left
+/// on device).
+///
+/// # Example
+///
+/// Before interleaving:
+/// batch1 -> buffer1 (large)
+/// batch2 -> buffer2 (large)
+///
+/// interleaved_batch -> buffer1 (sparse access)
+///                   -> buffer2 (sparse access)
+///
+/// Then when spilling the interleaved batch, the writer has to write both 
buffer1 and buffer2
+/// entirely, even if only a few bytes are used.
+pub(crate) fn compact_batch(batch: RecordBatch) -> Result<RecordBatch> {
+    let mut new_columns: Vec<Arc<dyn Array>> = 
Vec::with_capacity(batch.num_columns());
+    let mut arr_mutated = false;
+
+    for array in batch.columns() {
+        let (new_array, mutated) = compact_array(Arc::clone(array))?;
+        new_columns.push(new_array);
+        arr_mutated |= mutated;
+    }
+
+    if arr_mutated {
+        Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
+    } else {
+        Ok(batch)
+    }
+}
+
+/// Recursively compacts view arrays in `array` by calling `gc()` on them.
+/// Returns a tuple of the potentially new array and a boolean indicating
+/// whether any compaction was performed.
+pub(crate) fn compact_array(array: ArrayRef) -> Result<(ArrayRef, bool)> {
+    if let Some(view_array) = array.as_any().downcast_ref::<StringViewArray>() 
{
+        return Ok((Arc::new(view_array.gc()), true));
+    }
+    if let Some(view_array) = array.as_any().downcast_ref::<BinaryViewArray>() 
{
+        return Ok((Arc::new(view_array.gc()), true));
+    }
+
+    // Fast path for non-nested arrays
+    if !array.data_type().is_nested() {
+        return Ok((array, false));
+    }
+
+    // Avoid ArrayData -> ArrayRef roundtrips for commonly used data types,
+    // including StructArray and ListArray.
+
+    if let Some(struct_array) = array.as_any().downcast_ref::<StructArray>() {
+        let mut mutated = false;
+        let mut new_columns: Vec<ArrayRef> = 
Vec::with_capacity(struct_array.num_columns());
+        for col in struct_array.columns() {
+            let (new_col, col_mutated) = compact_array(Arc::clone(col))?;
+            mutated |= col_mutated;
+            new_columns.push(new_col);
+        }
+
+        if !mutated {
+            return Ok((array, false));
+        }
+
+        let rebuilt = StructArray::new(
+            struct_array.fields().clone(),
+            new_columns,
+            struct_array.nulls().cloned(),
+        );
+        return Ok((Arc::new(rebuilt), true));
+    }
+
+    if let Some(list_array) = array.as_any().downcast_ref::<ListArray>() {
+        let (new_values, mutated) = 
compact_array(list_array.values().clone())?;
+        if !mutated {
+            return Ok((array, false));
+        }
+
+        let DataType::List(field) = list_array.data_type() else {
+            // Defensive: this downcast should only succeed for DataType::List.
+            return sedona_internal_err!(
+                "ListArray has non-List data type: {:?}",

Review Comment:
   The error message states 'ListArray has non-List data type' but this 
scenario should never occur since the downcast_ref on line 113 only succeeds 
for ListArray types. Consider clarifying that this is a defensive check or use 
unreachable!() if this is truly impossible.
   ```suggestion
               unreachable!(
                   "ListArray downcast succeeded but data_type is not 
DataType::List: {:?}",
   ```



##########
rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs:
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::VecDeque,
+    iter,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::{
+    disk_manager::RefCountedTempFile, RecordBatchStream, 
SendableRecordBatchStream,
+};
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use futures::{FutureExt, StreamExt};
+use pin_project_lite::pin_project;
+
+use crate::evaluated_batch::{
+    evaluated_batch_stream::EvaluatedBatchStream,
+    spill::{
+        spilled_batch_to_evaluated_batch, spilled_schema_to_evaluated_schema,
+        EvaluatedBatchSpillReader,
+    },
+    EvaluatedBatch,
+};
+
+const RECORD_BATCH_CHANNEL_CAPACITY: usize = 2;
+
+pin_project! {
+    /// Streams [`EvaluatedBatch`] values read back from on-disk spill files.
+    ///
+    /// This stream is intended for the “spilled” path where batches have been 
written to disk and
+    /// must be read back into memory. It wraps an 
[`ExternalRecordBatchStream`] and uses
+    /// background tasks to prefetch/forward batches so downstream operators 
can process a batch
+    /// while the next one is being loaded.
+    pub struct ExternalEvaluatedBatchStream {
+        #[pin]
+        inner: RecordBatchToEvaluatedStream,
+        schema: SchemaRef,
+    }
+}
+
+enum State {
+    AwaitingFile,
+    Opening(SpawnedTask<Result<EvaluatedBatchSpillReader>>),
+    Reading(SpawnedTask<(EvaluatedBatchSpillReader, 
Option<Result<RecordBatch>>)>),
+    Finished,
+}
+
+impl ExternalEvaluatedBatchStream {
+    /// Creates an external stream from a single spill file.
+    pub fn try_from_spill_file(spill_file: Arc<RefCountedTempFile>) -> 
Result<Self> {
+        let record_stream =
+            
ExternalRecordBatchStream::try_from_spill_files(iter::once(spill_file))?;
+        let evaluated_stream =
+            
RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+        let schema = evaluated_stream.schema();
+        Ok(Self {
+            inner: evaluated_stream,
+            schema,
+        })
+    }
+
+    /// Creates an external stream from multiple spill files.
+    ///
+    /// The stream yields the batches from each file in order. When 
`spill_files` is empty the
+    /// stream is empty (returns `None` immediately) and no schema validation 
is performed.
+    pub fn try_from_spill_files<I>(schema: SchemaRef, spill_files: I) -> 
Result<Self>
+    where
+        I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+    {
+        let record_stream = 
ExternalRecordBatchStream::try_from_spill_files(spill_files)?;
+        if !record_stream.is_empty() {
+            // `ExternalRecordBatchStream` only has a meaningful schema when 
at least one spill
+            // file is provided. In that case, validate that the 
caller-provided evaluated schema
+            // matches what would be derived from the spilled schema.
+            let actual_schema = 
spilled_schema_to_evaluated_schema(&record_stream.schema())?;
+            assert_eq!(schema, actual_schema);
+        }
+        let evaluated_stream =
+            
RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+        Ok(Self {
+            inner: evaluated_stream,
+            schema,
+        })
+    }

Review Comment:
   Using assert_eq! in production code will panic on schema mismatch. This 
should return a proper Result with a descriptive error message instead of 
causing a panic.
   ```suggestion
               if schema != actual_schema {
                   return Err(DataFusionError::Execution(format!(
                       "Schema mismatch when creating 
ExternalEvaluatedBatchStream: \
   ```



##########
rust/sedona-spatial-join/src/utils/spill.rs:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fs::File, io::BufReader, sync::Arc};
+
+use arrow::ipc::{
+    reader::StreamReader,
+    writer::{IpcWriteOptions, StreamWriter},
+};
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
+use datafusion_physical_plan::metrics::SpillMetrics;
+
+use crate::utils::arrow_utils::{compact_batch, get_record_batch_memory_size};
+
+/// Generic Arrow IPC stream spill writer for [`RecordBatch`].
+///
+/// Shared between multiple components so spill metrics are updated 
consistently.
+pub(crate) struct RecordBatchSpillWriter {
+    in_progress_file: RefCountedTempFile,
+    writer: StreamWriter<File>,
+    metrics: SpillMetrics,
+    batch_size_threshold: Option<usize>,
+}
+
+impl RecordBatchSpillWriter {
+    pub fn try_new(
+        env: Arc<RuntimeEnv>,
+        schema: SchemaRef,
+        request_description: &str,
+        compression: SpillCompression,
+        metrics: SpillMetrics,
+        batch_size_threshold: Option<usize>,
+    ) -> Result<Self> {
+        let in_progress_file = 
env.disk_manager.create_tmp_file(request_description)?;
+        let file = File::create(in_progress_file.path())?;
+
+        let mut write_options = IpcWriteOptions::default();
+        write_options = 
write_options.try_with_compression(compression.into())?;
+
+        let writer = StreamWriter::try_new_with_options(file, schema.as_ref(), 
write_options)?;
+        metrics.spill_file_count.add(1);
+
+        Ok(Self {
+            in_progress_file,
+            writer,
+            metrics,
+            batch_size_threshold,
+        })
+    }
+
+    /// Write a record batch to the spill file.
+    ///
+    /// If `batch_size_threshold` is configured and the in-memory size of the 
batch exceeds the
+    /// threshold, this will automatically split the batch into smaller slices 
and (optionally)
+    /// compact each slice before writing.
+    pub fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        let num_rows = batch.num_rows();
+        if num_rows == 0 {
+            // Preserve "empty batch" semantics: callers may rely on spilling 
and reading back a
+            // zero-row batch (e.g. as a sentinel for an empty stream).
+            return self.write_one_batch(batch);
+        }
+
+        let rows_per_split = self.calculate_rows_per_split(batch, num_rows)?;
+        if rows_per_split < num_rows {
+            let mut offset = 0;
+            while offset < num_rows {
+                let length = std::cmp::min(rows_per_split, num_rows - offset);
+                let slice = batch.slice(offset, length);
+                let compacted = compact_batch(slice)?;
+                self.write_one_batch(&compacted)?;
+                offset += length;
+            }
+        } else {
+            self.write_one_batch(batch)?;
+        }
+        Ok(())
+    }
+
+    fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) 
-> Result<usize> {
+        let Some(threshold) = self.batch_size_threshold else {
+            return Ok(num_rows);
+        };
+        if threshold == 0 {
+            return Ok(num_rows);
+        }
+
+        let batch_size = get_record_batch_memory_size(batch)?;
+        if batch_size <= threshold {
+            return Ok(num_rows);
+        }
+
+        let num_splits = batch_size.div_ceil(threshold);
+        let rows = num_rows.div_ceil(num_splits);
+        Ok(std::cmp::max(1, rows))
+    }
+
+    fn write_one_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to write RecordBatch to spill file {:?}: {}",
+                self.in_progress_file.path(),
+                e
+            ))
+        })?;
+
+        self.metrics.spilled_rows.add(batch.num_rows());
+        self.metrics
+            .spilled_bytes
+            .add(get_record_batch_memory_size(batch)?);
+
+        Ok(())
+    }
+
+    pub fn finish(mut self) -> Result<RefCountedTempFile> {
+        self.writer.finish()?;
+
+        let mut in_progress_file = self.in_progress_file;
+        in_progress_file.update_disk_usage()?;
+        let size = in_progress_file.current_disk_usage();
+        self.metrics.spilled_bytes.add(size as usize);

Review Comment:
   The spilled bytes metric is being double-counted. Line 128 already adds the 
memory size of the batch via `get_record_batch_memory_size`, and line 139 adds 
the on-disk file size. This results in inflated spilled_bytes metrics that 
don't accurately represent either memory or disk usage.
   ```suggestion
   
   ```



##########
rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs:
##########
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, SchemaRef};
+use datafusion_common::Result;
+use datafusion_physical_plan::{metrics, SendableRecordBatchStream};
+use futures::{Stream, StreamExt};
+
+use crate::evaluated_batch::{
+    evaluated_batch_stream::{EvaluatedBatchStream, 
SendableEvaluatedBatchStream},
+    EvaluatedBatch,
+};
+use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator};
+use crate::utils::arrow_utils::compact_batch;
+
+/// An evaluator that can evaluate geometry expressions on record batches
+/// and produces evaluated geometry arrays.
+trait Evaluator: Unpin {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray>;
+}
+
+/// An evaluator for build-side geometry expressions.
+struct BuildSideEvaluator {
+    evaluator: Arc<dyn OperandEvaluator>,
+}
+
+impl Evaluator for BuildSideEvaluator {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
+        self.evaluator.evaluate_build(batch)
+    }
+}
+
+/// An evaluator for probe-side geometry expressions.
+struct ProbeSideEvaluator {
+    evaluator: Arc<dyn OperandEvaluator>,
+}
+
+impl Evaluator for ProbeSideEvaluator {
+    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
+        self.evaluator.evaluate_probe(batch)
+    }
+}
+
+/// Wraps a `SendableRecordBatchStream` and evaluates the probe-side geometry
+/// expression eagerly so downstream consumers can operate on 
`EvaluatedBatch`s.
+struct EvaluateOperandBatchStream<E: Evaluator> {
+    inner: SendableRecordBatchStream,
+    evaluator: E,
+    evaluation_time: metrics::Time,
+    gc_view_arrays: bool,
+}
+
+impl<E: Evaluator> EvaluateOperandBatchStream<E> {
+    fn new(
+        inner: SendableRecordBatchStream,
+        evaluator: E,
+        evaluation_time: metrics::Time,
+        gc_view_arrays: bool,
+    ) -> Self {
+        let gc_view_arrays = gc_view_arrays && 
schema_contains_view_types(&inner.schema());

Review Comment:
   The schema_contains_view_types check uses flattened_fields().iter() which 
traverses the entire schema tree on every stream construction. Consider caching 
this result or performing the check once during evaluator creation.



##########
rust/sedona-spatial-join/src/evaluated_batch/spill.rs:
##########
@@ -0,0 +1,794 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::Float64Array;
+use arrow_array::{Array, RecordBatch, StructArray};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use sedona_common::sedona_internal_err;
+use sedona_schema::datatypes::SedonaType;
+
+use crate::{
+    evaluated_batch::EvaluatedBatch,
+    operand_evaluator::EvaluatedGeometryArray,
+    utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter},
+};
+
+/// Writer for spilling evaluated batches to disk
+pub struct EvaluatedBatchSpillWriter {
+    /// The temporary spill file being written to
+    inner: RecordBatchSpillWriter,
+
+    /// Schema of the spilled record batches. It is augmented from the schema 
of original record batches
+    /// The spill_schema has 4 fields:
+    /// * `data`: StructArray containing the original record batch columns
+    /// * `geom`: geometry array in storage format
+    /// * `dist`: distance field
+    spill_schema: Schema,
+    /// Inner fields of the "data" StructArray in the spilled record batches
+    data_inner_fields: Fields,
+}
+
+impl EvaluatedBatchSpillWriter {
+    /// Create a new SpillWriter
+    pub fn try_new(
+        env: Arc<RuntimeEnv>,
+        schema: SchemaRef,
+        sedona_type: &SedonaType,
+        request_description: &str,
+        compression: SpillCompression,
+        metrics: SpillMetrics,
+        batch_size_threshold: Option<usize>,
+    ) -> Result<Self> {
+        // Construct schema of record batches to be written. The written 
batches is augmented from the original record batches.
+        let data_inner_fields = schema.fields().clone();
+        let data_struct_field =
+            Field::new("data", DataType::Struct(data_inner_fields.clone()), 
false);
+        let geom_field = sedona_type.to_storage_field("geom", true)?;
+        let dist_field = Field::new("dist", DataType::Float64, true);
+        let spill_schema = Schema::new(vec![data_struct_field, geom_field, 
dist_field]);
+
+        // Create spill file
+        let inner = RecordBatchSpillWriter::try_new(
+            env,
+            Arc::new(spill_schema.clone()),
+            request_description,
+            compression,
+            metrics,
+            batch_size_threshold,
+        )?;
+
+        Ok(Self {
+            inner,
+            spill_schema,
+            data_inner_fields,
+        })
+    }
+
+    /// Append an EvaluatedBatch to the spill file
+    pub fn append(&mut self, evaluated_batch: &EvaluatedBatch) -> Result<()> {
+        let record_batch = self.spilled_record_batch(evaluated_batch)?;
+
+        // Splitting/compaction and spill bytes/rows metrics are handled by 
`RecordBatchSpillWriter`.
+        self.inner.write_batch(&record_batch)?;
+        Ok(())
+    }
+
+    /// Finish writing and return the temporary file
+    pub fn finish(self) -> Result<RefCountedTempFile> {
+        self.inner.finish()
+    }
+
+    fn spilled_record_batch(&self, evaluated_batch: &EvaluatedBatch) -> 
Result<RecordBatch> {
+        let num_rows = evaluated_batch.num_rows();
+
+        // Store the original data batch into a StructArray
+        let data_batch = &evaluated_batch.batch;
+        let data_arrays = data_batch.columns().to_vec();
+        let data_struct_array =
+            StructArray::try_new(self.data_inner_fields.clone(), data_arrays, 
None)?;
+
+        // Store dist into a Float64Array
+        let mut dist_builder = 
arrow::array::Float64Builder::with_capacity(num_rows);
+        let geom_array = &evaluated_batch.geom_array;
+        match &geom_array.distance {
+            Some(ColumnarValue::Scalar(scalar)) => match scalar {
+                ScalarValue::Float64(dist_value) => {
+                    for _ in 0..num_rows {
+                        dist_builder.append_option(*dist_value);
+                    }
+                }
+                _ => {
+                    return Err(DataFusionError::Internal(
+                        "Distance columnar value is not a 
Float64Array".to_string(),
+                    ));
+                }
+            },
+            Some(ColumnarValue::Array(array)) => {
+                let float_array = array
+                    .as_any()
+                    .downcast_ref::<arrow::array::Float64Array>()
+                    .unwrap();
+                dist_builder.append_array(float_array);
+            }
+            None => {
+                for _ in 0..num_rows {
+                    dist_builder.append_null();
+                }
+            }
+        }
+        let dist_array = dist_builder.finish();
+
+        // Assemble the final spilled RecordBatch
+        let columns = vec![
+            Arc::new(data_struct_array) as Arc<dyn arrow::array::Array>,
+            Arc::clone(&geom_array.geometry_array),
+            Arc::new(dist_array) as Arc<dyn arrow::array::Array>,
+        ];
+        let spilled_record_batch =
+            RecordBatch::try_new(Arc::new(self.spill_schema.clone()), 
columns)?;
+        Ok(spilled_record_batch)
+    }
+}
+/// Reader for reading spilled evaluated batches from disk
+pub struct EvaluatedBatchSpillReader {
+    inner: RecordBatchSpillReader,
+}
+impl EvaluatedBatchSpillReader {
+    /// Create a new SpillReader
+    pub fn try_new(temp_file: &RefCountedTempFile) -> Result<Self> {
+        Ok(Self {
+            inner: RecordBatchSpillReader::try_new(temp_file)?,
+        })
+    }
+
+    /// Get the schema of the spilled data
+    pub fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    /// Read the next EvaluatedBatch from the spill file
+    #[allow(unused)]
+    pub fn next_batch(&mut self) -> Option<Result<EvaluatedBatch>> {
+        self.next_raw_batch()
+            .map(|record_batch| 
record_batch.and_then(spilled_batch_to_evaluated_batch))
+    }
+
+    /// Read the next raw RecordBatch from the spill file
+    pub fn next_raw_batch(&mut self) -> Option<Result<RecordBatch>> {
+        self.inner.next_batch()
+    }
+}
+
+pub(crate) fn spilled_batch_to_evaluated_batch(
+    record_batch: RecordBatch,
+) -> Result<EvaluatedBatch> {
+    // Extract the data struct array (column 0) and convert back to the 
original RecordBatch
+    let data_array = record_batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<StructArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("Expected data column to be a 
StructArray".to_string())
+        })?;
+
+    let data_schema = Arc::new(Schema::new(match data_array.data_type() {
+        DataType::Struct(fields) => fields.clone(),
+        _ => {
+            return Err(DataFusionError::Internal(
+                "Expected data column to have Struct data type".to_string(),
+            ))
+        }
+    }));
+
+    let data_columns = (0..data_array.num_columns())
+        .map(|i| Arc::clone(data_array.column(i)))
+        .collect::<Vec<_>>();
+
+    let batch = RecordBatch::try_new(data_schema, data_columns)?;
+
+    // Extract the geometry array (column 1)
+    let geom_array = Arc::clone(record_batch.column(1));
+
+    // Determine the SedonaType from the geometry field in the record batch 
schema
+    let schema = record_batch.schema();
+    let geom_field = schema.field(1);
+    let sedona_type = SedonaType::from_storage_field(geom_field)?;

Review Comment:
   The hardcoded index '1' for the geometry field is fragile. If the spill 
schema structure changes (e.g., fields are reordered), this will silently 
break. Consider using a named constant or field lookup by name to make this 
more maintainable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to