alex-plekhanov commented on code in PR #11977: URL: https://github.com/apache/ignite/pull/11977#discussion_r2194461837
########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinExecutionTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static 
org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** + * + */ +@RunWith(Parameterized.class) +abstract class AbstractJoinExecutionTest extends AbstractExecutionTest { + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract JoinCreator joinCreator(); + + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract Consumer<AbstractNode<?>> joinFinalChecker(); + + /** Tests join of certain type with input bigger that the buffer size. */ + @Test + public void testJoinBufer() throws Exception { + for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI, ANTI)) { + if (log.isInfoEnabled()) + log.info("Testing join of type '" + joinType + "'..."); + + doTestJoinBuffer(joinType, joinCreator(), joinFinalChecker()); + } + } + + /** + * @param jType Join type. + * @param jCreator Creates certain join node. + * @param finalChecker Finally check node after successfull run + */ + private void doTestJoinBuffer(JoinRelType jType, JoinCreator jCreator, Consumer<AbstractNode<?>> finalChecker) throws Exception { + int size = IN_BUFFER_SIZE * 2 + IN_BUFFER_SIZE / 2; + int intersect = Math.max(10, IN_BUFFER_SIZE / 10); + + int leftTo = size + intersect; + int rightTo = size * 2; + + ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0); + + Iterator<Object[]> leftIter = IntStream.range(0, leftTo).boxed().map(i -> new Object[] {i}).iterator(); + Iterator<Object[]> rightIter = IntStream.range(size, rightTo).boxed().map(i -> new Object[] {i}).iterator(); + + RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class); + ScanNode<Object[]> leftNode = new ScanNode<>(ctx, leftType, () -> leftIter); + + RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class); + ScanNode<Object[]> rightNode = new ScanNode<>(ctx, rightType, 
() -> rightIter); + + RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, int.class); + + AbstractNode<Object[]> join = jCreator.create(ctx, outType, leftType, rightType, jType, + (r1, r2) -> r1[0].equals(r2[0])); + + join.register(F.asList(leftNode, rightNode)); + + List<Object[]> res = new ArrayList<>(); + AtomicBoolean finished = new AtomicBoolean(); + + join.onRegister(new Downstream<>() { + @Override public void push(Object[] objects) { + res.add(objects); + } + + @Override public void end() { + finished.set(true); + } + + @Override public void onError(Throwable e) { + // No-op. + } + }); + Review Comment: Let's also check the intermediate result without `Downstream.end()` here: ``` join.request(1); assertTrue(GridTestUtils.waitForCondition(() -> !res.isEmpty(), getTestTimeout())); bufChecker.accept(join); ``` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java: ########## @@ -151,7 +165,8 @@ private void pushLeft(Row row) throws Exception { leftInBuf.add(row); - join(); + if (waitingLeft <= 0) Review Comment: This can delay row output (see the comment for merge join) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java: ########## @@ -809,16 +710,41 @@ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, BiPredicate<Row, } } - if (waitingRight == 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); + tryGetMoreOrEnd(); + } + } - if (waitingLeft == 0 && leftInBuf.isEmpty()) - leftSource().request(waitingLeft = IN_BUFFER_SIZE); + /** */ + protected void tryGetMoreOrEnd(@Nullable Supplier<Boolean> extCheck) throws Exception { + if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && left == null && leftInBuf.isEmpty() + && (extCheck == null || extCheck.get()) + ) { + requested = 0; + rightMaterialized.clear(); - if (requested > 0 && waitingLeft == NOT_WAITING 
&& waitingRight == NOT_WAITING && left == null && leftInBuf.isEmpty()) { - requested = 0; - downstream().end(); - } + downstream().end(); + + return; } + + if (waitingLeft == 0 && requested > 0 && leftInBuf.size() <= HALF_BUF_SIZE) + leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); + + if (waitingRight == 0 && requested > 0 && rightMaterialized.size() <= HALF_BUF_SIZE) Review Comment: Incorrect condition: rightMaterialized should not be limited ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/tpch/AbstractTpchTest.java: ########## @@ -26,15 +26,21 @@ /** */ @RunWith(Parameterized.class) -public class TpchTest extends AbstractBasicIntegrationTest { +public abstract class AbstractTpchTest extends AbstractBasicIntegrationTest { + /** */ + protected static final Collection<Integer> USED_TESTS = F.asList(17, 19, 20); + /** Query ID. */ @Parameterized.Parameter public int qryId; + /** */ + protected abstract double scale(); + /** */ @Parameterized.Parameters(name = "queryId={0}") - public static Collection<Object> params() { - return F.asList(16, 19, 20); Review Comment: Why was 16 removed? 
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java: ########## @@ -673,28 +622,12 @@ public FullOuterJoin( } } - if (waitingRight == 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); - - if (waitingLeft == 0 && leftInBuf.isEmpty()) - leftSource().request(waitingLeft = IN_BUFFER_SIZE); - - if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && left == null - && leftInBuf.isEmpty() && rightNotMatchedIndexes.isEmpty()) { - requested = 0; - downstream().end(); - } + tryGetMoreOrEnd(() -> rightNotMatchedIndexes.isEmpty()); Review Comment: Let's simplify to something like: ``` if (rightNotMatchedIndexes.isEmpty() && checkJoinFinished()) return; tryToRequestInputs(); ``` (It will look similar to merge join and be simpler to read) ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinExecutionTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** + * + */ +@RunWith(Parameterized.class) +abstract class AbstractJoinExecutionTest extends AbstractExecutionTest { + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract JoinCreator joinCreator(); + + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract Consumer<AbstractNode<?>> joinFinalChecker(); + + /** Tests join of certain type with input bigger that the buffer size. */ + @Test + public void testJoinBufer() throws Exception { + for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI, ANTI)) { + if (log.isInfoEnabled()) + log.info("Testing join of type '" + joinType + "'..."); + + doTestJoinBuffer(joinType, joinCreator(), joinFinalChecker()); + } + } + + /** + * @param jType Join type. 
+ * @param jCreator Creates certain join node. + * @param finalChecker Finally check node after successfull run + */ + private void doTestJoinBuffer(JoinRelType jType, JoinCreator jCreator, Consumer<AbstractNode<?>> finalChecker) throws Exception { Review Comment: 'j' is not a common abbreviation for 'join', let's use 'join' in variable names ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinExecutionTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** + * + */ +@RunWith(Parameterized.class) +abstract class AbstractJoinExecutionTest extends AbstractExecutionTest { + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract JoinCreator joinCreator(); + + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract Consumer<AbstractNode<?>> joinFinalChecker(); + + /** Tests join of certain type with input bigger that the buffer size. */ + @Test + public void testJoinBufer() throws Exception { + for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI, ANTI)) { + if (log.isInfoEnabled()) + log.info("Testing join of type '" + joinType + "'..."); + + doTestJoinBuffer(joinType, joinCreator(), joinFinalChecker()); + } + } + + /** + * @param jType Join type. 
+ * @param jCreator Creates certain join node. + * @param finalChecker Finally check node after successfull run Review Comment: Missing period at the end of the line. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java: ########## @@ -213,12 +242,33 @@ protected Node<Row> rightSource() { /** */ protected abstract void join() throws Exception; - /** */ - protected void checkJoinFinished() throws Exception { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { + /** Checks if finished considering strictly one shoulder. */ + protected boolean checkFinished(boolean leftShoulder) throws Exception { + return checkFinished(leftShoulder ? -1 : 1, false); + } + + /** + * Checks if finished. Can take in account one or both shoulders. + * + * @param shoulder If <0, checks the only left input. If is 0, checks both inputs. If >0, checks only right input. + * @param strict Works with only with {@code input} == 0. If {@code true}, checks both inputs. Otherwise, checks any input. + */ + protected boolean checkFinished(int shoulder, boolean strict) throws Exception { Review Comment: - Overcomplicated API (hard to understand what is really checked) - Inconsistent API (boolean in one method, integer in another method) - The condition in checkFinished is fully covered by the condition of each join, except "!distributed", so it can effectively be removed. 
- I proposed a patch earlier to reduce buffer changes for distributed join (we need to drain remote sources, but there is no need to store and join the data): ``` if (!finishing) { finishing = true; leftInBuf.clear(); rightInBuf.clear(); } ``` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java: ########## @@ -213,12 +242,33 @@ protected Node<Row> rightSource() { /** */ protected abstract void join() throws Exception; - /** */ - protected void checkJoinFinished() throws Exception { - if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING)) { + /** Checks if finished considering strictly one shoulder. */ + protected boolean checkFinished(boolean leftShoulder) throws Exception { Review Comment: For joins, the word 'hand' is usually used, not 'shoulder' ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java: ########## @@ -809,16 +710,41 @@ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, BiPredicate<Row, } } - if (waitingRight == 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); + tryGetMoreOrEnd(); + } + } - if (waitingLeft == 0 && leftInBuf.isEmpty()) - leftSource().request(waitingLeft = IN_BUFFER_SIZE); + /** */ + protected void tryGetMoreOrEnd(@Nullable Supplier<Boolean> extCheck) throws Exception { + if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && left == null && leftInBuf.isEmpty() + && (extCheck == null || extCheck.get()) + ) { + requested = 0; + rightMaterialized.clear(); - if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && left == null && leftInBuf.isEmpty()) { - requested = 0; - downstream().end(); - } + downstream().end(); + + return; } + + if (waitingLeft == 0 && requested > 0 && leftInBuf.size() <= HALF_BUF_SIZE) Review Comment: See the comment about `requested > 0` for merge join ########## 
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinExecutionTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static 
org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** + * + */ +@RunWith(Parameterized.class) +abstract class AbstractJoinExecutionTest extends AbstractExecutionTest { + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract JoinCreator joinCreator(); + + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract Consumer<AbstractNode<?>> joinFinalChecker(); Review Comment: It looks not very consistent. These methods required only for one test (with parameters related only to this test), but redefined for children classes. Maybe it's better to have dedicated test class to check buffers with parameters for MERGE and NL join? Also JoinCreator interface and Consumer for buffer checker are redundant, you can use methods like `createJoin` and `checkBuffers` without additional wrapping. ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractJoinExecutionTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** + * + */ +@RunWith(Parameterized.class) +abstract class AbstractJoinExecutionTest extends AbstractExecutionTest { + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract JoinCreator joinCreator(); + + /** @see #doTestJoinBuffer(JoinRelType, JoinCreator, Consumer) */ + protected abstract Consumer<AbstractNode<?>> joinFinalChecker(); + + /** Tests join of certain type with input bigger that the buffer size. 
*/ + @Test + public void testJoinBufer() throws Exception { + for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI, ANTI)) { + if (log.isInfoEnabled()) + log.info("Testing join of type '" + joinType + "'..."); + + doTestJoinBuffer(joinType, joinCreator(), joinFinalChecker()); Review Comment: joinCreator -> joinFactory joinFinalChecker -> joinBufChecker (final checker sounds like it checks the final result) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java: ########## @@ -1006,27 +943,27 @@ else if (cmp > 0) { inLoop = false; } - if (waitingRight == 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); - - if (waitingLeft == 0) - leftSource().request(waitingLeft = IN_BUFFER_SIZE); - if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty() && waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null + && checkFinished(0, true) ) - checkJoinFinished(); + return; + + tryToRequestInputs(); } } /** */ - private static class SemiJoin<Row> extends MergeJoinNode<Row> { - /** */ - private Row left; + protected void tryToRequestInputs() throws Exception { + if (waitingLeft == 0 && requested > 0 && leftInBuf.size() <= HALF_BUF_SIZE) Review Comment: Why do we need `requested > 0` here? The condition with `leftInBuf.size()` already covers all problems related to buffer overflow. Are there any tests that prove the additional condition is required? IMO this condition can add other problems (for example, we need to drain remote sources even if requested is 0). ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java: ########## @@ -159,7 +184,8 @@ private void pushLeft(Row row) throws Exception { leftInBuf.add(row); - join(); + if (waitingLeft == 0 && waitingRight <= 0) Review Comment: These conditions will delay row output and will increase request latency. 
Also, when we need only one row (CNLJ), we will wait for the whole right and left batches before a row is produced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org