pwrliang commented on code in PR #465:
URL: https://github.com/apache/sedona-db/pull/465#discussion_r2696516037


##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu 
gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, 
GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point 
type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}
+
+/// Check if GPU is actually available
+fn is_gpu_available() -> bool {
+    use sedona_libgpuspatial::GpuSpatialContext;
+
+    match GpuSpatialContext::new() {
+        Ok(mut ctx) => ctx.init().is_ok(),
+        Err(_) => false,
+    }
+}
+
+/// Mock execution plan that produces geometry data
+#[allow(dead_code)]
+struct GeometryDataExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+}
+
+#[allow(dead_code)]
+impl GeometryDataExec {
+    fn new(ids: Vec<i32>, geometries: Vec<Vec<u8>>) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("geometry", DataType::Binary, false),
+        ]));
+
+        let id_array = Int32Array::from(ids);
+
+        // Build BinaryArray using builder to avoid lifetime issues
+        let mut builder = arrow_array::builder::BinaryBuilder::new();
+        for geom in geometries {
+            builder.append_value(&geom);
+        }
+        let geom_array = builder.finish();
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(id_array), Arc::new(geom_array)],
+        )
+        .unwrap();
+
+        Self { schema, batch }
+    }
+}
+
+impl std::fmt::Debug for GeometryDataExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for GeometryDataExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl ExecutionPlan for GeometryDataExec {
+    fn name(&self) -> &str {
+        "GeometryDataExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        unimplemented!("properties not needed for test")
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> 
datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> 
{
+        use datafusion::physical_plan::{RecordBatchStream, 
SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct SingleBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for SingleBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for SingleBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(SingleBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+
+#[tokio::test]
+#[ignore] // Requires GPU hardware
+async fn test_gpu_spatial_join_basic_correctness() {
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    if !is_gpu_available() {
+        log::warn!("GPU not available, skipping test");
+        return;
+    }
+
+    let test_data_dir = concat!(
+        env!("CARGO_MANIFEST_DIR"),
+        "/../../c/sedona-libgpuspatial/libgpuspatial/test_data"
+    );
+    let points_path = format!("{}/test_points.arrows", test_data_dir);
+    let polygons_path = format!("{}/test_polygons.arrows", test_data_dir);
+
+    let points_file =
+        File::open(&points_path).unwrap_or_else(|_| panic!("Failed to open 
{}", points_path));
+    let polygons_file =
+        File::open(&polygons_path).unwrap_or_else(|_| panic!("Failed to open 
{}", polygons_path));
+
+    let mut points_reader = StreamReader::try_new(points_file, None).unwrap();
+    let mut polygons_reader = StreamReader::try_new(polygons_file, 
None).unwrap();
+
+    // Process all batches like the CUDA test does
+    let mut total_rows = 0;
+    let mut iteration = 0;
+
+    loop {
+        // Read next batch from each stream
+        let polygons_batch = match polygons_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading polygons batch: {}", e),
+            None => break, // End of stream
+        };
+
+        let points_batch = match points_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading points batch: {}", e),
+            None => break, // End of stream
+        };
+
+        if iteration == 0 {
+            log::info!(
+                "Batch {}: {} polygons, {} points",
+                iteration,
+                polygons_batch.num_rows(),
+                points_batch.num_rows()
+            );
+        }
+
+        // Find geometry column index
+        let points_geom_idx = points_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+        let polygons_geom_idx = polygons_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+
+        // Create execution plans from the batches
+        let left_plan =
+            Arc::new(SingleBatchExec::new(polygons_batch.clone())) as Arc<dyn 
ExecutionPlan>;
+        let right_plan =
+            Arc::new(SingleBatchExec::new(points_batch.clone())) as Arc<dyn 
ExecutionPlan>;
+
+        let config = GpuSpatialJoinConfig {
+            join_type: datafusion::logical_expr::JoinType::Inner,
+            left_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: polygons_geom_idx,
+            },
+            right_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: points_geom_idx,
+            },
+            predicate: 
GpuSpatialPredicate::Relation(SpatialPredicate::Intersects),
+            device_id: 0,
+            batch_size: 8192,
+            additional_filters: None,
+            max_memory: None,
+            fallback_to_cpu: false,
+        };
+
+        let gpu_join = Arc::new(GpuSpatialJoinExec::new(left_plan, right_plan, 
config).unwrap());
+        let task_context = Arc::new(TaskContext::default());
+        let mut stream = gpu_join.execute(0, task_context).unwrap();
+
+        while let Some(result) = stream.next().await {
+            match result {
+                Ok(batch) => {
+                    let batch_rows = batch.num_rows();
+                    total_rows += batch_rows;
+                    if batch_rows > 0 && iteration < 5 {
+                        log::debug!(
+                            "Iteration {}: Got {} rows from GPU join",
+                            iteration,
+                            batch_rows
+                        );
+                    }
+                }
+                Err(e) => {
+                    panic!("GPU join failed at iteration {}: {}", iteration, 
e);
+                }
+            }
+        }
+
+        iteration += 1;
+    }
+
+    log::info!(
+        "Total rows from GPU join across {} iterations: {}",
+        iteration,
+        total_rows
+    );
+    // Test passes if GPU join completes without crashing and finds results
+    // The CUDA reference test loops through all batches to accumulate results
+    assert!(
+        total_rows > 0,
+        "Expected at least some results across {} iterations, got {}",
+        iteration,
+        total_rows
+    );
+    log::info!(
+        "GPU spatial join completed successfully with {} result rows",
+        total_rows
+    );
+}
+/// Helper execution plan that returns a single pre-loaded batch
+struct SingleBatchExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    props: datafusion::physical_plan::PlanProperties,
+}
+
+impl SingleBatchExec {
+    fn new(batch: RecordBatch) -> Self {
+        let schema = batch.schema();
+        let eq_props = 
datafusion::physical_expr::EquivalenceProperties::new(schema.clone());
+        let partitioning = 
datafusion::physical_plan::Partitioning::UnknownPartitioning(1);
+        let props = datafusion::physical_plan::PlanProperties::new(
+            eq_props,
+            partitioning,
+            datafusion::physical_plan::execution_plan::EmissionType::Final,
+            datafusion::physical_plan::execution_plan::Boundedness::Bounded,
+        );
+        Self {
+            schema,
+            batch,
+            props,
+        }
+    }
+}
+
+impl std::fmt::Debug for SingleBatchExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for SingleBatchExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::ExecutionPlan for SingleBatchExec {
+    fn name(&self) -> &str {
+        "SingleBatchExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        &self.props
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn 
datafusion::physical_plan::ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn 
datafusion::physical_plan::ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion::execution::context::TaskContext>,
+    ) -> 
datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> 
{
+        use datafusion::physical_plan::{RecordBatchStream, 
SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct OnceBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for OnceBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for OnceBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(OnceBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+#[tokio::test]
+#[ignore] // Requires GPU hardware
+async fn test_gpu_spatial_join_correctness() {
+    use sedona_expr::scalar_udf::SedonaScalarUDF;
+    use sedona_geos::register::scalar_kernels;
+    use sedona_schema::crs::lnglat;
+    use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
+    use sedona_testing::create::create_array_storage;
+    use sedona_testing::testers::ScalarUdfTester;
+
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    if !is_gpu_available() {
+        log::warn!("GPU not available, skipping test");
+        return;
+    }
+
+    // Use the same test data as the libgpuspatial reference test
+    let polygon_values = &[
+        Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"),
+        Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 
20, 20 30))"),
+        Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 3 2, 3 3, 2 3, 2 
2), (6 6, 8 6, 8 8, 6 8, 6 6))"),
+        Some("POLYGON ((30 0, 60 20, 50 50, 10 50, 0 20, 30 0), (20 30, 25 40, 
15 40, 20 30), (30 30, 35 40, 25 40, 30 30), (40 30, 45 40, 35 40, 40 30))"),
+        Some("POLYGON ((40 0, 50 30, 80 20, 90 70, 60 90, 30 80, 20 40, 40 0), 
(50 20, 65 30, 60 50, 45 40, 50 20), (30 60, 50 70, 45 80, 30 60))"),
+    ];
+
+    let point_values = &[
+        Some("POINT (30 20)"), // poly0
+        Some("POINT (20 20)"), // poly1
+        Some("POINT (1 1)"),   // poly2
+        Some("POINT (70 70)"), // no match
+        Some("POINT (55 35)"), // poly4
+    ];
+
+    // Create Arrow arrays from WKT (shared for all predicates)
+    let polygons = create_array_storage(polygon_values, &WKB_GEOMETRY);
+    let points = create_array_storage(point_values, &WKB_GEOMETRY);
+
+    // Create RecordBatches (shared for all predicates)
+    let polygon_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("geometry", DataType::Binary, false),
+    ]));
+
+    let point_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("geometry", DataType::Binary, false),
+    ]));
+
+    let polygon_ids = Int32Array::from(vec![0, 1, 2, 3, 4]);
+    let point_ids = Int32Array::from(vec![0, 1, 2, 3, 4]);
+
+    let polygon_batch = RecordBatch::try_new(
+        polygon_schema.clone(),
+        vec![Arc::new(polygon_ids), polygons],
+    )
+    .unwrap();
+
+    let point_batch =
+        RecordBatch::try_new(point_schema.clone(), vec![Arc::new(point_ids), 
points]).unwrap();
+
+    // Pre-create CPU testers for all predicates (shared across all tests)
+    let kernels = scalar_kernels();
+    let sedona_type = SedonaType::Wkb(Edges::Planar, lnglat());
+
+    let _cpu_testers: std::collections::HashMap<&str, ScalarUdfTester> = [

Review Comment:
   This test was incomplete, but it has now been completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to