jonathanc-n opened a new issue, #17267:
URL: https://github.com/apache/datafusion/issues/17267

   ### Is your feature request related to a problem or challenge?
   
   I wanted to share the idea for hash join spilling here as I would like to 
get input on this as hash join spilling is one of the core functionalities for 
query engines. 
   
   ## Hybrid Hash Join
   The core idea follows a hybrid hash join -> track memory for partitions + 
spill the largest batches until memory is back under a threshold.
   
   ## Build Phase
   During build phase we collect each batch we use each batch to update the 
hash map before iterating to the next one. 
   
   The order would be as such:
   1. Partition the build side values. Every partition is to have its memory 
tracked to check for spilling (partitioned hash join problems are discussed 
below).
   2. If there is a spill, we calculate the partitions that will be spilled 
greedily until we are under *~80%* (configurable) of the threshold. The 
partitions that are spilled will be recorded to later specify which partitions 
need to be spilled at the end of the build process process + to know which 
partitioned probe batches need to be spilled (keeping one spill file per 
partition). 
   3. Resume as normal -> use the remaining values to update the hash table.
   
   Memory threshold should also be made to be *around 90%* of memory for 
transient buffers when spilling.
   
   ## Probe phase
   As each probe-side batch arrives, compute its hash values on the join key(s) 
using the same hash function/partitioning logic as the build. For each probe 
row, determine its partition. These are the two scenarios occur:
   
   If the partition was *not spilled* in the build phase (meaning the build 
side’s corresponding partition hash table is in memory), the in-memory hash 
table is probed to find matches and produce joined output rows. 
   
   If the partition was spilled on the build side, then the build side’s rows 
for that partition are on disk (not in the hash table). In this case, the probe 
row is spilled into a probe-side spill file for that partition. This defers 
processing this partition’s matches until later, so we write all probe-side 
rows for that partition to disk.
   
   If no partitions were spilled during build, then we skip any partitioning 
overhead on the probe side altogether and just do a normal hash join in memory 
for all batches.
   
   ## Recursive hash join spilling
   Do the same thing in the probe phase, check if there is spilled data again. 
   
   ## Problems
   ### 1. Splitting `RecordBatch`
   We would need to do two `take()` operations on the `RecordBatch`, one for 
spilled and another for the batch kept in memory. I might need someone with 
more expertise on Arrow to offer some insight on this.
   
   ### 2. Partitioned Hash Join
   Partitioned hash join execution is a little weird, due to having a shared 
memory pool + each partition having their own `MemoryReservation`. Datafusion 
is different to other engines due to other engines being able to spill entire 
partitions but Datafusion has no way of coordinating that. We could add a 
coordinator between partitions which is a similar idea to [this 
PR](https://github.com/apache/datafusion/pull/17197) @adriangb added, however 
syncing the spilling between the partitions may be nontrivial. It is difficult 
to sync when record batches are being spilled due to the construction of hash 
tables not consistently happening at the same time.
   
   Another solution I was thinking of was isolating a single coordinator per 
partition, and having it keep track of memory per partition within the 
partition + using the `FairSpillPool` for should be a fair solution. This would 
be quite limiting for memory therefore we could use the idea of a dynamic 
memory distributor to pass more memory back to the pool if a partition doesn't 
seem to be using it. 
   
   *Some discussion on this part would be much appreciated*.
   
   ### 3. Dynamic filter pushdown
   Dynamic filter pushdown will conflict with this feature. The coordinator 
that is added keeps track of the filters per spilled/unspilled partitions. 
   
   ## Edge cases
    - For skewed join keys that cause a single partition to collect an 
overwhelming amount of inputs, we fall back to a sort merge join for that 
individual partition.
   
   ## Follow up improvements
    - In cases where there is guaranteed to be spills in a partitioned hash 
join, we pass the computed hash values and take parts of the hash to use to 
avoid rehashing. The same thing be done if recursive partitioning is done.
    - This implementation will be done for regular hash join first before 
symmetric hash join.
    - tbd... (there are many things that can be done)
   
   If more context is needed for the coordinator and refer to `HashBuild` 
portion here:
   https://facebookincubator.github.io/velox/develop/spilling.html - However 
Velox is quite different as Datafusion needs to stream batches
   
   ### Describe the solution you'd like
   
   _No response_
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to