paleolimbot commented on code in PR #556: URL: https://github.com/apache/sedona-db/pull/556#discussion_r2755438058
########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/utils/markers.hpp: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <cstdint> +#define DISABLE_NVTX_MARKERS + +#ifndef DISABLE_NVTX_MARKERS +#include <nvtx3/nvtx3.hpp> +#endif + +namespace gpuspatial { + +struct Category { + static constexpr uint32_t KernelWorkitems = 1; + static constexpr uint32_t IntervalWorkitems = 2; +}; + +// Colors in ARGB format (Alpha, Red, Green, Blue) +struct Color { + static constexpr uint32_t Red = 0xFF880000; + static constexpr uint32_t Green = 0xFF008800; + static constexpr uint32_t Blue = 0xFF000088; + static constexpr uint32_t Yellow = 0xFFFFFF00; + static constexpr uint32_t Default = 0; Review Comment: Do we need these? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/loader/parallel_wkb_loader.hpp: ########## @@ -756,102 +799,99 @@ class ParallelWkbLoader { private: Config config_; - ArrowArrayView array_view_; + nanoarrow::UniqueArrayView array_view_; GeometryType geometry_type_; detail::DeviceParsedGeometries<POINT_T, INDEX_T> geoms_; std::shared_ptr<ThreadPool> thread_pool_; - void updateGeometryType(int64_t offset, int64_t length) { + template <typename OFFSET_IT> + void updateGeometryType(OFFSET_IT begin, OFFSET_IT end) { if (geometry_type_ == GeometryType::kGeometryCollection) { - // it's already the most generic type return; } - std::vector<bool> type_flags(8 /*WKB types*/, false); - std::vector<std::thread> workers; + size_t num_offsets = std::distance(begin, end); + if (num_offsets == 0) return; + auto parallelism = thread_pool_->num_threads(); - auto thread_work_size = (length + parallelism - 1) / parallelism; - std::vector<std::future<void>> futures; + auto thread_work_size = (num_offsets + parallelism - 1) / parallelism; + + std::vector<std::future<uint32_t>> futures; + futures.reserve(parallelism); + + // Detect Endianness once (outside the loop) + const bool host_is_little = detail::is_little_endian(); for (int thread_idx = 0; thread_idx < parallelism; thread_idx++) { - auto run = [&](int tid) { - auto thread_work_start = tid * thread_work_size; - auto thread_work_end = std::min(length, thread_work_start + thread_work_size); - GeoArrowWKBReader reader; - GeoArrowError error; - GEOARROW_THROW_NOT_OK(nullptr, GeoArrowWKBReaderInit(&reader)); + auto run = [=](int tid) -> uint32_t { + size_t thread_work_start = tid * thread_work_size; + size_t thread_work_end = + std::min(num_offsets, thread_work_start + thread_work_size); + + uint32_t local_seen_mask = 0; for (uint32_t work_offset = thread_work_start; work_offset < thread_work_end; work_offset++) { - auto arrow_offset = work_offset + offset; - // handle null value - if (ArrowArrayViewIsNull(&array_view_, arrow_offset)) { + auto arrow_offset = begin[work_offset]; + + if (ArrowArrayViewIsNull(array_view_.get(), arrow_offset)) { Review Comment: Since it looks like you're trying to avoid branching here, I believe it's faster to decompose this into three branchless loops: - Assert no nulls - Assert all lengths > 5 - Accumulate the bitmask - Fall back on an implementation with branches inside the loop if any of the assertions fails ########## c/sedona-libgpuspatial/src/libgpuspatial.rs: ########## @@ -331,179 +327,387 @@ impl GpuSpatialJoinerWrapper { &[] } - pub fn get_stream_indices_buffer(&self, ctx: &mut GpuSpatialJoinerContext) -> &[u32] { - if let Some(get_stream_indices_buffer_fn) = self.joiner.get_stream_indices_buffer { - let mut stream_indices_ptr: *mut c_void = std::ptr::null_mut(); - let mut stream_indices_len: u32 = 0; + pub fn get_probe_indices_buffer(&self, ctx: &mut SedonaSpatialIndexContext) -> &[u32] { + if let Some(get_probe_indices_buffer_fn) = self.index.get_probe_indices_buffer { + let mut probe_indices_ptr: *mut u32 = std::ptr::null_mut(); + let mut probe_indices_len: u32 = 0; unsafe { - get_stream_indices_buffer_fn( + get_probe_indices_buffer_fn( ctx as *mut _, - &mut stream_indices_ptr as *mut *mut c_void, - &mut stream_indices_len as *mut u32, + &mut probe_indices_ptr as *mut *mut u32, + &mut probe_indices_len as *mut u32, ); // Check length first - empty vectors return empty slice - if stream_indices_len == 0 { + if probe_indices_len == 0 { return &[]; } // Validate pointer (should not be null if length > 0) - if stream_indices_ptr.is_null() { + if probe_indices_ptr.is_null() { return &[]; } // Convert the raw pointer to a slice. This is safe to do because // we've validated the pointer is non-null and length is valid. - let typed_ptr = stream_indices_ptr as *const u32; + let typed_ptr = probe_indices_ptr as *const u32; // Safety: We've checked ptr is non-null and len > 0 - return std::slice::from_raw_parts(typed_ptr, stream_indices_len as usize); + return std::slice::from_raw_parts(typed_ptr, probe_indices_len as usize); } } &[] } +} - pub fn release(&mut self) { - // Call the release function if it exists - if let Some(release_fn) = self.joiner.release { - unsafe { - release_fn(&mut self.joiner as *mut _); - } +impl Default for GpuSpatialIndexFloat2DWrapper { + fn default() -> Self { + GpuSpatialIndexFloat2DWrapper { + index: SedonaFloatIndex2D { + clear: None, + create_context: None, + destroy_context: None, + push_build: None, + finish_building: None, + probe: None, + get_build_indices_buffer: None, + get_probe_indices_buffer: None, + get_last_error: None, + context_get_last_error: None, + release: None, + private_data: std::ptr::null_mut(), + }, + _runtime: Arc::new(Mutex::new(GpuSpatialRuntimeWrapper::default())), } } } -impl Drop for GpuSpatialJoinerWrapper { +impl Drop for GpuSpatialIndexFloat2DWrapper { fn drop(&mut self) { // Call the release function if it exists - if let Some(release_fn) = self.joiner.release { + if let Some(release_fn) = self.index.release { unsafe { - release_fn(&mut self.joiner as *mut _); + release_fn(&mut self.index as *mut _); } } } } -#[cfg(test)] -mod test { - use super::*; - use sedona_expr::scalar_udf::SedonaScalarUDF; - use sedona_geos::register::scalar_kernels; - use sedona_schema::crs::lnglat; - use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY}; - use sedona_testing::create::create_array_storage; - use sedona_testing::testers::ScalarUdfTester; - use std::env; - use std::path::PathBuf; - - #[test] - fn test_gpu_joiner_end2end() { - let mut joiner = GpuSpatialJoinerWrapper::new(); - - let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); - let ptx_root = out_path.join("share/gpuspatial/shaders"); - - joiner - .init( - 1, - ptx_root.to_str().expect("Failed to convert path to string"), - ) - .expect("Failed to init GpuSpatialJoiner"); - - let polygon_values = &[ - Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"), - Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))"), - Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 3 2, 3 3, 2 3, 2 2), (6 6, 8 6, 8 8, 6 8, 6 6))"), - Some("POLYGON ((30 0, 60 20, 50 50, 10 50, 0 20, 30 0), (20 30, 25 40, 15 40, 20 30), (30 30, 35 40, 25 40, 30 30), (40 30, 45 40, 35 40, 40 30))"), - Some("POLYGON ((40 0, 50 30, 80 20, 90 70, 60 90, 30 80, 20 40, 40 0), (50 20, 65 30, 60 50, 45 40, 50 20), (30 60, 50 70, 45 80, 30 60))"), - ]; - let polygons = create_array_storage(polygon_values, &WKB_GEOMETRY); - - // Let the gpusaptial joiner to parse WKBs and get building boxes - joiner - .push_build(&polygons, 0, polygons.len().try_into().unwrap()) - .expect("Failed to push building"); - // Build a spatial index for Build internally on GPU - joiner.finish_building().expect("Failed to finish building"); - - // Each thread that performs spatial joins should have its own context. - // The context is passed to PushStream calls to perform spatial joins. - let mut ctx = GpuSpatialJoinerContext { - last_error: std::ptr::null(), +#[repr(u32)] +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum GpuSpatialRelationPredicateWrapper { + Equals = 0, + Disjoint = 1, + Touches = 2, + Contains = 3, + Covers = 4, + Intersects = 5, + Within = 6, + CoveredBy = 7, +} Review Comment: Is there a reason that we need two enums defined like this? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/refine/rt_spatial_refiner.hpp: ########## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "gpuspatial/refine/spatial_refiner.hpp" +#include "gpuspatial/rt/rt_engine.hpp" + +#include <memory> + +namespace gpuspatial { + +struct RTSpatialRefinerConfig { + std::shared_ptr<RTEngine> rt_engine; + // Prefer fast build the BVH Review Comment: Can these be proper docstrings? ########## c/sedona-libgpuspatial/libgpuspatial/test/c_wrapper_test.cc: ########## @@ -84,23 +182,138 @@ TEST_F(CWrapperTest, InitializeJoiner) { ASSERT_EQ(ArrowArrayStreamGetSchema(point_stream.get(), stream_schema.get(), &error), NANOARROW_OK); - joiner_.push_build(&joiner_, build_schema.get(), build_array.get(), 0, - build_array->length); - joiner_.finish_building(&joiner_); + class GEOSCppHandle { + public: + GEOSContextHandle_t handle; + + GEOSCppHandle() { handle = GEOS_init_r(); } + + ~GEOSCppHandle() { GEOS_finish_r(handle); } + }; + GEOSCppHandle handle; + + reader.InitFromEncoding(handle.handle, GEOARROW_GEOS_ENCODING_WKB); + + geoarrow::geos::GeometryVector geom_build(handle.handle); + + geom_build.resize(build_array->length); + size_t n_build; + + ASSERT_EQ(reader.Read(build_array.get(), 0, build_array->length, + geom_build.mutable_data(), &n_build), + GEOARROW_GEOS_OK); + auto* tree = GEOSSTRtree_create_r(handle.handle, 10); + std::vector<box_t> rects; + + for (size_t build_idx = 0; build_idx < build_array->length; build_idx++) { + auto* geom = geom_build.borrow(build_idx); + auto* box = GEOSEnvelope_r(handle.handle, geom); + + double xmin, ymin, xmax, ymax; + int result = GEOSGeom_getExtent_r(handle.handle, box, &xmin, &ymin, &xmax, &ymax); + ASSERT_EQ(result, 1); + box_t bbox(fpoint_t((float)xmin, (float)ymin), fpoint_t((float)xmax, (float)ymax)); + + rects.push_back(bbox); + + GEOSGeom_setUserData_r(handle.handle, (GEOSGeometry*)geom, (void*)build_idx); + GEOSSTRtree_insert_r(handle.handle, tree, box, (void*)geom); + GEOSGeom_destroy_r(handle.handle, box); + } + + index_.clear(&index_); + ASSERT_EQ(index_.push_build(&index_, (float*)rects.data(), rects.size()), 0); + ASSERT_EQ(index_.finish_building(&index_), 0); - joiner_.push_stream(&joiner_, &context_, stream_schema.get(), stream_array.get(), 0, - stream_array->length, GpuSpatialPredicateContains, 0); + geoarrow::geos::GeometryVector geom_stream(handle.handle); + size_t n_stream; + geom_stream.resize(stream_array->length); - void* build_indices_ptr; - void* stream_indices_ptr; + ASSERT_EQ(reader.Read(stream_array.get(), 0, stream_array->length, + geom_stream.mutable_data(), &n_stream), + GEOARROW_GEOS_OK); + + std::vector<box_t> queries; + + for (size_t stream_idx = 0; stream_idx < stream_array->length; stream_idx++) { + auto* geom = geom_stream.borrow(stream_idx); + double xmin, ymin, xmax, ymax; + int result = GEOSGeom_getExtent_r(handle.handle, geom, &xmin, &ymin, &xmax, &ymax); + ASSERT_EQ(result, 1); + box_t bbox(fpoint_t((float)xmin, (float)ymin), fpoint_t((float)xmax, (float)ymax)); + queries.push_back(bbox); + } + + SedonaSpatialIndexContext idx_ctx; + index_.create_context(&idx_ctx); + + index_.probe(&index_, &idx_ctx, (float*)queries.data(), queries.size()); + + uint32_t* build_indices_ptr; + uint32_t* probe_indices_ptr; uint32_t build_indices_length; - uint32_t stream_indices_length; + uint32_t probe_indices_length; - joiner_.get_build_indices_buffer(&context_, (void**)&build_indices_ptr, - &build_indices_length); - joiner_.get_stream_indices_buffer(&context_, (void**)&stream_indices_ptr, - &stream_indices_length); - } + index_.get_build_indices_buffer(&idx_ctx, &build_indices_ptr, &build_indices_length); + index_.get_probe_indices_buffer(&idx_ctx, &probe_indices_ptr, &probe_indices_length); + + uint32_t new_len; + ASSERT_EQ( + refiner_.refine(&refiner_, build_schema.get(), build_array.get(), + stream_schema.get(), stream_array.get(), + SedonaSpatialRelationPredicate::SedonaSpatialPredicateContains, + (uint32_t*)build_indices_ptr, (uint32_t*)probe_indices_ptr, + build_indices_length, &new_len), + 0); - joiner_.destroy_context(&context_); + std::vector<uint32_t> build_indices((uint32_t*)build_indices_ptr, + (uint32_t*)build_indices_ptr + new_len); + std::vector<uint32_t> probe_indices((uint32_t*)probe_indices_ptr, + (uint32_t*)probe_indices_ptr + new_len); + + struct Payload { + GEOSContextHandle_t handle; + const GEOSGeometry* geom; + std::vector<uint32_t> build_indices; + std::vector<uint32_t> stream_indices; + SedonaSpatialRelationPredicate predicate; + }; + Review Comment: I believe a lot of this code also exists in another test? Is it worth testing the C wrapper in that test as well instead of duplicating the code that calculates the "correct" answer or consolidating the three places where that is currently done into some common code? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/loader/parallel_wkb_loader.hpp: ########## @@ -875,21 +915,107 @@ class ParallelWkbLoader { nums.shrink_to_fit(stream); } - size_t estimateTotalBytes(const ArrowArray* array, int64_t offset, int64_t length) { - ArrowError arrow_error; - if (ArrowArrayViewSetArray(&array_view_, array, &arrow_error) != NANOARROW_OK) { - throw std::runtime_error("ArrowArrayViewSetArray error " + - std::string(arrow_error.message)); - } + template <typename OFFSET_IT> + size_t estimateTotalBytes(OFFSET_IT begin, OFFSET_IT end) const { size_t total_bytes = 0; - for (int64_t i = 0; i < length; i++) { - if (!ArrowArrayViewIsNull(&array_view_, offset + i)) { - auto item = ArrowArrayViewGetBytesUnsafe(&array_view_, offset + i); + for (auto it = begin; it != end; ++it) { + auto offset = *it; + if (!ArrowArrayViewIsNull(array_view_.get(), offset)) { + auto item = ArrowArrayViewGetBytesUnsafe(array_view_.get(), offset); total_bytes += item.size_bytes - 1 // byte order - 2 * sizeof(uint32_t); // type + size } } return total_bytes; } + + template <typename OFFSET_IT> + std::vector<uint32_t> assignBalancedWorks(OFFSET_IT begin, OFFSET_IT end, + uint32_t num_threads) const { + size_t total_bytes = 0; + std::vector<uint32_t> bytes_per_row; + size_t num_rows = std::distance(begin, end); + + bytes_per_row.resize(num_rows, 0); + + // 1. Calculate bytes per row + for (auto it = begin; it != end; ++it) { + auto offset = *it; + if (!ArrowArrayViewIsNull(array_view_.get(), offset)) { + auto item = ArrowArrayViewGetBytesUnsafe(array_view_.get(), offset); + // Assuming item.size_bytes fits in uint32_t based on vector definition + bytes_per_row[it - begin] = static_cast<uint32_t>(item.size_bytes); + } + } + + // 2. Calculate prefix sum + // We use size_t (or uint64_t) for the sum to prevent overflow + std::vector<size_t> prefix_sum; + prefix_sum.reserve(num_rows + 1); + prefix_sum.push_back(0); + + for (uint32_t b : bytes_per_row) { + total_bytes += b; + prefix_sum.push_back(total_bytes); + } + + // 3. Calculate balanced split points + std::vector<uint32_t> split_points; + split_points.reserve(num_threads + 1); + split_points.push_back(0); // The start index for the first thread + + // Avoid division by zero + if (num_threads > 0) { Review Comment: Should something like this fail with an assertion or exception? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/refine/rt_spatial_refiner.cuh: ########## @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include "gpuspatial/geom/box.hpp" +#include "gpuspatial/geom/point.hpp" +#include "gpuspatial/loader/device_geometries.hpp" +#include "gpuspatial/loader/parallel_wkb_loader.hpp" +#include "gpuspatial/refine/rt_spatial_refiner.hpp" +#include "gpuspatial/refine/spatial_refiner.hpp" +#include "gpuspatial/relate/relate_engine.cuh" +#include "gpuspatial/rt/rt_engine.hpp" +#include "gpuspatial/utils/gpu_timer.hpp" +#include "gpuspatial/utils/thread_pool.hpp" + +#include "geoarrow/geoarrow_type.h" +#include "nanoarrow/nanoarrow.h" + +#include "rmm/cuda_stream_pool.hpp" +#include "rmm/cuda_stream_view.hpp" + +#include <thread> + +#define GPUSPATIAL_PROFILING Review Comment: Should this define be in some configuration header somewhere or is this intentional? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/loader/parallel_wkb_loader.hpp: ########## @@ -875,21 +915,107 @@ class ParallelWkbLoader { nums.shrink_to_fit(stream); } - size_t estimateTotalBytes(const ArrowArray* array, int64_t offset, int64_t length) { - ArrowError arrow_error; - if (ArrowArrayViewSetArray(&array_view_, array, &arrow_error) != NANOARROW_OK) { - throw std::runtime_error("ArrowArrayViewSetArray error " + - std::string(arrow_error.message)); - } + template <typename OFFSET_IT> + size_t estimateTotalBytes(OFFSET_IT begin, OFFSET_IT end) const { size_t total_bytes = 0; - for (int64_t i = 0; i < length; i++) { - if (!ArrowArrayViewIsNull(&array_view_, offset + i)) { - auto item = ArrowArrayViewGetBytesUnsafe(&array_view_, offset + i); + for (auto it = begin; it != end; ++it) { + auto offset = *it; + if (!ArrowArrayViewIsNull(array_view_.get(), offset)) { + auto item = ArrowArrayViewGetBytesUnsafe(array_view_.get(), offset); total_bytes += item.size_bytes - 1 // byte order - 2 * sizeof(uint32_t); // type + size } } return total_bytes; } + + template <typename OFFSET_IT> + std::vector<uint32_t> assignBalancedWorks(OFFSET_IT begin, OFFSET_IT end, + uint32_t num_threads) const { + size_t total_bytes = 0; Review Comment: To my reading these are only iterating over sequences representable by `offset` and `length`. If this is the case you can optmize some things (e.g., calling `ArrowArrayViewIsNull()` in a loop is slow and can often be skipped by checking for a zero null count and proceeding with a branchless loop). Also, if this can be reduced to an offset/length you can compute the value you're looking for without peeking into the values at all depending on the array type (binary/large binary this is "just" reading two values from the offsets buffer). ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/loader/parallel_wkb_loader.hpp: ########## @@ -875,21 +915,107 @@ class ParallelWkbLoader { nums.shrink_to_fit(stream); } - size_t estimateTotalBytes(const ArrowArray* array, int64_t offset, int64_t length) { - ArrowError arrow_error; - if (ArrowArrayViewSetArray(&array_view_, array, &arrow_error) != NANOARROW_OK) { - throw std::runtime_error("ArrowArrayViewSetArray error " + - std::string(arrow_error.message)); - } + template <typename OFFSET_IT> + size_t estimateTotalBytes(OFFSET_IT begin, OFFSET_IT end) const { size_t total_bytes = 0; - for (int64_t i = 0; i < length; i++) { - if (!ArrowArrayViewIsNull(&array_view_, offset + i)) { - auto item = ArrowArrayViewGetBytesUnsafe(&array_view_, offset + i); + for (auto it = begin; it != end; ++it) { + auto offset = *it; + if (!ArrowArrayViewIsNull(array_view_.get(), offset)) { + auto item = ArrowArrayViewGetBytesUnsafe(array_view_.get(), offset); total_bytes += item.size_bytes - 1 // byte order - 2 * sizeof(uint32_t); // type + size Review Comment: Later you just use `item.size_bytes`. Are the extra five bytes per item important here and not elsewhere? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/refine/spatial_refiner.hpp: ########## @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include "gpuspatial/relate/predicate.hpp" + +#include "nanoarrow/nanoarrow.h" + +namespace gpuspatial { +class SpatialRefiner { Review Comment: This class and its methods would benefit from docstrings because it is one of the places where the general approach of this library is constrained. You can link to the docstrings here in the C interface documentation to avoid duplicating them. ########## c/sedona-libgpuspatial/libgpuspatial/test/refiner_test.cu: ########## @@ -0,0 +1,738 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "array_stream.hpp" +#include "gpuspatial/index/rt_spatial_index.hpp" +#include "gpuspatial/loader/device_geometries.hpp" +#include "gpuspatial/refine/rt_spatial_refiner.hpp" +#include "test_common.hpp" + +#include "geoarrow_geos/geoarrow_geos.hpp" +#include "nanoarrow/nanoarrow.hpp" + +#include <geoarrow/geoarrow.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> +#include <numeric> // For std::iota + +#include "gpuspatial/index/rt_spatial_index.cuh" +#include "gpuspatial/refine/rt_spatial_refiner.cuh" + +namespace gpuspatial { +// Function to read a single Parquet file and extract a column. +static arrow::Status ReadParquetFromFile( + arrow::fs::FileSystem* fs, // 1. Filesystem pointer (e.g., LocalFileSystem) + const std::string& file_path, // 2. Single file path instead of a folder + int64_t batch_size, const char* column_name, + std::vector<std::shared_ptr<arrow::Array>>& out_arrays) { Review Comment: This sequence of tests is a little hard to follow: - Do we need both a Parquet and IPC loader to load test files? I wonder if this would make it slightly easier to avoid indirection associated with Arrow C++/Array vs nanoarrow/UniqueArray/UniqueSchema. In the early version of this library we were reading Parquet from S3 as part of the end-to-end bechmarks; however, I think once we hook this up to Rust it is better to do those benchmarks there. - It may help to split some of these longer functions into pieces with more descriptive names. Perhaps encapsulate the GEOS portions as a function like `GenerateReferenceResultUsingGEOS()` and `GenerateResultUsingSpatialJoiner()` that both return the same thing (a vector of index pairs?). ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/refine/spatial_refiner.hpp: ########## @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include "gpuspatial/relate/predicate.hpp" + +#include "nanoarrow/nanoarrow.h" + +namespace gpuspatial { +class SpatialRefiner { + public: + virtual ~SpatialRefiner() = default; + + virtual void Clear() = 0; + + virtual void PushBuild(const ArrowSchema* build_schema, + const ArrowArray* build_array) = 0; + + virtual void FinishBuilding() = 0; + + virtual uint32_t Refine(const ArrowSchema* probe_schema, const ArrowArray* probe_array, Review Comment: For the purposes of the internal C++ class for this, can you simplify these signatures by passing a `const ArrowArrayView*`? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/gpuspatial_c.h: ########## @@ -14,60 +14,162 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#include <stdbool.h> #include <stdint.h> #ifdef __cplusplus extern "C" { #endif -struct GpuSpatialJoinerConfig { - uint32_t concurrency; +struct ArrowSchema; +struct ArrowArray; + +// Interfaces for ray-tracing engine (OptiX) +struct GpuSpatialRuntimeConfig { + /** Path to PTX files */ const char* ptx_root; + /** Device ID to use, 0 is the first GPU */ + int device_id; + /** Whether to use CUDA memory pool for allocations */ + bool use_cuda_memory_pool; + /** Ratio of initial memory pool size to total GPU memory, between 0 and 100 */ + int cuda_memory_pool_init_precent; }; -struct GpuSpatialJoinerContext { - const char* last_error; // Pointer to std::string to store last error message - void* private_data; // GPUSpatial context - void* build_indices; // Pointer to std::vector<uint32_t> to store results - void* stream_indices; +/** Opaque runtime for GPU spatial operations + * Each process should only has one instance of GpuSpatialRuntimeexactly + * + */ +struct GpuSpatialRuntime { + /** Initialize the runtime (OptiX) with the given configuration + * @return 0 on success, non-zero on failure + */ + int (*init)(struct GpuSpatialRuntime* self, struct GpuSpatialRuntimeConfig* config); + void (*release)(struct GpuSpatialRuntime* self); + const char* (*get_last_error)(struct GpuSpatialRuntime* self); + void* private_data; }; -enum GpuSpatialPredicate { - GpuSpatialPredicateEquals = 0, - GpuSpatialPredicateDisjoint, - GpuSpatialPredicateTouches, - GpuSpatialPredicateContains, - GpuSpatialPredicateCovers, - GpuSpatialPredicateIntersects, - GpuSpatialPredicateWithin, - GpuSpatialPredicateCoveredBy +/** Create an instance of GpuSpatialRuntime */ +void GpuSpatialRuntimeCreate(struct GpuSpatialRuntime* runtime); + +struct GpuSpatialIndexConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; +}; + +// An opaque context for concurrent probing +struct SedonaSpatialIndexContext { + void* private_data; +}; + +struct SedonaFloatIndex2D { + /** Clear the spatial index, removing all built data */ + int (*clear)(struct SedonaFloatIndex2D* self); + /** Create a new context for concurrent probing */ + void (*create_context)(struct SedonaSpatialIndexContext* context); + /** Destroy a previously created context */ + void (*destroy_context)(struct SedonaSpatialIndexContext* context); + /** Push rectangles for building the spatial index, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be indexed by providing [x, y, + * x, y] but points and rectangles cannot be mixed + * + * @return 0 on success, non-zero on failure + */ + int (*push_build)(struct SedonaFloatIndex2D* self, const float* buf, uint32_t n_rects); + /** + * Finish building the spatial index after all rectangles have been pushed + * + * @return 0 on success, non-zero on failure + */ + int (*finish_building)(struct SedonaFloatIndex2D* self); + /** + * Probe the spatial index with the given rectangles, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be probed by providing [x, y, x, + * y] but points and rectangles cannot be mixed in one Probe call. The results of the + * probe will be stored in the context. + * + * @return 0 on success, non-zero on failure + */ + int (*probe)(struct SedonaFloatIndex2D* self, struct SedonaSpatialIndexContext* context, + const float* buf, uint32_t n_rects); + /** Get the build indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_build_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** build_indices, + uint32_t* build_indices_length); + /** Get the probe indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_probe_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** probe_indices, + uint32_t* probe_indices_length); + const char* (*get_last_error)(struct SedonaFloatIndex2D* self); + const char* (*context_get_last_error)(struct SedonaSpatialIndexContext* context); + /** Release the spatial index and free all resources */ + void (*release)(struct SedonaFloatIndex2D* self); + void* private_data; }; -struct GpuSpatialJoiner { - int (*init)(struct GpuSpatialJoiner* self, struct GpuSpatialJoinerConfig* config); - void (*clear)(struct GpuSpatialJoiner* self); - void (*create_context)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context); - void (*destroy_context)(struct GpuSpatialJoinerContext* context); - int (*push_build)(struct GpuSpatialJoiner* self, const struct ArrowSchema* schema, - const struct ArrowArray* array, int64_t offset, int64_t length); - int (*finish_building)(struct GpuSpatialJoiner* self); - int (*push_stream)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context, - const struct ArrowSchema* schema, const struct ArrowArray* array, - int64_t offset, int64_t length, enum GpuSpatialPredicate predicate, - int32_t array_index_offset); - void (*get_build_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** build_indices, uint32_t* build_indices_length); - void (*get_stream_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** stream_indices, - uint32_t* stream_indices_length); - void (*release)(struct GpuSpatialJoiner* self); +int GpuSpatialIndexFloat2DCreate(struct SedonaFloatIndex2D* index, + const struct GpuSpatialIndexConfig* config); + +struct GpuSpatialRefinerConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; + /** Whether to compress the BVH structures to save memory */ + bool compress_bvh; + /** Number of batches to pipeline for parsing and refinement; setting to 1 disables + * pipelining */ + uint32_t pipeline_batches; +}; + +enum SedonaSpatialRelationPredicate { + SedonaSpatialPredicateEquals = 0, + SedonaSpatialPredicateDisjoint, + SedonaSpatialPredicateTouches, + SedonaSpatialPredicateContains, + SedonaSpatialPredicateCovers, + SedonaSpatialPredicateIntersects, + SedonaSpatialPredicateWithin, + SedonaSpatialPredicateCoveredBy +}; + +struct SedonaSpatialRefiner { Review Comment: Can this (and its members) have docstrings? ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/gpuspatial_c.h: ########## @@ -14,60 +14,162 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#include <stdbool.h> #include <stdint.h> #ifdef __cplusplus extern "C" { #endif -struct GpuSpatialJoinerConfig { - uint32_t concurrency; +struct ArrowSchema; +struct ArrowArray; + +// Interfaces for ray-tracing engine (OptiX) +struct GpuSpatialRuntimeConfig { + /** Path to PTX files */ const char* ptx_root; + /** Device ID to use, 0 is the first GPU */ + int device_id; + /** Whether to use CUDA memory pool for allocations */ + bool use_cuda_memory_pool; + /** Ratio of initial memory pool size to total GPU memory, between 0 and 100 */ + int cuda_memory_pool_init_precent; }; -struct GpuSpatialJoinerContext { - const char* last_error; // Pointer to std::string to store last error message - void* private_data; // GPUSpatial context - void* build_indices; // Pointer to std::vector<uint32_t> to store results - void* stream_indices; +/** Opaque runtime for GPU spatial operations + * Each process should only has one instance of GpuSpatialRuntimeexactly + * + */ +struct GpuSpatialRuntime { + /** Initialize the runtime (OptiX) with the given configuration + * @return 0 on success, non-zero on failure + */ + int (*init)(struct GpuSpatialRuntime* self, struct GpuSpatialRuntimeConfig* config); + void (*release)(struct GpuSpatialRuntime* self); + const char* (*get_last_error)(struct GpuSpatialRuntime* self); + void* private_data; }; -enum GpuSpatialPredicate { - GpuSpatialPredicateEquals = 0, - GpuSpatialPredicateDisjoint, - GpuSpatialPredicateTouches, - GpuSpatialPredicateContains, - GpuSpatialPredicateCovers, - GpuSpatialPredicateIntersects, - GpuSpatialPredicateWithin, - GpuSpatialPredicateCoveredBy +/** Create an instance of GpuSpatialRuntime */ +void GpuSpatialRuntimeCreate(struct GpuSpatialRuntime* runtime); + +struct GpuSpatialIndexConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; +}; + +// An opaque context for concurrent probing +struct SedonaSpatialIndexContext { + void* private_data; +}; + +struct SedonaFloatIndex2D { + /** Clear the spatial index, removing all built data */ + int (*clear)(struct SedonaFloatIndex2D* self); + /** Create a new context for concurrent probing */ + void (*create_context)(struct SedonaSpatialIndexContext* context); + /** Destroy a previously created context */ + void (*destroy_context)(struct SedonaSpatialIndexContext* context); + /** Push rectangles for building the spatial index, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be indexed by providing [x, y, + * x, y] but points and rectangles cannot be mixed + * + * @return 0 on success, non-zero on failure + */ + int (*push_build)(struct SedonaFloatIndex2D* self, const float* buf, uint32_t n_rects); + /** + * Finish building the spatial index after all rectangles have been pushed + * + * @return 0 on success, non-zero on failure + */ + int (*finish_building)(struct SedonaFloatIndex2D* self); + /** + * Probe the spatial index with the given rectangles, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be probed by providing [x, y, x, + * y] but points and rectangles cannot be mixed in one Probe call. The results of the + * probe will be stored in the context. + * + * @return 0 on success, non-zero on failure + */ + int (*probe)(struct SedonaFloatIndex2D* self, struct SedonaSpatialIndexContext* context, + const float* buf, uint32_t n_rects); + /** Get the build indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_build_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** build_indices, + uint32_t* build_indices_length); + /** Get the probe indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_probe_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** probe_indices, + uint32_t* probe_indices_length); + const char* (*get_last_error)(struct SedonaFloatIndex2D* self); + const char* (*context_get_last_error)(struct SedonaSpatialIndexContext* context); + /** Release the spatial index and free all resources */ + void (*release)(struct SedonaFloatIndex2D* self); + void* private_data; }; -struct GpuSpatialJoiner { - int (*init)(struct GpuSpatialJoiner* self, struct GpuSpatialJoinerConfig* config); - void (*clear)(struct GpuSpatialJoiner* self); - void (*create_context)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context); - void (*destroy_context)(struct GpuSpatialJoinerContext* context); - int (*push_build)(struct GpuSpatialJoiner* self, const struct ArrowSchema* schema, - const struct ArrowArray* array, int64_t offset, int64_t length); - int (*finish_building)(struct GpuSpatialJoiner* self); - int (*push_stream)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context, - const struct ArrowSchema* schema, const struct ArrowArray* array, - int64_t offset, int64_t length, enum GpuSpatialPredicate predicate, - int32_t array_index_offset); - void (*get_build_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** build_indices, uint32_t* build_indices_length); - void (*get_stream_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** stream_indices, - uint32_t* stream_indices_length); - void (*release)(struct GpuSpatialJoiner* self); +int GpuSpatialIndexFloat2DCreate(struct SedonaFloatIndex2D* index, + const struct GpuSpatialIndexConfig* config); + +struct GpuSpatialRefinerConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; + /** Whether to compress the BVH structures to save memory */ + bool compress_bvh; + /** Number of batches to pipeline for parsing and refinement; setting to 1 disables + * pipelining */ + uint32_t pipeline_batches; +}; + +enum SedonaSpatialRelationPredicate { + SedonaSpatialPredicateEquals = 0, + SedonaSpatialPredicateDisjoint, + SedonaSpatialPredicateTouches, + SedonaSpatialPredicateContains, + SedonaSpatialPredicateCovers, + SedonaSpatialPredicateIntersects, + SedonaSpatialPredicateWithin, + SedonaSpatialPredicateCoveredBy +}; + +struct SedonaSpatialRefiner { + int (*clear)(struct SedonaSpatialRefiner* self); + + int (*push_build)(struct SedonaSpatialRefiner* self, + const struct ArrowSchema* build_schema, + const struct ArrowArray* build_array); + + int (*finish_building)(struct SedonaSpatialRefiner* self); + + int (*refine_loaded)(struct SedonaSpatialRefiner* self, + const struct ArrowSchema* probe_schema, + const struct ArrowArray* probe_array, + enum SedonaSpatialRelationPredicate predicate, + uint32_t* build_indices, uint32_t* probe_indices, + uint32_t indices_size, uint32_t* new_indices_size); Review Comment: I noticed in a recent PR that @Kontinuation used 64-bit indices for the build side but 32-bit indices for the probe side. I am guessing that here the extra 4 bytes per row probably makes a difference because you have to shuffle them to and from the GPU, but if it doesn't, using a `uint64` for the build side might better align with Kristin's join code. ########## c/sedona-libgpuspatial/libgpuspatial/src/memory_manager.cc: ########## @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gpuspatial/mem/memory_manager.hpp" +#include "gpuspatial/utils/logger.hpp" + +#if defined(_WIN32) +#include <windows.h> +#elif defined(__linux__) +#include <sys/sysinfo.h> +#else // POSIX (BSD, Solaris, etc.) +#include <unistd.h> +#endif +namespace gpuspatial { +namespace detail { +inline long long get_free_physical_memory() { +#if defined(_WIN32) + // --- Windows --- + MEMORYSTATUSEX status; + status.dwLength = sizeof(status); + if (GlobalMemoryStatusEx(&status)) { + return (long long)status.ullAvailPhys; + } + return 0; + +#elif defined(__linux__) + // --- Linux (sysinfo) --- + struct sysinfo info; + if (sysinfo(&info) == 0) { + return (long long)info.freeram * (long long)info.mem_unit; + } + return 0; Review Comment: There is at least one other place where you query this information by calling `sysinfo()`. Should that code use this helper? ########## c/sedona-libgpuspatial/Cargo.toml: ########## @@ -40,8 +40,11 @@ which = "8.0" arrow-array = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } thiserror = { workspace = true } +geo = { workspace = true } +wkt = { workspace = true } log = "0.4" sedona-schema = { path = "../../rust/sedona-schema" } +nvml-wrapper = "0.10.0" Review Comment: Can this be a test dependency as well or can it be removed? (I thought all of the nvidia-related code was behind the C interface?) ########## c/sedona-libgpuspatial/libgpuspatial/test/index_test.cu: ########## @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "array_stream.hpp" +#include "gpuspatial/index/rt_spatial_index.cuh" +#include "test_common.hpp" + +#include <geos/geom/Envelope.h> +#include <geos/index/ItemVisitor.h> +#include <geos/index/strtree/STRtree.h> Review Comment: It may be helpful in your other test to also use the GEOS C++ API since you have it here. (Also your other test utilities, which here are nicely defined as functions with nice names!) ########## c/sedona-libgpuspatial/Cargo.toml: ########## @@ -40,8 +40,11 @@ which = "8.0" arrow-array = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } thiserror = { workspace = true } +geo = { workspace = true } +wkt = { workspace = true } Review Comment: Can these additions be `[dev-dependencies]` (i.e., only used in tests) ########## c/sedona-libgpuspatial/src/lib.rs: ########## Review Comment: For the purposes of this getting this particular PR merged, I think it would be better to remove all the code that no longer compiles from this crate (you could even remove everything except the bindgen bindings) and open another PR for those changes after this one merges. I can also iterate on these changes with you here, but I think we will both be happier polishing up the C/C++ and tackling this next. A few high level comments if you'd like to go this route: There is a lot of `#[cfg(gpu_available)]` and we need to find a way to not sprinkle that everywhere. Maybe: ```rust #[cfg(gpu_available)] mod gpu_stuff { // any code that needs gpu stuff } #[cfg(not(gpu_available))] mod gpu_stuff { // dummy version of pub functions or structs from gpu_stuff that error } ``` There are also a lot of undocumented `pub` functions. It's not clear to me what the intended interface is that should be used outside this crate and some documentation would help highlight that. ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/gpuspatial_c.h: ########## @@ -14,60 +14,162 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#include <stdbool.h> #include <stdint.h> #ifdef __cplusplus extern "C" { #endif -struct GpuSpatialJoinerConfig { - uint32_t concurrency; +struct ArrowSchema; +struct ArrowArray; + +// Interfaces for ray-tracing engine (OptiX) +struct GpuSpatialRuntimeConfig { + /** Path to PTX files */ const char* ptx_root; + /** Device ID to use, 0 is the first GPU */ + int device_id; + /** Whether to use CUDA memory pool for allocations */ + bool use_cuda_memory_pool; + /** Ratio of initial memory pool size to total GPU memory, between 0 and 100 */ + int cuda_memory_pool_init_precent; }; -struct GpuSpatialJoinerContext { - const char* last_error; // Pointer to std::string to store last error message - void* private_data; // GPUSpatial context - void* build_indices; // Pointer to std::vector<uint32_t> to store results - void* stream_indices; +/** Opaque runtime for GPU spatial operations + * Each process should only has one instance of GpuSpatialRuntimeexactly + * + */ +struct GpuSpatialRuntime { + /** Initialize the runtime (OptiX) with the given configuration + * @return 0 on success, non-zero on failure + */ + int (*init)(struct GpuSpatialRuntime* self, struct GpuSpatialRuntimeConfig* config); + void (*release)(struct GpuSpatialRuntime* self); + const char* (*get_last_error)(struct GpuSpatialRuntime* self); + void* private_data; }; -enum GpuSpatialPredicate { - GpuSpatialPredicateEquals = 0, - GpuSpatialPredicateDisjoint, - GpuSpatialPredicateTouches, - GpuSpatialPredicateContains, - GpuSpatialPredicateCovers, - GpuSpatialPredicateIntersects, - GpuSpatialPredicateWithin, - GpuSpatialPredicateCoveredBy +/** Create an instance of GpuSpatialRuntime */ +void GpuSpatialRuntimeCreate(struct GpuSpatialRuntime* runtime); + +struct GpuSpatialIndexConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; +}; + +// An opaque context for concurrent probing +struct SedonaSpatialIndexContext { + void* private_data; +}; + +struct SedonaFloatIndex2D { + /** Clear the spatial index, removing all built data */ + int (*clear)(struct SedonaFloatIndex2D* self); + /** Create a new context for concurrent probing */ + void (*create_context)(struct SedonaSpatialIndexContext* context); + /** Destroy a previously created context */ + void (*destroy_context)(struct SedonaSpatialIndexContext* context); + /** Push rectangles for building the spatial index, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be indexed by providing [x, y, + * x, y] but points and rectangles cannot be mixed + * + * @return 0 on success, non-zero on failure + */ + int (*push_build)(struct SedonaFloatIndex2D* self, const float* buf, uint32_t n_rects); + /** + * Finish building the spatial index after all rectangles have been pushed + * + * @return 0 on success, non-zero on failure + */ + int (*finish_building)(struct SedonaFloatIndex2D* self); + /** + * Probe the spatial index with the given rectangles, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be probed by providing [x, y, x, + * y] but points and rectangles cannot be mixed in one Probe call. The results of the + * probe will be stored in the context. + * + * @return 0 on success, non-zero on failure + */ + int (*probe)(struct SedonaFloatIndex2D* self, struct SedonaSpatialIndexContext* context, + const float* buf, uint32_t n_rects); + /** Get the build indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_build_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** build_indices, + uint32_t* build_indices_length); + /** Get the probe indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_probe_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** probe_indices, + uint32_t* probe_indices_length); + const char* (*get_last_error)(struct SedonaFloatIndex2D* self); + const char* (*context_get_last_error)(struct SedonaSpatialIndexContext* context); + /** Release the spatial index and free all resources */ + void (*release)(struct SedonaFloatIndex2D* self); + void* private_data; }; -struct GpuSpatialJoiner { - int (*init)(struct GpuSpatialJoiner* self, struct GpuSpatialJoinerConfig* config); - void (*clear)(struct GpuSpatialJoiner* self); - void (*create_context)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context); - void (*destroy_context)(struct GpuSpatialJoinerContext* context); - int (*push_build)(struct GpuSpatialJoiner* self, const struct ArrowSchema* schema, - const struct ArrowArray* array, int64_t offset, int64_t length); - int (*finish_building)(struct GpuSpatialJoiner* self); - int (*push_stream)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context, - const struct ArrowSchema* schema, const struct ArrowArray* array, - int64_t offset, int64_t length, enum GpuSpatialPredicate predicate, - int32_t array_index_offset); - void (*get_build_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** build_indices, uint32_t* build_indices_length); - void (*get_stream_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** stream_indices, - uint32_t* stream_indices_length); - void (*release)(struct GpuSpatialJoiner* self); +int GpuSpatialIndexFloat2DCreate(struct SedonaFloatIndex2D* index, + const struct GpuSpatialIndexConfig* config); + +struct GpuSpatialRefinerConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; + /** Whether to compress the BVH structures to save memory */ + bool compress_bvh; + /** Number of batches to pipeline for parsing and refinement; setting to 1 disables + * pipelining */ + uint32_t pipeline_batches; +}; + +enum SedonaSpatialRelationPredicate { + SedonaSpatialPredicateEquals = 0, + SedonaSpatialPredicateDisjoint, + SedonaSpatialPredicateTouches, + SedonaSpatialPredicateContains, + SedonaSpatialPredicateCovers, + SedonaSpatialPredicateIntersects, + SedonaSpatialPredicateWithin, + SedonaSpatialPredicateCoveredBy +}; + +struct SedonaSpatialRefiner { + int (*clear)(struct SedonaSpatialRefiner* self); + + int (*push_build)(struct SedonaSpatialRefiner* self, + const struct ArrowSchema* build_schema, + const struct ArrowArray* build_array); Review Comment: Is it possible to add a callback `int (*init_schema)(struct ArrowSchema* build_schema, struct ArrowSchema* probe_schema)` and eliminate all the `const struct ArrowSchema*` arguments for the other callbacks? I know this doesn't help you in Rust because arrow-rs likes to export the schema whether you want it or not. But this might be fixed at some point and from a C API standpoint it's much cleaner. ########## c/sedona-libgpuspatial/src/lib.rs: ########## @@ -77,197 +152,480 @@ impl GpuSpatialContext { #[cfg(gpu_available)] { Ok(Self { - joiner: None, - context: None, - initialized: false, + runtime: None, + index: None, + refiner: None, }) } } - pub fn init(&mut self) -> Result<()> { + pub fn init(&mut self, options: GpuSpatialOptions) -> Result<()> { #[cfg(not(gpu_available))] { + let _ = options; Err(GpuSpatialError::GpuNotAvailable) } #[cfg(gpu_available)] { - let mut joiner = GpuSpatialJoinerWrapper::new(); - // Get PTX path from OUT_DIR - let out_path = std::path::PathBuf::from(env!("OUT_DIR")); - let ptx_root = out_path.join("share/gpuspatial/shaders"); - let ptx_root_str = ptx_root - .to_str() - .ok_or_else(|| GpuSpatialError::Init("Invalid PTX path".to_string()))?; - - // Initialize with concurrency of 1 for now - joiner.init(1, ptx_root_str)?; - - // Create context - let mut ctx = GpuSpatialJoinerContext { - last_error: std::ptr::null(), - private_data: std::ptr::null_mut(), - build_indices: std::ptr::null_mut(), - stream_indices: std::ptr::null_mut(), - }; - joiner.create_context(&mut ctx); + // Acquire the lock for the global shared runtime + let mut global_runtime_guard = GLOBAL_GPUSPATIAL_RUNTIME.lock().unwrap(); + + // Initialize the global runtime if it hasn't been initialized yet + if global_runtime_guard.is_none() { + // Get PTX path from OUT_DIR + let out_path = std::path::PathBuf::from(env!("OUT_DIR")); + let ptx_root = out_path.join("share/gpuspatial/shaders"); + let ptx_root_str = ptx_root + .to_str() + .ok_or_else(|| GpuSpatialError::Init("Invalid PTX path".to_string()))?; + + let runtime = GpuSpatialRuntimeWrapper::try_new( + options.device_id, + ptx_root_str, + options.cuda_use_memory_pool, + options.cuda_memory_pool_init_percent, + )?; + *global_runtime_guard = Some(Arc::new(Mutex::new(runtime))); + } + + // Get a clone of the Arc to the shared runtime + // safe to unwrap here because we just ensured it is Some + let runtime_ref = global_runtime_guard.as_ref().unwrap().clone(); + // Assign to self + self.runtime = Some(runtime_ref); + + let index = GpuSpatialIndexFloat2DWrapper::try_new( + self.runtime.as_ref().unwrap(), + options.concurrency, + )?; + + self.index = Some(index); + + let refiner = GpuSpatialRefinerWrapper::try_new( + self.runtime.as_ref().unwrap(), + options.concurrency, + options.compress_bvh, + options.pipeline_batches, + )?; + self.refiner = Some(refiner); - self.joiner = Some(joiner); - self.context = Some(ctx); - self.initialized = true; Ok(()) } } - #[cfg(gpu_available)] - pub fn get_joiner_mut(&mut self) -> Option<&mut GpuSpatialJoinerWrapper> { - self.joiner.as_mut() + pub fn is_gpu_available() -> bool { + #[cfg(not(gpu_available))] + { + false + } + #[cfg(gpu_available)] + { + let nvml = match Nvml::init() { + Ok(instance) => instance, + Err(_) => return false, + }; + + // Check if the device count is greater than zero + match nvml.device_count() { + Ok(count) => count > 0, + Err(_) => false, + } + } } - #[cfg(gpu_available)] - pub fn get_context_mut(&mut self) -> Option<&mut GpuSpatialJoinerContext> { - self.context.as_mut() + /// Clear previous build data + pub fn index_clear(&mut self) -> Result<()> { + #[cfg(not(gpu_available))] + { + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let index = self + .index + .as_mut() + .ok_or_else(|| GpuSpatialError::Init("GPU index is not available".into()))?; + + // Clear previous build data + index.clear(); + Ok(()) + } } - pub fn is_initialized(&self) -> bool { - self.initialized + pub fn index_push_build(&mut self, rects: &[Rect<f32>]) -> Result<()> { + #[cfg(not(gpu_available))] + { + let _ = rects; + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let index = self + .index + .as_mut() + .ok_or_else(|| GpuSpatialError::Init("GPU index not available".into()))?; + + unsafe { index.push_build(rects.as_ptr() as *const f32, rects.len() as u32) } + } + } + + pub fn index_finish_building(&mut self) -> Result<()> { + #[cfg(not(gpu_available))] + return Err(GpuSpatialError::GpuNotAvailable); + + #[cfg(gpu_available)] + self.index + .as_mut() + .ok_or_else(|| GpuSpatialError::Init("GPU index not available".into()))? + .finish_building() } - /// Perform spatial join between two geometry arrays - pub fn spatial_join( - &mut self, - left_geom: arrow_array::ArrayRef, - right_geom: arrow_array::ArrayRef, - predicate: SpatialPredicate, - ) -> Result<(Vec<u32>, Vec<u32>)> { + pub fn probe(&self, rects: &[Rect<f32>]) -> Result<(Vec<u32>, Vec<u32>)> { #[cfg(not(gpu_available))] { - let _ = (left_geom, right_geom, predicate); + let _ = rects; Err(GpuSpatialError::GpuNotAvailable) } #[cfg(gpu_available)] { - if !self.initialized { - return Err(GpuSpatialError::Init("Context not initialized".into())); - } + let index = self + .index + .as_ref() + .ok_or_else(|| GpuSpatialError::Init("GPU index not available".into()))?; - let joiner = self - .joiner - .as_mut() - .ok_or_else(|| GpuSpatialError::Init("GPU joiner not available".into()))?; + let mut ctx = SedonaSpatialIndexContext { + private_data: std::ptr::null_mut(), + }; + index.create_context(&mut ctx); - // Clear previous build data - joiner.clear(); - - // Push build data (left side) - log::info!( - "DEBUG: Pushing {} geometries to GPU (build side)", - left_geom.len() - ); - log::info!("DEBUG: Left array data type: {:?}", left_geom.data_type()); - if let Some(binary_arr) = left_geom - .as_any() - .downcast_ref::<arrow_array::BinaryArray>() - { - log::info!("DEBUG: Left binary array has {} values", binary_arr.len()); - if binary_arr.len() > 0 { - let first_wkb = binary_arr.value(0); - log::info!( - "DEBUG: First left WKB length: {}, first bytes: {:?}", - first_wkb.len(), - &first_wkb[..8.min(first_wkb.len())] - ); + let result = (|| -> Result<(Vec<u32>, Vec<u32>)> { + unsafe { + // If this fails, it returns Err from the *closure*, not the function + index.probe(&mut ctx, rects.as_ptr() as *const f32, rects.len() as u32)?; } - } - joiner.push_build(&left_geom, 0, left_geom.len() as i64)?; - joiner.finish_building()?; + // Copy results + let build_indices = index.get_build_indices_buffer(&mut ctx).to_vec(); + let probe_indices = index.get_probe_indices_buffer(&mut ctx).to_vec(); - // Recreate context after building (required by libgpuspatial) - let mut new_context = libgpuspatial_glue_bindgen::GpuSpatialJoinerContext { - last_error: std::ptr::null(), - private_data: std::ptr::null_mut(), - build_indices: std::ptr::null_mut(), - stream_indices: std::ptr::null_mut(), - }; - joiner.create_context(&mut new_context); - self.context = Some(new_context); - let context = self.context.as_mut().unwrap(); - // Push stream data (right side) and perform join - let gpu_predicate = predicate.into(); - joiner.push_stream( - context, - &right_geom, - 0, - right_geom.len() as i64, - gpu_predicate, - 0, // array_index_offset - )?; + Ok((build_indices, probe_indices)) + })(); - // Get results - let build_indices = joiner.get_build_indices_buffer(context).to_vec(); - let stream_indices = joiner.get_stream_indices_buffer(context).to_vec(); + index.destroy_context(&mut ctx); - Ok((build_indices, stream_indices)) + result } } -} -/// Spatial predicates for GPU operations -#[repr(u32)] -#[derive(Debug, PartialEq, Copy, Clone)] -pub enum SpatialPredicate { - Equals = 0, - Disjoint = 1, - Touches = 2, - Contains = 3, - Covers = 4, - Intersects = 5, - Within = 6, - CoveredBy = 7, -} + pub fn refiner_clear(&mut self) -> Result<()> { + #[cfg(not(gpu_available))] + { + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let refiner = self + .refiner + .as_mut() + .ok_or_else(|| GpuSpatialError::Init("GPU refiner is not available".into()))?; -#[cfg(gpu_available)] -impl From<SpatialPredicate> for GpuSpatialPredicateWrapper { - fn from(pred: SpatialPredicate) -> Self { - match pred { - SpatialPredicate::Equals => GpuSpatialPredicateWrapper::Equals, - SpatialPredicate::Disjoint => GpuSpatialPredicateWrapper::Disjoint, - SpatialPredicate::Touches => GpuSpatialPredicateWrapper::Touches, - SpatialPredicate::Contains => GpuSpatialPredicateWrapper::Contains, - SpatialPredicate::Covers => GpuSpatialPredicateWrapper::Covers, - SpatialPredicate::Intersects => GpuSpatialPredicateWrapper::Intersects, - SpatialPredicate::Within => GpuSpatialPredicateWrapper::Within, - SpatialPredicate::CoveredBy => GpuSpatialPredicateWrapper::CoveredBy, + // Clear previous build data + refiner.clear(); + Ok(()) } } -} -// Cleanup implementation -impl Drop for GpuSpatialContext { - fn drop(&mut self) { + pub fn refiner_push_build(&mut self, array: &arrow_array::ArrayRef) -> Result<()> { + #[cfg(not(gpu_available))] + { + let _ = array; + Err(GpuSpatialError::GpuNotAvailable) + } #[cfg(gpu_available)] { - if let (Some(mut joiner), Some(mut ctx)) = (self.joiner.take(), self.context.take()) { - joiner.destroy_context(&mut ctx); - joiner.release(); - } + let refiner = self + .refiner + .as_ref() + .ok_or_else(|| GpuSpatialError::Init("GPU refiner not available".into()))?; + + refiner.push_build(array) + } + } + + pub fn refiner_finish_building(&mut self) -> Result<()> { + #[cfg(not(gpu_available))] + { + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let refiner = self + .refiner + .as_mut() + .ok_or_else(|| GpuSpatialError::Init("GPU refiner not available".into()))?; + + refiner.finish_building() + } + } + + pub fn refine_loaded( + &self, + probe_array: &arrow_array::ArrayRef, + predicate: GpuSpatialRelationPredicate, + build_indices: &mut Vec<u32>, + probe_indices: &mut Vec<u32>, + ) -> Result<()> { + #[cfg(not(gpu_available))] + { + let _ = (probe_array, predicate, build_indices, probe_indices); + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let refiner = self + .refiner + .as_ref() + .ok_or_else(|| GpuSpatialError::Init("GPU refiner not available".into()))?; + + refiner.refine_loaded( + probe_array, + GpuSpatialRelationPredicateWrapper::from(predicate), + build_indices, + probe_indices, + ) + } + } + + pub fn refine( + &self, + array1: &arrow_array::ArrayRef, + array2: &arrow_array::ArrayRef, + predicate: GpuSpatialRelationPredicate, + indices1: &mut Vec<u32>, + indices2: &mut Vec<u32>, + ) -> Result<()> { + #[cfg(not(gpu_available))] + { + let _ = (array1, array2, predicate, indices1, indices2); + Err(GpuSpatialError::GpuNotAvailable) + } + #[cfg(gpu_available)] + { + let refiner = self + .refiner + .as_ref() + .ok_or_else(|| GpuSpatialError::Init("GPU refiner not available".into()))?; + + refiner.refine( + array1, + array2, + GpuSpatialRelationPredicateWrapper::from(predicate), + indices1, + indices2, + ) } } } +#[cfg(gpu_available)] #[cfg(test)] mod tests { use super::*; + use geo::{BoundingRect, Intersects, Point, Polygon}; + use sedona_expr::scalar_udf::SedonaScalarUDF; + use sedona_geos::register::scalar_kernels; + use sedona_schema::crs::lnglat; + use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY}; + use sedona_testing::create::create_array_storage; + use sedona_testing::testers::ScalarUdfTester; + use wkt::TryFromWkt; + + pub fn find_intersection_pairs( + vec_a: &[Rect<f32>], + vec_b: &[Rect<f32>], + ) -> (Vec<u32>, Vec<u32>) { + let mut ids_a = Vec::new(); + let mut ids_b = Vec::new(); + + // Iterate through A with index 'i' + for (i, rect_a) in vec_a.iter().enumerate() { + // Only proceed if 'a' exists + // Iterate through B with index 'j' + for (j, rect_b) in vec_b.iter().enumerate() { + // Check if 'b' exists and intersects 'a' + if rect_a.intersects(rect_b) { + ids_a.push(i as u32); + ids_b.push(j as u32); + } + } + } + (ids_a, ids_b) + } #[test] - fn test_context_creation() { - let ctx = GpuSpatialContext::new(); - #[cfg(gpu_available)] - assert!(ctx.is_ok()); - #[cfg(not(gpu_available))] - assert!(ctx.is_err()); + fn test_spatial_index() { + let mut gs = GpuSpatial::new().unwrap(); + let options = GpuSpatialOptions { + concurrency: 1, + device_id: 0, + compress_bvh: false, + pipeline_batches: 1, + cuda_use_memory_pool: true, + cuda_memory_pool_init_percent: 10, + }; + gs.init(options).expect("Failed to initialize GpuSpatial"); + + let polygon_values = &[ + Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"), + Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))"), + Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 3 2, 3 3, 2 3, 2 2), (6 6, 8 6, 8 8, 6 8, 6 6))"), + Some("POLYGON ((30 0, 60 20, 50 50, 10 50, 0 20, 30 0), (20 30, 25 40, 15 40, 20 30), (30 30, 35 40, 25 40, 30 30), (40 30, 45 40, 35 40, 40 30))"), + Some("POLYGON ((40 0, 50 30, 80 20, 90 70, 60 90, 30 80, 20 40, 40 0), (50 20, 65 30, 60 50, 45 40, 50 20), (30 60, 50 70, 45 80, 30 60))"), + ]; + let rects: Vec<Rect<f32>> = polygon_values + .iter() + .filter_map(|opt_wkt| { + let wkt_str = opt_wkt.as_ref()?; + let polygon: Polygon<f32> = Polygon::try_from_wkt_str(wkt_str).ok()?; + + polygon.bounding_rect() + }) + .collect(); + gs.index_push_build(&rects) + .expect("Failed to push build data"); + gs.index_finish_building() + .expect("Failed to finish building"); + let point_values = &[ + Some("POINT (30 20)"), + Some("POINT (20 20)"), + Some("POINT (1 1)"), + Some("POINT (70 70)"), + Some("POINT (55 35)"), + ]; + let points: Vec<Rect<f32>> = point_values + .iter() + .map(|opt_wkt| -> Rect<f32> { + let wkt_str = opt_wkt.unwrap(); + let point: Point<f32> = Point::try_from_wkt_str(wkt_str).ok().unwrap(); + point.bounding_rect() + }) + .collect(); + let (mut build_indices, mut probe_indices) = gs.probe(&points).unwrap(); + build_indices.sort(); + probe_indices.sort(); + + let (mut ans_build_indices, mut ans_probe_indices) = + find_intersection_pairs(&rects, &points); + + ans_build_indices.sort(); + ans_probe_indices.sort(); + + assert_eq!(build_indices, ans_build_indices); + assert_eq!(probe_indices, ans_probe_indices); + } + + #[test] + fn test_spatial_refiner() { + let mut gs = GpuSpatial::new().unwrap(); + let options = GpuSpatialOptions { + concurrency: 1, + device_id: 0, + compress_bvh: false, + pipeline_batches: 1, + cuda_use_memory_pool: true, + cuda_memory_pool_init_percent: 10, + }; + gs.init(options).expect("Failed to initialize GpuSpatial"); + + let polygon_values = &[ + Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"), + Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))"), + Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 3 2, 3 3, 2 3, 2 2), (6 6, 8 6, 8 8, 6 8, 6 6))"), + Some("POLYGON ((30 0, 60 20, 50 50, 10 50, 0 20, 30 0), (20 30, 25 40, 15 40, 20 30), (30 30, 35 40, 25 40, 30 30), (40 30, 45 40, 35 40, 40 30))"), + Some("POLYGON ((40 0, 50 30, 80 20, 90 70, 60 90, 30 80, 20 40, 40 0), (50 20, 65 30, 60 50, 45 40, 50 20), (30 60, 50 70, 45 80, 30 60))"), + ]; + let polygons = create_array_storage(polygon_values, &WKB_GEOMETRY); + + let rects: Vec<Rect<f32>> = polygon_values + .iter() + .map(|opt_wkt| -> Rect<f32> { + let wkt_str = opt_wkt.unwrap(); + let polygon: Polygon<f32> = Polygon::try_from_wkt_str(wkt_str).ok().unwrap(); + polygon.bounding_rect().unwrap() + }) + .collect(); + gs.index_push_build(&rects) + .expect("Failed to push build data"); + gs.index_finish_building() + .expect("Failed to finish building"); + let point_values = &[ + Some("POINT (30 20)"), + Some("POINT (20 20)"), + Some("POINT (1 1)"), + Some("POINT (70 70)"), + Some("POINT (55 35)"), + ]; + let points = create_array_storage(point_values, &WKB_GEOMETRY); + let point_rects: Vec<Rect<f32>> = point_values + .iter() + .map(|wkt| -> Rect<f32> { + let wkt_str = wkt.unwrap(); + + let point: Point<f32> = Point::try_from_wkt_str(wkt_str).unwrap(); + + point.bounding_rect() + }) + .collect(); + let (mut build_indices, mut probe_indices) = gs.probe(&point_rects).unwrap(); + + gs.refine( + &polygons, + &points, + GpuSpatialRelationPredicate::Intersects, + &mut build_indices, + &mut probe_indices, + ) + .expect("Failed to refine results"); + + build_indices.sort(); + probe_indices.sort(); + + let kernels = scalar_kernels(); + + // Iterate through the vector and find the one named "st_intersects" + let st_intersects = kernels + .into_iter() + .find(|(name, _)| *name == "st_intersects") + .map(|(_, kernel_ref)| kernel_ref) + .unwrap(); + + let sedona_type = SedonaType::Wkb(Edges::Planar, lnglat()); + let udf = SedonaScalarUDF::from_impl("st_intersects", st_intersects); + let tester = + ScalarUdfTester::new(udf.into(), vec![sedona_type.clone(), sedona_type.clone()]); + + let mut ans_build_indices: Vec<u32> = Vec::new(); + let mut ans_probe_indices: Vec<u32> = Vec::new(); + + for (poly_index, poly) in polygon_values.iter().enumerate() { + for (point_index, point) in point_values.iter().enumerate() { + let result = tester + .invoke_scalar_scalar(poly.unwrap(), point.unwrap()) + .unwrap(); + if result == true.into() { + ans_build_indices.push(poly_index as u32); + ans_probe_indices.push(point_index as u32); + } + } + } + + ans_build_indices.sort(); + ans_probe_indices.sort(); Review Comment: It might be nice to encapsulate this as a standalone function generating the output index pairs to avoid breaking up the flow of the test. ########## c/sedona-libgpuspatial/libgpuspatial/include/gpuspatial/gpuspatial_c.h: ########## @@ -14,60 +14,162 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#include <stdbool.h> #include <stdint.h> #ifdef __cplusplus extern "C" { #endif -struct GpuSpatialJoinerConfig { - uint32_t concurrency; +struct ArrowSchema; +struct ArrowArray; + +// Interfaces for ray-tracing engine (OptiX) +struct GpuSpatialRuntimeConfig { + /** Path to PTX files */ const char* ptx_root; + /** Device ID to use, 0 is the first GPU */ + int device_id; + /** Whether to use CUDA memory pool for allocations */ + bool use_cuda_memory_pool; + /** Ratio of initial memory pool size to total GPU memory, between 0 and 100 */ + int cuda_memory_pool_init_precent; }; -struct GpuSpatialJoinerContext { - const char* last_error; // Pointer to std::string to store last error message - void* private_data; // GPUSpatial context - void* build_indices; // Pointer to std::vector<uint32_t> to store results - void* stream_indices; +/** Opaque runtime for GPU spatial operations + * Each process should only has one instance of GpuSpatialRuntimeexactly + * + */ +struct GpuSpatialRuntime { + /** Initialize the runtime (OptiX) with the given configuration + * @return 0 on success, non-zero on failure + */ + int (*init)(struct GpuSpatialRuntime* self, struct GpuSpatialRuntimeConfig* config); + void (*release)(struct GpuSpatialRuntime* self); + const char* (*get_last_error)(struct GpuSpatialRuntime* self); + void* private_data; }; -enum GpuSpatialPredicate { - GpuSpatialPredicateEquals = 0, - GpuSpatialPredicateDisjoint, - GpuSpatialPredicateTouches, - GpuSpatialPredicateContains, - GpuSpatialPredicateCovers, - GpuSpatialPredicateIntersects, - GpuSpatialPredicateWithin, - GpuSpatialPredicateCoveredBy +/** Create an instance of GpuSpatialRuntime */ +void GpuSpatialRuntimeCreate(struct GpuSpatialRuntime* runtime); + +struct GpuSpatialIndexConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; +}; + +// An opaque context for concurrent probing +struct SedonaSpatialIndexContext { + void* private_data; +}; + +struct SedonaFloatIndex2D { + /** Clear the spatial index, removing all built data */ + int (*clear)(struct SedonaFloatIndex2D* self); + /** Create a new context for concurrent probing */ + void (*create_context)(struct SedonaSpatialIndexContext* context); + /** Destroy a previously created context */ + void (*destroy_context)(struct SedonaSpatialIndexContext* context); + /** Push rectangles for building the spatial index, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be indexed by providing [x, y, + * x, y] but points and rectangles cannot be mixed + * + * @return 0 on success, non-zero on failure + */ + int (*push_build)(struct SedonaFloatIndex2D* self, const float* buf, uint32_t n_rects); + /** + * Finish building the spatial index after all rectangles have been pushed + * + * @return 0 on success, non-zero on failure + */ + int (*finish_building)(struct SedonaFloatIndex2D* self); + /** + * Probe the spatial index with the given rectangles, each rectangle is represented by 4 + * floats: [min_x, min_y, max_x, max_y] Points can also be probed by providing [x, y, x, + * y] but points and rectangles cannot be mixed in one Probe call. The results of the + * probe will be stored in the context. + * + * @return 0 on success, non-zero on failure + */ + int (*probe)(struct SedonaFloatIndex2D* self, struct SedonaSpatialIndexContext* context, + const float* buf, uint32_t n_rects); + /** Get the build indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_build_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** build_indices, + uint32_t* build_indices_length); + /** Get the probe indices buffer from the context + * + * @return A pointer to the buffer and its length + */ + void (*get_probe_indices_buffer)(struct SedonaSpatialIndexContext* context, + uint32_t** probe_indices, + uint32_t* probe_indices_length); + const char* (*get_last_error)(struct SedonaFloatIndex2D* self); + const char* (*context_get_last_error)(struct SedonaSpatialIndexContext* context); + /** Release the spatial index and free all resources */ + void (*release)(struct SedonaFloatIndex2D* self); + void* private_data; }; -struct GpuSpatialJoiner { - int (*init)(struct GpuSpatialJoiner* self, struct GpuSpatialJoinerConfig* config); - void (*clear)(struct GpuSpatialJoiner* self); - void (*create_context)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context); - void (*destroy_context)(struct GpuSpatialJoinerContext* context); - int (*push_build)(struct GpuSpatialJoiner* self, const struct ArrowSchema* schema, - const struct ArrowArray* array, int64_t offset, int64_t length); - int (*finish_building)(struct GpuSpatialJoiner* self); - int (*push_stream)(struct GpuSpatialJoiner* self, - struct GpuSpatialJoinerContext* context, - const struct ArrowSchema* schema, const struct ArrowArray* array, - int64_t offset, int64_t length, enum GpuSpatialPredicate predicate, - int32_t array_index_offset); - void (*get_build_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** build_indices, uint32_t* build_indices_length); - void (*get_stream_indices_buffer)(struct GpuSpatialJoinerContext* context, - void** stream_indices, - uint32_t* stream_indices_length); - void (*release)(struct GpuSpatialJoiner* self); +int GpuSpatialIndexFloat2DCreate(struct SedonaFloatIndex2D* index, + const struct GpuSpatialIndexConfig* config); + +struct GpuSpatialRefinerConfig { + /** Pointer to an initialized GpuSpatialRuntime struct */ + struct GpuSpatialRuntime* runtime; + /** How many threads will concurrently call Probe method */ + uint32_t concurrency; + /** Whether to compress the BVH structures to save memory */ + bool compress_bvh; + /** Number of batches to pipeline for parsing and refinement; setting to 1 disables + * pipelining */ + uint32_t pipeline_batches; +}; + +enum SedonaSpatialRelationPredicate { + SedonaSpatialPredicateEquals = 0, + SedonaSpatialPredicateDisjoint, + SedonaSpatialPredicateTouches, + SedonaSpatialPredicateContains, + SedonaSpatialPredicateCovers, + SedonaSpatialPredicateIntersects, + SedonaSpatialPredicateWithin, + SedonaSpatialPredicateCoveredBy +}; + +struct SedonaSpatialRefiner { + int (*clear)(struct SedonaSpatialRefiner* self); + + int (*push_build)(struct SedonaSpatialRefiner* self, + const struct ArrowSchema* build_schema, + const struct ArrowArray* build_array); + + int (*finish_building)(struct SedonaSpatialRefiner* self); + + int (*refine_loaded)(struct SedonaSpatialRefiner* self, + const struct ArrowSchema* probe_schema, + const struct ArrowArray* probe_array, + enum SedonaSpatialRelationPredicate predicate, + uint32_t* build_indices, uint32_t* probe_indices, + uint32_t indices_size, uint32_t* new_indices_size); + + int (*refine)(struct SedonaSpatialRefiner* self, const struct ArrowSchema* schema1, + const struct ArrowArray* array1, const struct ArrowSchema* schema2, + const struct ArrowArray* array2, + enum SedonaSpatialRelationPredicate predicate, uint32_t* indices1, + uint32_t* indices2, uint32_t indices_size, uint32_t* new_indices_size); Review Comment: Some documentation for these parameters would be helpful (are these schemas the same as probe/build?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
