viirya commented on code in PR #10304:
URL: https://github.com/apache/datafusion/pull/10304#discussion_r1606910619
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1365,6 +1402,69 @@ fn get_filter_column(
filter_columns
}
+/// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`
+#[inline(always)]
+fn get_buffered_columns(
+ buffered_data: &BufferedData,
+ buffered_batch_idx: usize,
+ buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+ buffered_data.batches[buffered_batch_idx]
+ .batch
+ .columns()
+ .iter()
+ .map(|column| take(column, &buffered_indices, None))
+ .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+// `streamed_indices` - array of streamed datasource JOINED row indices
+// `mask` - array booleans representing computed join filter expression eval result:
+// true = the row index matches the join filter
+// false = the row index doesn't match the join filter
+// `streamed_indices` have the same length as `mask`
+fn get_filtered_join_mask(
+ join_type: JoinType,
+ streamed_indices: UInt64Array,
+ mask: &BooleanArray,
+) -> Option<(BooleanArray, Vec<u64>)> {
+ // for LeftSemi Join the filter mask should be calculated in its own way:
+ // if we find at least one matching row for specific streaming index
+ // we don't need to check any others for the same index
+ if matches!(join_type, JoinType::LeftSemi) {
+ // have we seen a filter match for a streaming index before
+ let mut seen_as_true: bool = false;
+ let streamed_indices_length = streamed_indices.len();
+ let mut corrected_mask: BooleanBuilder =
+ BooleanBuilder::with_capacity(streamed_indices_length);
+
+ let mut filter_matched_indices: Vec<u64> = vec![];
+
+ #[allow(clippy::needless_range_loop)]
+ for i in 0..streamed_indices_length {
+ // LeftSemi respects only first true values for specific streaming index,
+ // others true values for the same index must be false
+ if mask.value(i) && !seen_as_true {
+ seen_as_true = true;
+ corrected_mask.append_value(true);
+ filter_matched_indices.push(streamed_indices.value(i));
Review Comment:
Could you explain why it is the "same streaming index"? `streamed_indices`
contains all joined indices of the current streamed batch. Why do you think they
are pointing to the same index?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]