Kontinuation commented on code in PR #522:
URL: https://github.com/apache/sedona-db/pull/522#discussion_r2697292175


##########
rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs:
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::VecDeque,
+    iter,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::{
+    disk_manager::RefCountedTempFile, RecordBatchStream, 
SendableRecordBatchStream,
+};
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use futures::{FutureExt, StreamExt};
+use pin_project_lite::pin_project;
+
+use crate::evaluated_batch::{
+    evaluated_batch_stream::EvaluatedBatchStream,
+    spill::{
+        spilled_batch_to_evaluated_batch, spilled_schema_to_evaluated_schema,
+        EvaluatedBatchSpillReader,
+    },
+    EvaluatedBatch,
+};
+
+// NOTE(review): presumably the bound on the channel that hands record batches from a
+// background task to the consumer (i.e. how far prefetching may run ahead). The use site
+// is outside this hunk -- confirm.
+const RECORD_BATCH_CHANNEL_CAPACITY: usize = 2;
+
+pin_project! {
+    /// Streams [`EvaluatedBatch`] values read back from on-disk spill files.
+    ///
+    /// This stream is intended for the “spilled” path where batches have been written to
+    /// disk and must be read back into memory. It wraps an [`ExternalRecordBatchStream`]
+    /// and uses background tasks to prefetch/forward batches so downstream operators can
+    /// process a batch while the next one is being loaded.
+    pub struct ExternalEvaluatedBatchStream {
+        // Stream produced by `RecordBatchToEvaluatedStream::try_spawned_evaluated_stream`
+        // from the boxed `ExternalRecordBatchStream`; pinned structurally.
+        #[pin]
+        inner: RecordBatchToEvaluatedStream,
+        // Evaluated schema: taken from `inner.schema()` for the single-file constructor,
+        // caller-provided for the multi-file constructor.
+        schema: SchemaRef,
+    }
+}
+
+// NOTE(review): presumably the read state machine of `ExternalRecordBatchStream`; the
+// code that drives these transitions is outside this hunk -- confirm.
+enum State {
+    // No task is held in this state.
+    AwaitingFile,
+    // Holds a spawned task that resolves to a spill reader, or an error.
+    Opening(SpawnedTask<Result<EvaluatedBatchSpillReader>>),
+    // Holds a spawned task that hands back the reader together with an optional
+    // record-batch result.
+    Reading(SpawnedTask<(EvaluatedBatchSpillReader, 
Option<Result<RecordBatch>>)>),
+    // No task is held in this state.
+    Finished,
+}
+
+impl ExternalEvaluatedBatchStream {
+    /// Builds an external stream that reads back the batches of one spill file.
+    ///
+    /// The file is wrapped in a one-element iterator and handed to
+    /// `ExternalRecordBatchStream::try_from_spill_files`; the resulting record-batch
+    /// stream is then converted by
+    /// `RecordBatchToEvaluatedStream::try_spawned_evaluated_stream`. The schema of this
+    /// stream is whatever the converted stream reports.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error returned by either of the two constructors above.
+    pub fn try_from_spill_file(spill_file: Arc<RefCountedTempFile>) -> Result<Self> {
+        let single_file = iter::once(spill_file);
+        let spilled = ExternalRecordBatchStream::try_from_spill_files(single_file)?;
+        let inner = RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(spilled))?;
+        // Read the schema before `inner` is moved into the struct.
+        let schema = inner.schema();
+        Ok(Self { inner, schema })
+    }
+
+    /// Creates an external stream from multiple spill files.
+    ///
+    /// The stream yields the batches from each file in order. When 
`spill_files` is empty the
+    /// stream is empty (returns `None` immediately) and no schema validation 
is performed.
+    pub fn try_from_spill_files<I>(schema: SchemaRef, spill_files: I) -> 
Result<Self>
+    where
+        I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+    {
+        let record_stream = 
ExternalRecordBatchStream::try_from_spill_files(spill_files)?;
+        if !record_stream.is_empty() {
+            // `ExternalRecordBatchStream` only has a meaningful schema when 
at least one spill
+            // file is provided. In that case, validate that the 
caller-provided evaluated schema
+            // matches what would be derived from the spilled schema.
+            let actual_schema = 
spilled_schema_to_evaluated_schema(&record_stream.schema())?;
+            assert_eq!(schema, actual_schema);
+        }
+        let evaluated_stream =
+            
RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+        Ok(Self {
+            inner: evaluated_stream,
+            schema,
+        })
+    }

Review Comment:
   Switched to return sedona_internal_err!.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to