Kontinuation opened a new issue, #436:
URL: https://github.com/apache/sedona-db/issues/436
## Summary
SedonaDB's `SpatialJoinExec` currently buffers the entire build side, builds
a single in-memory R-tree, and streams the probe side. This approach works well
when the build side and its index fit in memory but fails with memory
reservation failures or OOM errors for large datasets.
We propose introducing a **spatial-partitioned out-of-core execution path**
that keeps memory usage bounded by:
1. Partitioning the build side into disjoint partitions that each fit in
memory.
2. Building one in-memory R-tree per partition and probing it before moving
to the next partition.
The design intentionally mirrors hybrid/grace hash join but adapts it to
SedonaDB’s spatial partitioning requirements.
## Design Goals
* **Work with data larger than the memory budget**: Enable joins where the
total size of the build side exceeds the memory budget, just like other
spillable operators in DataFusion.
* **Do not compromise performance when memory is sufficient**: Preserve
the performance of the fast in-memory path when the build side fits, and only
switch to spatial partitioned execution when it does not.
* **Full Join Support**: Maintain support for all join types (inner,
left/right outer/semi/anti, full outer).
## Proposed Design
### Execution Modes
`SpatialJoinExec::execute` will be updated to decide between two modes based
on memory estimates:
* **In-memory mode** (Current behavior): Build one R-tree and stream probe
rows. Used when the build side fits in the memory budget.
* **Partitioned mode** (Proposed): Triggered when the estimated memory for
the build index exceeds the budget. Partitions are processed sequentially, with
each probe thread handling one partition at a time.
The actual implementation does not have to be split into two separate code
paths; rather, we can treat the in-memory mode as a special case of partitioned
mode where the number of spatial partitions is 1. We do need to ensure that we
don't incur the overhead of spilling and repartitioning in this case.
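The mode decision can be sketched as a partition-count calculation in which a count of 1 is the in-memory fast path. This is only an illustration: `ExecutionMode`, `choose_mode`, and both parameters are hypothetical names that do not appear in the proposal, and the ceiling division assumes the build data spreads evenly across partitions, which a real estimator would not take for granted.

```rust
/// Hypothetical result of the mode decision (names are assumptions,
/// not part of the proposal).
#[derive(Debug, PartialEq, Eq)]
pub enum ExecutionMode {
    /// One R-tree over the whole build side; no spilling or repartitioning.
    InMemory,
    /// Build side split into `num_partitions` spatial partitions.
    Partitioned { num_partitions: usize },
}

/// Picks a mode from the estimated index size and the memory budget.
/// Returns `None` when the budget is zero, since no partition count can
/// make the index fit.
pub fn choose_mode(estimated_index_bytes: usize, budget_bytes: usize) -> Option<ExecutionMode> {
    if budget_bytes == 0 {
        return None;
    }
    // Ceiling division: smallest count such that an evenly sized partition
    // fits in the budget. Always at least one partition.
    let quotient = estimated_index_bytes / budget_bytes;
    let remainder = estimated_index_bytes % budget_bytes;
    let num_partitions = (quotient + usize::from(remainder != 0)).max(1);

    Some(if num_partitions == 1 {
        ExecutionMode::InMemory
    } else {
        ExecutionMode::Partitioned { num_partitions }
    })
}
```

With an estimate of 1000 bytes and a budget of 300 bytes this yields four partitions, while any estimate at or below the budget stays on the in-memory path.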
### Partitioning Model
We will introduce a `SpatialPartition` concept with three variants:
* `Regular(u32)`: A regular partition ID in `[0, num_partitions)`.
* `None`: The row does not intersect any partition boundary.
* `Multi`: The row intersects multiple partitions.
A `SpatialPartitioner` trait will be implemented to:
1. Partition the build-side bounding box into `M` regular regions (e.g.,
using a KDB tree).
2. Assign each build row to **exactly one** `Regular(i)` partition.
3. Assign probe rows to:
    * `Regular(i)` if they intersect exactly one region.
    * `None` if they intersect no partition.
    * `Multi` if they intersect multiple regions.
This approach avoids duplicating build or probe rows at the cost of
potentially re-reading `Multi` rows multiple times (bounded by `M`).
The `SpatialPartitioner` is usually constructed from sampled bounding boxes
of the build side. The approach is similar to [Apache Sedona's KDB tree
partitioner](https://github.com/apache/sedona/blob/1909171502fe16cd3d7dec6128b9b29e372819da/spark/common/src/main/java/org/apache/sedona/core/spatialPartitioning/KDB.java).
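The assignment rules above can be sketched as follows. The `SpatialPartition` variants come from the proposal. Everything else is an assumption made for illustration: the `Rect` type, the trait's method names, and `FlatPartitioner`, which scans a flat list of regions linearly instead of walking a KDB tree. Choosing the lowest-numbered intersecting region for a build row is an arbitrary tie-break, not something the proposal specifies.

```rust
/// Axis-aligned bounding box (hypothetical helper type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Rect {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl Rect {
    pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Self {
        Rect { min_x, min_y, max_x, max_y }
    }

    /// True when the two boxes share at least one point.
    pub fn intersects(&self, o: &Rect) -> bool {
        self.min_x <= o.max_x
            && o.min_x <= self.max_x
            && self.min_y <= o.max_y
            && o.min_y <= self.max_y
    }
}

/// The three variants described in the proposal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpatialPartition {
    Regular(u32),
    None,
    Multi,
}

/// Assumed shape of the trait; the method names are hypothetical.
pub trait SpatialPartitioner {
    fn num_partitions(&self) -> usize;
    /// Probe-side rule: exactly one hit, no hit, or several hits.
    fn partition(&self, bbox: &Rect) -> SpatialPartition;
}

/// Linear scan over regions, standing in for a KDB-tree lookup.
pub struct FlatPartitioner {
    pub regions: Vec<Rect>,
}

impl FlatPartitioner {
    /// Build-side rule: pick exactly one intersecting region
    /// (here simply the lowest-numbered one).
    pub fn build_partition(&self, bbox: &Rect) -> Option<u32> {
        self.regions
            .iter()
            .position(|r| r.intersects(bbox))
            .map(|i| i as u32)
    }
}

impl SpatialPartitioner for FlatPartitioner {
    fn num_partitions(&self) -> usize {
        self.regions.len()
    }

    fn partition(&self, bbox: &Rect) -> SpatialPartition {
        let mut hit = SpatialPartition::None;
        for (i, region) in self.regions.iter().enumerate() {
            if region.intersects(bbox) {
                match hit {
                    SpatialPartition::None => hit = SpatialPartition::Regular(i as u32),
                    // A second hit means the row spans several regions.
                    _ => return SpatialPartition::Multi,
                }
            }
        }
        hit
    }
}
```

A probe box that straddles two regions is classified as `Multi`, whereas the same box on the build side would be stored once, in the first region it touches.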
### Algorithm Steps
Given build input `B` and probe input `P`:
1. **Collect Build Side**:
    * Collect batches from the build side.
    * Sample bounding boxes to construct the partitioner.
    * **Memory Reservation**: Reserve memory for the collected build-side
      batches plus the extra memory required for loading the spatial index. If
      a memory reservation fails during collection, spill all batches to disk.
      The reservation grows as batches are collected, and its high-water mark
      is retained (the reservation is not freed) to ensure that sufficient
      memory is available for subsequent index building.
2. **Decide the Number of Spatial Partitions**:
    * **Spatial Partition Sizing**: Calculate the required number of
      partitions to ensure each partition fits in the memory budget. The
      number of partitions is chosen specifically so that the estimated index
      size for any single partition does not exceed the reserved memory
      watermark established in Step 1.
3. **Partition Build Side**:
    * Construct the partitioner (e.g., `KDBPartitioner`) from sampled
      bounding boxes.
    * Repartition the collected build data into separate spill files for
      each partition.
    * **Probe Side Partitioner**: Create a probe-side partitioner using
      the *actual boundaries* of the build-side spatial partitions.
4. **Process Partitions Sequentially**:
    * For each partition `i`:
        1. **Build Index**: Load the partition data from disk and build an
           in-memory R-tree.
        2. **Probe Pass**:
            * **First Pass (Partition 0)**: Process the original probe
              stream. Rows classified as `Regular(0)` or `None` are probed
              immediately. Rows belonging to other partitions (`Regular(k)`
              with `k > 0`, or `Multi`) are spilled to disk.
            * **Subsequent Passes**: Read the spilled probe batches for
              the current partition.
        3. **Multi Partition Pass**: After the regular probe rows of
           partition `i` are processed, probe the rows classified as `Multi`
           against the same index.
        4. **Cleanup**: Drop the index to free memory before moving to the
           next partition.
5. **Handle Unmatched Rows (Outer Joins)**:
    * **Build Side**: Track visited rows using bitmaps. After the last
      probe stream completes for a partition, emit unmatched build rows.
    * **Probe Side**: Use bitmaps to track matches for `Multi` partition
      rows across all partitions. Emit unmatched probe rows during the last
      `Multi` partition pass.
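The sequential pass in Step 4 can be simulated in memory for an inner join on bounding-box intersection. This is a rough sketch under several assumptions: plain vectors stand in for spill files, a linear scan stands in for the R-tree, and `Rect`, `Row`, `Target`, `classify`, `probe`, and `partitioned_join` are all hypothetical names. Outer-join bookkeeping (Step 5) is left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Rect {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl Rect {
    pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Self {
        Rect { min_x, min_y, max_x, max_y }
    }
    pub fn intersects(&self, o: &Rect) -> bool {
        self.min_x <= o.max_x
            && o.min_x <= self.max_x
            && self.min_y <= o.max_y
            && o.min_y <= self.max_y
    }
}

/// A row is an id plus its bounding box.
pub type Row = (usize, Rect);

/// Probe-side classification (mirrors Regular / None / Multi).
enum Target {
    Regular(usize),
    NoPartition,
    Multi,
}

fn classify(regions: &[Rect], bbox: &Rect) -> Target {
    let mut hits = regions.iter().enumerate().filter(|(_, r)| r.intersects(bbox));
    match (hits.next(), hits.next()) {
        (None, _) => Target::NoPartition,
        (Some((i, _)), None) => Target::Regular(i),
        _ => Target::Multi,
    }
}

/// Probes `rows` against one partition's "index" (a linear scan here).
fn probe(index: &[Row], rows: &[Row], out: &mut Vec<(usize, usize)>) {
    for (probe_id, probe_box) in rows {
        for (build_id, build_box) in index {
            if build_box.intersects(probe_box) {
                out.push((*build_id, *probe_id));
            }
        }
    }
}

/// Inner join over an already partitioned build side. `regions[i]` is the
/// actual boundary of `build_parts[i]`. Returns (build_id, probe_id) pairs.
pub fn partitioned_join(
    build_parts: &[Vec<Row>],
    regions: &[Rect],
    probe_stream: &[Row],
) -> Vec<(usize, usize)> {
    assert_eq!(build_parts.len(), regions.len());
    let n = build_parts.len();
    let mut out = Vec::new();
    let mut spilled: Vec<Vec<Row>> = vec![Vec::new(); n]; // one "spill file" per partition
    let mut multi: Vec<Row> = Vec::new(); // re-read once per partition

    for i in 0..n {
        // "Load" partition i; a real implementation would build an R-tree.
        let index = &build_parts[i];
        if i == 0 {
            // First pass: route the original probe stream.
            for row in probe_stream {
                match classify(regions, &row.1) {
                    // `NoPartition` rows cannot match in an inner join; an
                    // outer join would emit them as unmatched at this point.
                    Target::Regular(0) | Target::NoPartition => probe(index, &[*row], &mut out),
                    Target::Regular(k) => spilled[k].push(*row),
                    Target::Multi => multi.push(*row),
                }
            }
        } else {
            // Subsequent passes: read back the rows spilled for partition i.
            probe(index, &spilled[i], &mut out);
        }
        // Multi pass: these rows may match build rows in any partition.
        probe(index, &multi, &mut out);
        // `index` goes out of scope here, modelling the cleanup step.
    }
    out
}
```

Because each build row lives in exactly one partition, a `Multi` probe row can be probed against every partition without producing duplicate pairs.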
### Visualizing the Algorithm
This figure shows the spatial partitioning schemes for the build and probe
sides:

1. A KDB tree is built to partition the build side into 4 regular partitions
(0-3).
2. The partitioner for the probe side is constructed from the actual
boundaries of the build partitions. These boundaries may be larger or smaller
than the original KDB partitions. The boundaries may also overlap with each
other or have gaps.
3. Probe rows are assigned to partitions based on their intersection with
the probe-side partition boundaries.
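One way to derive the probe-side boundaries is to take the union of the bounding boxes of the rows that actually landed in each build partition. The sketch below assumes that approach; `Rect`, `union`, and `actual_bounds` are hypothetical names. A partition that received no rows keeps `None` as its boundary, so no probe row can be routed to it.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Rect {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

impl Rect {
    pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Self {
        Rect { min_x, min_y, max_x, max_y }
    }

    /// Smallest box covering both `self` and `o`.
    pub fn union(&self, o: &Rect) -> Rect {
        Rect {
            min_x: self.min_x.min(o.min_x),
            min_y: self.min_y.min(o.min_y),
            max_x: self.max_x.max(o.max_x),
            max_y: self.max_y.max(o.max_y),
        }
    }
}

/// Computes the actual boundary of each build partition from the rows
/// assigned to it. `assigned` holds (partition id, row bounding box) pairs.
pub fn actual_bounds(num_partitions: usize, assigned: &[(usize, Rect)]) -> Vec<Option<Rect>> {
    let mut bounds: Vec<Option<Rect>> = vec![None; num_partitions];
    for (pid, bbox) in assigned {
        let merged = match bounds[*pid] {
            Some(current) => current.union(bbox),
            None => *bbox,
        };
        bounds[*pid] = Some(merged);
    }
    bounds
}
```

A boundary computed this way grows past its KDB cell when a geometry sticks out of the cell and shrinks when the data does not fill the cell, which is why the resulting boundaries can overlap or leave gaps.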
### Notable Design Decisions
#### Sedona-style spatial partitioning (with duplication) vs. proposed partitioning scheme
Sedona's existing spatial partitioning duplicates rows that overlap multiple
partitions, which is a valid design choice for distributing the spatial join
workload to multiple executors that do not share any data. In contrast, our
proposed scheme avoids duplication by picking one of the intersected partitions
for the build side and using the `Multi` partition for the probe side, which is
more suitable for a single-node execution engine like SedonaDB. This design makes
implementation of outer joins more straightforward, and reduces the risk of
excessive data duplication caused by large geometries.
A degenerate case arises when the probe-side partition boundaries overlap so
heavily that most probe rows end up in the `Multi` partition. In this case,
performance degrades to no worse than splitting the build side into multiple
pieces and running one in-memory spatial join per piece sequentially, which
should be acceptable.
The following figure illustrates the difference between Sedona's build-side
partitioning scheme (approach 1) and the proposed build-side partitioning
scheme (approach 2).

### API
We need to expose APIs for setting the memory limit of the SedonaContext:
* **Rust API**: `SedonaContext` needs to have a new constructor for setting
the [DataFusion
`RuntimeEnv`](https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnv.html).
This allows configuring the memory pool with a specific memory limit as well
as the temporary directory for spill files.
* **Python API**: Update the `sedona.connect` function and the `SedonaContext`
constructor to accept an optional `memory_limit` parameter (in bytes) and pass
it to the underlying Rust `SedonaContext`.
* **R API**: The R API reuses a [global singleton
`SedonaContext`](https://github.com/apache/sedona-db/blob/f38e00f378a6a1e1dea4bcab3c7d0494cebb039e/r/sedonadb/R/context.R#L100-L112).
We can add a new function `sd_ctx_config` to set configuration such as the
memory limit for the global context. This function should be called before any
Sedona operations to take effect.
## Implementation Plan
We plan to implement this feature in a series of self-contained PRs:
1. **Partitioning Primitives & Config**: Implement `SpatialPartition` enum,
`SpatialPartitioner` trait, KDB-tree partitioner, and configuration options.
2. **Bounding Box Sampler**: Implement a bounding box sampler to collect
bounding boxes from the build side for partitioner construction.
3. **Spilling Primitives**: Implement spill writers and readers for build
and probe batches
([`EvaluatedBatch`](https://github.com/apache/sedona-db/blob/f38e00f378a6a1e1dea4bcab3c7d0494cebb039e/rust/sedona-spatial-join/src/evaluated_batch.rs)).
4. **Revisit Memory Reservation Logic**: Update the memory reservation
logic in `SpatialJoinExec` to reserve memory during build side collection and
retain the watermark for the rest of the execution.
5. **Build-side Collection & Memory Decision**: Implement the collector
with spilling support and the memory estimation logic to trigger partitioned
mode.
6. **Repartitioner**: Implement a repartitioner that writes a stream of
batches into multiple spill files, each corresponding to a spatial partition.
7. **Spatial Index Provider**: Implement a provider that builds the spatial
index for a given partition and manages synchronization between probe threads.
8. **Probe-side stream provider**: Implement the probe stream providers
that handle repartitioning and spilling probe rows and reading them back for
specific partitions.
9. **Partition Iteration & Execution**: Implement the state machine to
iterate through partitions. This step puts everything together to make inner
joins and left outer joins work.
10. **Outer Join Support**: Add the necessary bookkeeping (bitmaps) to
track visited rows in the `Multi` partition of the probe side and handle
unmatched records for all kinds of right outer joins.
11. **KNN Join Support**: Extend the partitioned join to support KNN joins.
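
For items 4 and 5 of the plan, the retained reservation watermark can be modelled as a reservation that only grows during collection and is never shrunk afterwards. In practice DataFusion's `MemoryReservation` would play this role. `WatermarkReservation`, `CollectOutcome`, and `collect_build_side` below are hypothetical stand-ins, and skipping further reservation attempts once spilling has started is an assumption rather than something the proposal states.

```rust
/// Grow-only reservation against a fixed budget (hypothetical stand-in
/// for a memory-pool reservation).
pub struct WatermarkReservation {
    budget: usize,
    reserved: usize,
}

impl WatermarkReservation {
    pub fn new(budget: usize) -> Self {
        WatermarkReservation { budget, reserved: 0 }
    }

    /// Tries to reserve `bytes` more. On failure the reservation keeps
    /// its current size, so the high-water mark is preserved.
    pub fn try_grow(&mut self, bytes: usize) -> bool {
        match self.reserved.checked_add(bytes) {
            Some(total) if total <= self.budget => {
                self.reserved = total;
                true
            }
            _ => false,
        }
    }

    /// Bytes currently held; never decreases in this sketch.
    pub fn reserved(&self) -> usize {
        self.reserved
    }
}

/// Result of collecting the build side.
pub struct CollectOutcome {
    /// True when at least one reservation failed and batches went to disk.
    pub spilled: bool,
    /// Reservation size retained for the later index-building phase.
    pub watermark: usize,
}

/// Walks over build batch sizes, growing the reservation until it fails.
pub fn collect_build_side(
    batch_sizes: &[usize],
    reservation: &mut WatermarkReservation,
) -> CollectOutcome {
    let mut spilled = false;
    for &size in batch_sizes {
        // Once spilling has started, later batches go straight to disk.
        if !spilled && !reservation.try_grow(size) {
            spilled = true;
        }
    }
    CollectOutcome { spilled, watermark: reservation.reserved() }
}
```

The retained `watermark` is the value that the partition-sizing step would compare against the estimated per-partition index size.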