alex-plekhanov commented on code in PR #11977: URL: https://github.com/apache/ignite/pull/11977#discussion_r2097629960
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java: ########## @@ -1020,13 +959,17 @@ else if (cmp > 0) { } /** */ - private static class SemiJoin<Row> extends MergeJoinNode<Row> { - /** */ - private Row left; + protected void tryToRequestInputs() throws Exception { + // Check `requested` to avoid endless buffer filling while unable to cycle. + if (waitingLeft == 0 && requested > 0) Review Comment: The input buffers can still overflow with this check. This can be reproduced with the following execution test: ``` @Test public void testJoinBuffers() throws Exception { ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0); Iterator<Object[]> leftIter = IntStream.range(0, 1_000_000).boxed().map(i -> new Object[] {i}).iterator(); Iterator<Object[]> rightIter = IntStream.range(1_000_000, 2_000_000).boxed().map(i -> new Object[] {i}).iterator(); RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class); ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, () -> leftIter); RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class); ScanNode<Object[]> rightNode = new ScanNode<>(ctx, rightType, () -> rightIter); RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, int.class); MergeJoinNode<Object[]> join = MergeJoinNode.create(ctx, outType, leftType, rightType, INNER, Comparator.comparingInt(r -> (Integer)r[0]), true); join.register(F.asList(leftNode, rightNode)); List<Object[]> res = new ArrayList<>(); AtomicBoolean finished = new AtomicBoolean(); join.onRegister(new Downstream<>() { @Override public void push(Object[] objects) { res.add(objects); } @Override public void end() { finished.set(true); } @Override public void onError(Throwable e) { // No-op. 
} }); join.request(1); assertTrue(GridTestUtils.waitForCondition(finished::get, 5_000L)); assertTrue(join.leftInBuf.size() <= IN_BUFFER_SIZE); assertTrue(join.rightInBuf.size() <= IN_BUFFER_SIZE); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org