alamb commented on code in PR #12082:
URL: https://github.com/apache/datafusion/pull/12082#discussion_r1725492996
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1356,16 +1392,82 @@ impl SMJStream {
pre_mask.clone()
};
+ // Try to calculate if the buffered batch we scan is the last one for specific stream row and join key
+ // for Batchsize == 1 self.buffered_data.scanning_finished() works well
+ // For other scenarios its an attempt to figure out there is no more rows matching the same join key
+ let last_batch = if self.batch_size == 1 {
Review Comment:
I would expect the right check, based on function names, to be
```rust
let last_batch = self.buffered_data.scanning_finished()
```
However, I tried that and the test still fails.
@richox I wonder if you have any ideas (as it appears you are the original
author of SortMergeJoin in https://github.com/apache/datafusion/pull/2242)
I am having a hard time following the logic in such a large function (looks
like `freeze_streamed` is something like `300` lines long).
If I were debugging this issue more, what I would probably do is
1. break the logic down into a few more named functions so the logic
boundaries and the intended actions are clearer.
2. try to document, in comments, the intended invariants of
BufferedBatch / ScanningBatch. My hope would be that in the process of
writing that documentation I would learn the code well enough to have a better
idea of which invariant isn't being upheld in this function
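
To make the two suggestions concrete, here is a minimal sketch of what a small named helper with documented invariants could look like. Everything in it is hypothetical: `ToyBufferedBatch`, `ToyBufferedData`, and `is_last_batch_for_key` are simplified stand-ins invented for illustration, not the real DataFusion types or the actual fix, and the invariants listed are assumptions of the sketch rather than confirmed properties of `SMJStream`.

```rust
/// Hypothetical, simplified stand-in for one buffered batch: only the join
/// key of each row is kept. This is NOT the real DataFusion `BufferedBatch`.
#[derive(Debug, Clone)]
pub struct ToyBufferedBatch {
    pub keys: Vec<i64>,
}

/// Hypothetical scan state over a queue of buffered batches.
///
/// Intended invariants (assumptions of this sketch):
/// 1. `scanning_batch_idx <= batches.len()`.
/// 2. `scanning_batch_idx == batches.len()` means every batch was scanned.
#[derive(Debug, Default)]
pub struct ToyBufferedData {
    pub batches: Vec<ToyBufferedBatch>,
    pub scanning_batch_idx: usize,
}

impl ToyBufferedData {
    /// True once every buffered batch has been scanned.
    pub fn scanning_finished(&self) -> bool {
        self.scanning_batch_idx == self.batches.len()
    }

    /// True if no batch after the one currently being scanned contains a
    /// row with `key`, i.e. the current batch is the last one for that key.
    pub fn is_last_batch_for_key(&self, key: i64) -> bool {
        // Check invariant 1 in debug builds so a violation fails loudly.
        debug_assert!(self.scanning_batch_idx <= self.batches.len());
        self.batches
            .iter()
            .skip(self.scanning_batch_idx + 1)
            .all(|batch| !batch.keys.contains(&key))
    }
}
```

With a helper like this, the `last_batch` computation would be a single named call, and the `debug_assert!` would flag a broken invariant close to where it happens instead of as a wrong join result later.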
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]