jonathanc-n opened a new issue, #17267: URL: https://github.com/apache/datafusion/issues/17267
### Is your feature request related to a problem or challenge?

I wanted to share the idea for hash join spilling here, as I would like to get input on it: hash join spilling is one of the core functionalities for query engines.

## Hybrid Hash Join

The core idea follows a hybrid hash join: track memory per partition and spill the largest partitions until memory is back under a threshold.

## Build Phase

During the build phase we collect each batch and use it to update the hash map before iterating to the next one. The order would be as follows:

1. Partition the build-side values. Every partition has its memory tracked to check for spilling (partitioned hash join problems are discussed below).
2. If there is a spill, we greedily select partitions to spill until we are under *~80%* (configurable) of the threshold. The spilled partitions are recorded so that we can specify which partitions need to be spilled at the end of the build process, and so that we know which partitioned probe batches need to be spilled (keeping one spill file per partition).
3. Resume as normal: use the remaining values to update the hash table.

The memory threshold should also be set to *around 90%* of available memory, leaving headroom for transient buffers when spilling.

## Probe Phase

As each probe-side batch arrives, compute its hash values on the join key(s) using the same hash function/partitioning logic as the build. For each probe row, determine its partition. Two scenarios can occur:

- If the partition was *not spilled* in the build phase (meaning the build side's corresponding partition hash table is in memory), the in-memory hash table is probed to find matches and produce joined output rows.
- If the partition was spilled on the build side, then the build side's rows for that partition are on disk (not in the hash table). In this case, the probe row is spilled into a probe-side spill file for that partition.
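The greedy selection in step 2 of the build phase could be sketched roughly as follows. This is a minimal illustration, not DataFusion's actual API: the function name, the flat `partition_sizes` slice, and the `target_fraction` parameter are all hypothetical stand-ins for the per-partition memory tracking described above.

```rust
/// Hypothetical sketch: greedily pick the largest partitions to spill
/// until memory use drops below a fraction (e.g. the configurable ~80%)
/// of the memory threshold. Returns the indices of spilled partitions,
/// which the build phase would record so matching probe batches are
/// spilled to the same per-partition spill files.
fn select_partitions_to_spill(
    partition_sizes: &[usize], // in-memory bytes per build partition
    threshold_bytes: usize,
    target_fraction: f64, // e.g. 0.8
) -> Vec<usize> {
    let mut total: usize = partition_sizes.iter().sum();
    let target = (threshold_bytes as f64 * target_fraction) as usize;

    // Visit partition indices from largest to smallest.
    let mut by_size: Vec<usize> = (0..partition_sizes.len()).collect();
    by_size.sort_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));

    let mut spilled = Vec::new();
    for i in by_size {
        if total <= target {
            break;
        }
        total -= partition_sizes[i];
        spilled.push(i); // this partition's probe rows must also spill later
    }
    spilled
}

fn main() {
    // 4 partitions totalling 100 bytes, threshold 100 => target 80 bytes.
    // Spilling the largest partition (index 0, 40 bytes) reaches 60 <= 80.
    let spilled = select_partitions_to_spill(&[40, 30, 20, 10], 100, 0.8);
    assert_eq!(spilled, vec![0]);
    println!("{:?}", spilled);
}
```

Spilling largest-first minimizes the number of partitions taken out of memory, which also minimizes how much of the probe side has to be deferred to disk.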
This defers processing of that partition's matches until later, so we write all probe-side rows for that partition to disk. If no partitions were spilled during the build, we skip the partitioning overhead on the probe side altogether and just do a normal in-memory hash join for all batches.

## Recursive Hash Join Spilling

When processing the spilled partitions, do the same thing as in the probe phase: check whether data spills again, and recurse if it does.

## Problems

### 1. Splitting `RecordBatch`

We would need to do two `take()` operations on the `RecordBatch`: one for the spilled rows and another for the rows kept in memory. I might need someone with more Arrow expertise to offer some insight on this.

### 2. Partitioned Hash Join

Partitioned hash join execution is a little awkward, due to having a shared memory pool while each partition has its own `MemoryReservation`. DataFusion differs from other engines here: other engines can spill entire partitions, but DataFusion has no way of coordinating that. We could add a coordinator between partitions, a similar idea to [this PR](https://github.com/apache/datafusion/pull/17197) @adriangb added; however, synchronizing the spilling between partitions may be nontrivial. It is difficult to synchronize when record batches are being spilled, because hash table construction does not consistently happen at the same time across partitions. Another solution I was considering is isolating a single coordinator per partition, having it track memory within that partition, and using the `FairSpillPool`; that should be a fair solution. This would be quite limiting for memory, so we could use the idea of a dynamic memory distributor to pass memory back to the pool if a partition doesn't seem to be using it. *Some discussion on this part would be much appreciated.*

### 3. Dynamic Filter Pushdown

Dynamic filter pushdown will conflict with this feature. The coordinator that is added keeps track of the filters per spilled/unspilled partition.
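The per-row decision in the probe phase could be sketched as below. This is a hedged illustration under assumed names: `route_probe_row`, the `ProbeRoute` enum, and the modulo partitioning are hypothetical simplifications, and the real implementation would use the same partitioning logic as the build side rather than a bare `%`.

```rust
use std::collections::HashSet;

/// Where a probe row goes, per the probe-phase logic above.
#[derive(Debug, PartialEq)]
enum ProbeRoute {
    /// Build partition is in memory: probe its hash table directly.
    InMemory(usize),
    /// Build partition was spilled: append the row to that
    /// partition's probe-side spill file for deferred processing.
    SpillFile(usize),
}

/// Hypothetical sketch: map a probe row's join-key hash to a partition
/// (using the same partitioning as the build side), then route it based
/// on whether that build partition was spilled.
fn route_probe_row(hash: u64, num_partitions: usize, spilled: &HashSet<usize>) -> ProbeRoute {
    let partition = (hash as usize) % num_partitions;
    if spilled.contains(&partition) {
        ProbeRoute::SpillFile(partition)
    } else {
        ProbeRoute::InMemory(partition)
    }
}

fn main() {
    let spilled: HashSet<usize> = [2].into_iter().collect();
    assert_eq!(route_probe_row(5, 4, &spilled), ProbeRoute::InMemory(1));
    assert_eq!(route_probe_row(6, 4, &spilled), ProbeRoute::SpillFile(2));
    // With no spilled partitions, every row is probed in memory,
    // matching the "skip partitioning overhead" fast path above.
    assert_eq!(route_probe_row(6, 4, &HashSet::new()), ProbeRoute::InMemory(2));
}
```

In a real batch-at-a-time implementation the routing would produce two index lists per partition and materialize them with two `take()` calls, which is exactly Problem 1 above.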
## Edge Cases

- For skewed join keys that cause a single partition to collect an overwhelming amount of input, we fall back to a sort-merge join for that individual partition.

## Follow-up Improvements

- In cases where spills are guaranteed in a partitioned hash join, we pass along the computed hash values and take parts of the hash to use, to avoid rehashing. The same can be done if recursive partitioning happens.
- This implementation will be done for the regular hash join first, before the symmetric hash join.
- tbd... (there are many things that can be done)

If more context is needed for the coordinator, refer to the `HashBuild` portion here: https://facebookincubator.github.io/velox/develop/spilling.html - note, however, that Velox is quite different, as DataFusion needs to stream batches.

### Describe the solution you'd like

_No response_

### Describe alternatives you've considered

_No response_

### Additional context

_No response_

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.