rluvaton commented on code in PR #20482:
URL: https://github.com/apache/datafusion/pull/20482#discussion_r2845674949


##########
datafusion/physical-plan/src/joins/sort_merge_join/stream.rs:
##########
@@ -62,14 +62,16 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
 use futures::{Stream, StreamExt};
 
 /// State of SMJ stream
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub(super) enum SortMergeJoinState {
     /// Init joining with a new streamed row or a new buffered batches
     Init,
     /// Polling one streamed row or one buffered batch, or both
     Polling,
     /// Joining polled data and making output
     JoinOutput,
+    /// Emit ready data if have any
+    EmitReady { next_state: Box<SortMergeJoinState> },

Review Comment:
   renamed



##########
datafusion/physical-plan/src/joins/sort_merge_join/tests.rs:
##########
@@ -3130,6 +3133,420 @@ fn test_partition_statistics() -> Result<()> {
     Ok(())
 }
 
+fn build_batches(
+    a: (&str, &[Vec<bool>]),
+    b: (&str, &[Vec<i32>]),
+    c: (&str, &[Vec<i32>]),
+) -> (Vec<RecordBatch>, SchemaRef) {
+    assert_eq!(a.1.len(), b.1.len());
+    let mut batches = vec![];
+
+    for i in 0..a.1.len() {
+        let schema = Schema::new(vec![
+            Field::new(a.0, DataType::Boolean, false),
+            Field::new(b.0, DataType::Int32, false),
+            Field::new(c.0, DataType::Int32, false),
+        ]);

Review Comment:
   thanks, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to