Kontinuation opened a new issue, #436:
URL: https://github.com/apache/sedona-db/issues/436

   ## Summary
   
   SedonaDB's `SpatialJoinExec` currently buffers the entire build side, builds 
a single in-memory R-tree, and streams the probe side. This approach works well 
when the build side and its index fit in memory, but for large datasets it 
fails with memory reservation errors or OOMs.
   
   We propose introducing a **spatial-partitioned out-of-core execution path** 
that keeps memory usage bounded by:
   
   1.  Partitioning the build side into disjoint partitions that each fit in 
memory.
   2.  Building one in-memory R-tree per partition and probing it before moving 
to the next partition.
   
   The design intentionally mirrors hybrid/grace hash join but adapts it to 
SedonaDB’s spatial partitioning requirements.
   
   ## Design Goals
   
   *   **Work with data larger than the memory budget**: Enable joins where the 
total size of the build side exceeds the memory budget, just like other 
spillable operators in DataFusion.
   *   **Do not compromise performance when memory is sufficient**: Preserve 
the performance of the fast in-memory path when the build side fits, and only 
switch to spatial partitioned execution when it does not.
   *   **Full Join Support**: Maintain support for all join types (inner, 
left/right outer/semi/anti, full outer).
   
   ## Proposed Design
   
   ### Execution Modes
   
   `SpatialJoinExec::execute` will be updated to decide between two modes based 
on memory estimates:
   
   *   **In-memory mode** (Current behavior): Build one R-tree and stream probe 
rows. Used when the build side fits in the memory budget.
   *   **Partitioned mode** (Proposed): Triggered when the estimated memory for 
the build index exceeds the budget. Partitions are processed sequentially, with 
each probe thread handling one partition at a time.
   
   The actual implementation does not have to be split into two separate code 
paths; rather, we can treat the in-memory mode as a special case of partitioned 
mode where the number of spatial partitions is 1. We do need to ensure that we 
don't incur the overhead of spilling and repartitioning in this case.
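   A minimal sketch of this decision, where returning 1 selects the in-memory 
fast path; `estimated_index_size` and `memory_budget` are illustrative names, 
not existing SedonaDB APIs:

   ```rust
   /// Decide how many spatial partitions to use. Returning 1 selects the
   /// fast in-memory path; anything larger triggers partitioned execution.
   /// Both arguments are byte counts.
   fn decide_num_partitions(estimated_index_size: usize, memory_budget: usize) -> usize {
       if estimated_index_size <= memory_budget {
           1 // In-memory mode: no spilling or repartitioning overhead.
       } else {
           // Smallest M such that the estimated per-partition index size
           // fits in the budget (assuming roughly uniform partition sizes).
           estimated_index_size.div_ceil(memory_budget)
       }
   }
   ```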
   
   ### Partitioning Model
   
   We will introduce a `SpatialPartition` concept with three variants (sketched 
in code below):
   
   *   `Regular(u32)`: A regular partition ID in `[0, num_partitions)`.
   *   `None`: The row does not intersect any partition.
   *   `Multi`: The row intersects multiple partitions.
   
   A `SpatialPartitioner` trait will be implemented to:
   1.  Partition the build-side bounding box into `M` regular regions (e.g., 
using a KDB tree).
   2.  Assign each build row to **exactly one** `Regular(i)` partition.
   3.  Assign probe rows to:
       *   `Regular(i)` if they intersect exactly one region.
       *   `None` if they intersect no partition.
       *   `Multi` if they intersect multiple regions.
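   A minimal Rust sketch of these types. The enum variants match the list 
above, while the trait methods and the `BoundingBox` type are hypothetical 
names used for illustration:

   ```rust
   /// Minimal 2D bounding box; stands in for SedonaDB's actual type.
   pub struct BoundingBox {
       pub xmin: f64,
       pub ymin: f64,
       pub xmax: f64,
       pub ymax: f64,
   }

   /// Spatial partition assignment for a row.
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum SpatialPartition {
       /// A regular partition ID in [0, num_partitions).
       Regular(u32),
       /// The row does not intersect any partition.
       None,
       /// The row intersects multiple partitions (probe side only).
       Multi,
   }

   /// Hypothetical trait surface; method names are illustrative.
   pub trait SpatialPartitioner: Send + Sync {
       /// The number of regular partitions, M.
       fn num_partitions(&self) -> u32;

       /// Assign a build row to exactly one Regular(i) partition, e.g. the
       /// partition containing the centroid of its bounding box.
       fn partition_build(&self, bbox: &BoundingBox) -> SpatialPartition;

       /// Assign a probe row: Regular(i) if it intersects exactly one
       /// partition, None if it intersects none, Multi otherwise.
       fn partition_probe(&self, bbox: &BoundingBox) -> SpatialPartition;
   }
   ```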
   
   This approach avoids duplicating build or probe rows at the cost of 
potentially re-reading `Multi` rows multiple times (bounded by `M`).
   
   The `SpatialPartitioner` is usually constructed from sampled bounding boxes 
of the build side. The approach is similar to [Apache Sedona's KDB tree 
partitioner](https://github.com/apache/sedona/blob/1909171502fe16cd3d7dec6128b9b29e372819da/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/KDB.java).
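   To illustrate sampling-based construction, here is a simplified 
one-dimensional variant; a KDB tree generalizes this idea to recursive x/y 
splits, and the function name is illustrative:

   ```rust
   /// Derive partition boundaries from sampled build-side bounding boxes so
   /// that each partition holds roughly the same number of rows (equi-depth
   /// split on centroid x). Returns the M - 1 interior boundaries.
   fn boundaries_from_samples(samples: &[BoundingBox], num_partitions: usize) -> Vec<f64> {
       assert!(!samples.is_empty(), "need at least one sample");
       let mut centers: Vec<f64> = samples
           .iter()
           .map(|b| (b.xmin + b.xmax) / 2.0)
           .collect();
       centers.sort_by(|a, b| a.total_cmp(b));
       // Interior boundaries at the 1/M, 2/M, ... quantiles of the sample.
       (1..num_partitions)
           .map(|i| centers[i * centers.len() / num_partitions])
           .collect()
   }
   ```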
   
   ### Algorithm Steps
   
   Given build input `B` and probe input `P`:
   
   1.  **Collect Build Side**:
       *   Collect batches from the build side.
       *   Sample bounding boxes to construct the partitioner.
       *   **Memory Reservation**: Reserve memory for the collected build-side 
batches plus the extra memory required for building the spatial index. If a 
memory reservation fails during collection, spill all batches to disk. We push 
the memory reservation watermark up as batches are collected; this high-water 
mark is retained (the reservation is not freed) to ensure sufficient memory 
remains available for subsequent index building.
   
   2.  **Decide the Number of Spatial Partitions**:
       *   **Spatial Partition Sizing**: Calculate the required number of 
partitions to ensure each partition fits in the memory budget. The number of 
partitions is chosen specifically so that the estimated index size for any 
single partition does not exceed the reserved memory watermark established in 
Step 1.
   
   3.  **Partition Build Side**:
       *   Construct the partitioner (e.g., `KDBPartitioner`) from sampled 
bounding boxes.
       *   Repartition the collected build data into separate spill files for 
each partition.
       *   **Probe Side Partitioner**: Create a probe-side partitioner using 
the *actual boundaries* of the build-side spatial partitions.
   
   4.  **Process Partitions Sequentially** (see the sketch after this list):
       *   For each partition `i`:
           1.  **Build Index**: Load the partition data from disk and build an 
in-memory R-tree.
           2.  **Probe Pass**:
               *   **First Pass (Partition 0)**: Process the original probe 
stream. Rows assigned to `Regular(0)` or `None` are probed immediately. Rows 
belonging to later partitions (`Regular(k)` with `k > 0`) or to `Multi` are 
spilled to disk.
               *   **Subsequent Passes**: Read the spilled probe batches for 
the current partition.
           3.  **Multi Partition Pass**: After the regular rows of partition 
`i` are probed, probe the rows classified as `Multi` against the same index; 
`Multi` rows are re-read for every partition.
           4.  **Cleanup**: Drop the index to free memory before moving to the 
next partition.
   
   5.  **Handle Unmatched Rows (Outer Joins)**:
       *   **Build Side**: Track visited rows using bitmaps. After the last 
probe stream completes for a partition, emit unmatched build rows.
       *   **Probe Side**: Use bitmaps to track matches for `Multi` partition 
rows across all partitions. Emit unmatched probe rows during the last `Multi` 
partition pass.
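   The per-partition loop in steps 4 and 5 can be sketched as follows. All 
helper names (`load_partition`, `probe_stream_for`, `probe_and_emit`, and so 
on) are hypothetical, and error handling, async plumbing, and probe-thread 
synchronization are omitted:

   ```rust
   // High-level sketch of steps 4-5; all helpers are hypothetical.
   fn process_partitions(num_partitions: u32) {
       // Probe-side Multi rows are probed against every partition, so their
       // visited bitmap must persist across the whole loop.
       let mut multi_visited = vec![false; multi_row_count()];

       for i in 0..num_partitions {
           // Build: load partition i from its spill file and index it.
           let batches = load_partition(i);
           let index = RTree::build(&batches);
           let mut build_visited = vec![false; batches.num_rows()];

           // Probe: partition 0 consumes the original probe stream (spilling
           // rows destined for later partitions); partitions > 0 read back
           // their own spilled rows.
           for row in probe_stream_for(i) {
               probe_and_emit(&index, &row, &mut build_visited);
           }

           // Multi pass: probe Multi rows against this partition's index,
           // recording matches for the final unmatched-row emission.
           for (k, row) in multi_rows().enumerate() {
               if probe_and_emit(&index, &row, &mut build_visited) {
                   multi_visited[k] = true;
               }
           }

           // Outer joins: emit build rows left unmatched in this partition,
           // then drop the index to free memory before the next partition.
           emit_unmatched_build_rows(&batches, &build_visited);
           drop(index);
       }

       // After the last partition, emit probe-side Multi rows that never
       // matched anywhere (needed for probe-side outer joins).
       emit_unmatched_multi_rows(&multi_visited);
   }
   ```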
   
   ### Visualizing the Algorithm
   
   This figure shows the spatial partitioning schemes for the build and probe 
sides:
   
   
![Image](https://github.com/user-attachments/assets/c72b59d1-d1cf-4ffd-8be4-9acedc4e0ee2)
   
   1. A KDB tree is built to partition the build side into 4 regular partitions 
(0-3).
   2. The partitioner for the probe side is constructed from the actual 
boundaries of the build partitions. These boundaries may be larger or smaller 
than the original KDB partitions. The boundaries may also overlap with each 
other or have gaps.
   3. Probe rows are assigned to partitions based on their intersection with 
the probe-side partition boundaries.
   
   ### Notable Design Decisions
   
   #### Sedona-style spatial partitioning (with duplication) vs. the proposed 
partitioning scheme
   
   Sedona's existing spatial partitioning duplicates rows that overlap multiple 
partitions, which is a valid design choice for distributing the spatial join 
workload across multiple executors that do not share any data. In contrast, 
our proposed scheme avoids duplication by picking one of the intersected 
partitions for the build side and using the `Multi` partition for the probe 
side, which is more suitable for a single-node execution engine like SedonaDB. 
This design makes the implementation of outer joins more straightforward and 
reduces the risk of excessive data duplication caused by large geometries.
   
   A degenerate case arises when the probe-side spatial partitions overlap each 
other too much and most probe rows end up in the `Multi` partition. Even then, 
performance degrades to no worse than sequentially running multiple in-memory 
spatial joins over pieces of the build side, which should be acceptable.
   
   The following figure illustrates the difference between Sedona's build-side 
partitioning scheme (approach 1) and the proposed build-side partitioning 
scheme (approach 2).
   
   
![Image](https://github.com/user-attachments/assets/f4b81a59-105c-4631-874b-ac51ef3cf06a)
   
   ### API
   
   We need to expose APIs for setting the memory limit of the `SedonaContext`:
   
   * **Rust API**: `SedonaContext` needs a new constructor that accepts a 
[DataFusion 
`RuntimeEnv`](https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnv.html).
 This allows configuring the memory pool with a specific memory limit as well 
as the temporary directory for spill files (see the sketch after this list).
   * **Python API**: Update the `sedona.connect` function and the 
`SedonaContext` constructor to accept an optional `memory_limit` parameter (in 
bytes) and pass it to the underlying Rust `SedonaContext`.
   * **R API**: The R API reuses a [global singleton 
`SedonaContext`](https://github.com/apache/sedona-db/blob/f38e00f378a6a1e1dea4bcab3c7d0494cebb039e/r/sedonadb/R/context.R#L100-L112).
 We can add a new function `sd_ctx_config` that sets configuration, such as the 
memory limit, for the global context. This function must be called before any 
Sedona operations for it to take effect.
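   A sketch of the Rust side, assuming a hypothetical 
`SedonaContext::new_with_runtime_env` constructor and an illustrative import 
path; the `RuntimeEnvBuilder` calls are existing DataFusion APIs:

   ```rust
   use std::sync::Arc;

   use datafusion::error::DataFusionError;
   use datafusion::execution::runtime_env::RuntimeEnvBuilder;
   use sedona::context::SedonaContext; // Import path is illustrative.

   // Build a SedonaContext whose memory pool is capped at `limit_bytes`.
   // `new_with_runtime_env` is the proposed (not yet existing) constructor.
   fn connect_with_memory_limit(limit_bytes: usize) -> Result<SedonaContext, DataFusionError> {
       let runtime_env = Arc::new(
           RuntimeEnvBuilder::new()
               // A fraction of 1.0 makes the whole limit available to the
               // pool; spill-file directories can also be configured on the
               // builder via its disk manager settings.
               .with_memory_limit(limit_bytes, 1.0)
               .build()?,
       );
       SedonaContext::new_with_runtime_env(runtime_env)
   }
   ```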
   
   ## Implementation Plan
   
   We plan to implement this feature in a series of self-contained PRs:
   
   1.  **Partitioning Primitives & Config**: Implement `SpatialPartition` enum, 
`SpatialPartitioner` trait, KDB-tree partitioner, and configuration options.
   2.  **Bounding Box Sampler**: Implement a bounding box sampler to collect 
bounding boxes from the build side for partitioner construction.
   3.  **Spilling Primitives**: Implement spill writers and readers for build 
and probe batches 
([`EvaluatedBatch`](https://github.com/apache/sedona-db/blob/f38e00f378a6a1e1dea4bcab3c7d0494cebb039e/rust/sedona-spatial-join/src/evaluated_batch.rs)).
   4.  **Revisit Memory Reservation Logic**: Update the memory reservation 
logic in `SpatialJoinExec` to reserve memory during build side collection and 
retain the watermark for the rest of the execution.
   5.  **Build-side Collection & Memory Decision**: Implement the collector 
with spilling support and the memory estimation logic to trigger partitioned 
mode.
   6.  **Repartitioner**: Implement a repartitioner for writing a stream of 
batches into multiple spill files, one per spatial partition.
   7.  **Spatial Index Provider**: Implement a provider that builds the 
spatial index for a given partition and manages synchronization between probe 
threads.
   8.  **Probe-side Stream Provider**: Implement the probe stream providers 
that handle repartitioning and spilling probe rows and reading them back for 
specific partitions.
   9.  **Partition Iteration & Execution**: Implement the state machine to 
iterate through partitions. This step puts everything together to make inner 
joins and left outer joins work.
   10.  **Outer Join Support**: Add the necessary bookkeeping (bitmaps) to 
track visited rows in the `Multi` partition of the probe side and handle 
unmatched records for all kinds of right outer joins.
   11.  **KNN Join Support**: Extend the partitioned join to support KNN joins.
   

