alamb commented on code in PR #10304:
URL: https://github.com/apache/datafusion/pull/10304#discussion_r1598870577


##########
datafusion/sqllogictest/test_files/sort_merge_join.slt:
##########
@@ -263,5 +263,139 @@ DROP TABLE t1;
 statement ok
 DROP TABLE t2;
 
+

Review Comment:
   To verify that these tests cover the code changes, I ran them locally 
without the code changes in this PR and they failed as expected 👍 
   
   
   ```
   Running "sort_merge_join.slt"
   thread 'tokio-runtime-worker' panicked at 
datafusion/physical-plan/src/joins/sort_merge_join.rs:1356:22:
   index out of bounds: the len is 0 but the index is 1
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   External error: task 17 panicked
   Error: Execution("1 failures")
   error: test failed, to rerun pass `-p datafusion-sqllogictest --test 
sqllogictests`
   
   Caused by:
     process didn't exit successfully: 
`/Users/andrewlamb/Software/datafusion/target/debug/deps/sqllogictests-ce3a36cfeab74789
 sort_merge` (exit status: 1)
   ```



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
     filter_columns
 }
 
+// Get buffered data sliece by specific batch index and for specified column 
indices only
+#[inline(always)]
+fn get_buffered_columns(
+    buffered_data: &BufferedData,
+    buffered_batch_idx: usize,
+    buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+    buffered_data.batches[buffered_batch_idx]
+        .batch
+        .columns()
+        .iter()
+        .map(|column| take(column, &buffered_indices, None))
+        .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics

Review Comment:
   I think it would help to also document what the input parameters 
`streamed_indices` and `mask` represent here (I think `mask` flags the rows in 
`streamed_indices` that match the join predicate?)
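
To make the question concrete, here is a minimal sketch of how the parameters could be read, based only on the comments in the quoted hunk. It uses plain slices instead of Arrow arrays, and the names `JoinKind` and `filtered_join_mask_sketch` are hypothetical stand-ins, not code from the PR. It assumes that equal streamed indices are adjacent and that `None` is returned for join types that need no correction.

```rust
/// Stand-in for the join types relevant to this sketch
/// (hypothetical, not DataFusion's `JoinType`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinKind {
    Inner,
    LeftSemi,
}

/// `streamed_indices[i]`: the streamed-side row that candidate pair `i` refers to.
/// `mask[i]`: whether the join filter passed for candidate pair `i`.
///
/// For `LeftSemi`, only the first passing pair of each streamed row is kept,
/// so that a streamed row is emitted at most once.
fn filtered_join_mask_sketch(
    kind: JoinKind,
    streamed_indices: &[u64],
    mask: &[bool],
) -> Option<Vec<bool>> {
    // Assumption: other join types use the filter mask unchanged.
    if kind != JoinKind::LeftSemi {
        return None;
    }
    assert_eq!(streamed_indices.len(), mask.len());

    let mut corrected = Vec::with_capacity(mask.len());
    // Have we already seen a filter match for the current streamed row?
    let mut seen_as_true = false;
    for (i, &matched) in mask.iter().enumerate() {
        // A new streamed row starts here: forget matches from the previous one.
        if i > 0 && streamed_indices[i] != streamed_indices[i - 1] {
            seen_as_true = false;
        }
        corrected.push(matched && !seen_as_true);
        seen_as_true |= matched;
    }
    Some(corrected)
}
```

Under this reading, streamed indices `[0, 0, 1, 1]` with mask `[true, true, false, true]` give `[true, false, false, true]`: the second match for streamed row 0 is dropped, and streamed row 1 keeps its only match.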



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
     filter_columns
 }
 
+// Get buffered data sliece by specific batch index and for specified column 
indices only

Review Comment:
   I think `buffered_indices` are row indices (not column indices), which 
confused me for a moment
   
   Maybe this would be clearer:
   
   ```suggestion
   /// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`
   ```



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -2639,6 +2710,70 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn left_semi_join_filtered_mask() -> Result<()> {
+        assert_eq!(
+            get_filtered_join_mask(
+                LeftSemi,

Review Comment:
   Maybe we should also test a join type other than `LeftSemi` as negative test coverage 🤔 



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1161,6 +1162,15 @@ impl SMJStream {
             let filter_columns = if chunk.buffered_batch_idx.is_some() {
                 if matches!(self.join_type, JoinType::Right) {
                     get_filter_column(&self.filter, &buffered_columns, 
&streamed_columns)
+                } else if matches!(self.join_type, JoinType::LeftSemi) {

Review Comment:
   I wonder if this should also check for `JoinType::Left` (and the clause 
above also check for `JoinType::RightSemi`) 🤔 



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
     filter_columns
 }
 
+// Get buffered data sliece by specific batch index and for specified column 
indices only
+#[inline(always)]
+fn get_buffered_columns(
+    buffered_data: &BufferedData,
+    buffered_batch_idx: usize,
+    buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+    buffered_data.batches[buffered_batch_idx]
+        .batch
+        .columns()
+        .iter()
+        .map(|column| take(column, &buffered_indices, None))
+        .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+    join_type: JoinType,
+    streamed_indices: UInt64Array,
+    mask: &BooleanArray,
+) -> Option<BooleanArray> {
+    // for LeftSemi Join the filter mask should be calculated in its own way:
+    // if we find at least one matching row for specific streaming index
+    // we dont need to check any others for the same index
+    if matches!(join_type, JoinType::LeftSemi) {
+        // have we seen a filter match for a streaming index before
+        let mut seen_as_true: bool = false;
+        let streamed_indices_length = streamed_indices.len();
+        let mut corrected_mask: Vec<bool> = vec![false; 
streamed_indices_length];
+
+        #[allow(clippy::needless_range_loop)]

Review Comment:
   I wonder why clippy is being ignored here?
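
For context, `clippy::needless_range_loop` fires when a `for i in 0..n` loop uses `i` to index into a slice or `Vec`. One possible way to drop the `#[allow]` is to iterate over the inputs directly and remember the previous streamed index instead of looking ahead by position. The following is only a sketch on plain slices, with a hypothetical helper name `first_match_per_streamed_row`; it is not the PR's code and assumes equal streamed indices are adjacent.

```rust
/// Hypothetical helper: keep only the first `true` in each run of equal
/// streamed indices, without indexing by position.
fn first_match_per_streamed_row(streamed_indices: &[u64], mask: &[bool]) -> Vec<bool> {
    // Streamed index of the previous element, if any.
    let mut prev: Option<u64> = None;
    // Whether the current streamed row already produced a match.
    let mut seen_as_true = false;

    streamed_indices
        .iter()
        .zip(mask)
        .map(|(&idx, &matched)| {
            // Reset the flag whenever we move on to a different streamed row.
            if prev != Some(idx) {
                seen_as_true = false;
                prev = Some(idx);
            }
            let keep = matched && !seen_as_true;
            seen_as_true |= matched;
            keep
        })
        .collect()
}
```

Since no range loop is involved, the lint has nothing to flag. Note that `zip` stops at the shorter input, so a length check may still be worth adding.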



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
     filter_columns
 }
 
+// Get buffered data sliece by specific batch index and for specified column 
indices only
+#[inline(always)]
+fn get_buffered_columns(
+    buffered_data: &BufferedData,
+    buffered_batch_idx: usize,
+    buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+    buffered_data.batches[buffered_batch_idx]
+        .batch
+        .columns()
+        .iter()
+        .map(|column| take(column, &buffered_indices, None))
+        .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+    join_type: JoinType,
+    streamed_indices: UInt64Array,
+    mask: &BooleanArray,
+) -> Option<BooleanArray> {
+    // for LeftSemi Join the filter mask should be calculated in its own way:
+    // if we find at least one matching row for specific streaming index
+    // we dont need to check any others for the same index
+    if matches!(join_type, JoinType::LeftSemi) {

Review Comment:
   I find it strange that this doesn't also handle `RightSemi` (as in, I would 
expect the code to be symmetrical)



##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
     filter_columns
 }
 
+// Get buffered data sliece by specific batch index and for specified column 
indices only
+#[inline(always)]
+fn get_buffered_columns(
+    buffered_data: &BufferedData,
+    buffered_batch_idx: usize,
+    buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+    buffered_data.batches[buffered_batch_idx]
+        .batch
+        .columns()
+        .iter()
+        .map(|column| take(column, &buffered_indices, None))
+        .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+    join_type: JoinType,
+    streamed_indices: UInt64Array,
+    mask: &BooleanArray,
+) -> Option<BooleanArray> {
+    // for LeftSemi Join the filter mask should be calculated in its own way:
+    // if we find at least one matching row for specific streaming index
+    // we dont need to check any others for the same index
+    if matches!(join_type, JoinType::LeftSemi) {
+        // have we seen a filter match for a streaming index before
+        let mut seen_as_true: bool = false;
+        let streamed_indices_length = streamed_indices.len();
+        let mut corrected_mask: Vec<bool> = vec![false; 
streamed_indices_length];

Review Comment:
   FWIW it might be faster / easier to follow to create the `BooleanArray` 
directly using 
[`BooleanBuilder`](https://docs.rs/arrow/latest/arrow/array/struct.BooleanBuilder.html)
 rather than a `Vec<bool>`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

