alex-plekhanov commented on code in PR #11977:
URL: https://github.com/apache/ignite/pull/11977#discussion_r2097629960


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java:
##########
@@ -1020,13 +959,17 @@ else if (cmp > 0) {
     }
 
     /** */
-    private static class SemiJoin<Row> extends MergeJoinNode<Row> {
-        /** */
-        private Row left;
+    protected void tryToRequestInputs() throws Exception {
+        // Check `requested` to avoid endless buffer filling while unable to cycle.
+        if (waitingLeft == 0 && requested > 0)

Review Comment:
   This can still overflow the input buffers. It can be reproduced with the following execution test:
   ``` 
       @Test
       public void testJoinBuffers() throws Exception {
           ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
   
           Iterator<Object[]> leftIter = IntStream.range(0, 1_000_000).boxed().map(i -> new Object[] {i}).iterator();
           Iterator<Object[]> rightIter = IntStream.range(1_000_000, 2_000_000).boxed().map(i -> new Object[] {i}).iterator();
   
           RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class);
           ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, () -> leftIter);
   
           RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class);
           ScanNode<Object[]> rightNode = new ScanNode<>(ctx, rightType, () -> rightIter);
   
           RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, int.class);
   
           MergeJoinNode<Object[]> join = MergeJoinNode.create(ctx, outType, leftType, rightType, INNER,
               Comparator.comparingInt(r -> (Integer)r[0]), true);
   
           join.register(F.asList(leftNode, rightNode));
   
           List<Object[]> res = new ArrayList<>();
           AtomicBoolean finished = new AtomicBoolean();
   
           join.onRegister(new Downstream<>() {
               @Override public void push(Object[] objects) {
                   res.add(objects);
               }
   
               @Override public void end() {
                   finished.set(true);
               }
   
               @Override public void onError(Throwable e) {
                   // No-op.
               }
           });
   
           join.request(1);
   
           assertTrue(GridTestUtils.waitForCondition(finished::get, 5_000L));
           assertTrue(join.leftInBuf.size() <= IN_BUFFER_SIZE);
           assertTrue(join.rightInBuf.size() <= IN_BUFFER_SIZE);
       }
   ```
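
   To illustrate the kind of growth the two buffer-size assertions are meant to catch, here is a toy model. It is not the actual `MergeJoinNode` logic and does not claim to show the real cause: `BufferGuardSketch`, `simulate` and the capacity value 512 are all hypothetical, and the model simply assumes that downstream demand stays positive while no buffered row is consumed. Under that assumption, a guard that checks only `waiting == 0 && requested > 0` keeps requesting full batches, while a guard that also looks at the free space in the buffer stays within the limit.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   /** Toy model of an input-request guard. Hypothetical, not Ignite code. */
   final class BufferGuardSketch {
       /** Hypothetical capacity, standing in for IN_BUFFER_SIZE. */
       static final int IN_BUFFER_SIZE = 512;

       /**
        * Runs the given number of scheduling rounds in which downstream demand stays
        * positive but no buffered row is consumed, and returns the final buffer size.
        */
       static int simulate(boolean checkBufferSize, int steps) {
           Deque<Integer> buf = new ArrayDeque<>();
           int waiting = 0;   // Rows requested from the input but not yet delivered.
           int requested = 1; // Downstream demand; never satisfied in this model.
           int next = 0;

           for (int step = 0; step < steps; step++) {
               int free = IN_BUFFER_SIZE - buf.size();

               // The naive guard ignores how many rows are already buffered.
               boolean canRequest = waiting == 0 && requested > 0 && (!checkBufferSize || free > 0);

               if (canRequest)
                   waiting = checkBufferSize ? free : IN_BUFFER_SIZE;

               // The input delivers everything that was requested.
               while (waiting > 0) {
                   buf.add(next++);
                   waiting--;
               }
           }

           return buf.size();
       }

       public static void main(String[] args) {
           System.out.println("guard without size check: " + simulate(false, 10));
           System.out.println("guard with size check:    " + simulate(true, 10));
       }
   }
   ```

   In this model, the guard without the size check grows the buffer by one full batch per round, whereas the size-aware guard never holds more than `IN_BUFFER_SIZE` rows. That is the invariant the test above asserts for `leftInBuf` and `rightInBuf`.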


