alex-plekhanov commented on code in PR #11770:
URL: https://github.com/apache/ignite/pull/11770#discussion_r2726446578


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java:
##########
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.MappingRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/** Hash join implementor. */
+public abstract class HashJoinNode<Row> extends 
AbstractRightMaterializedJoinNode<Row> {
+    /** */
+    private static final int INITIAL_CAPACITY = 128;
+
+    /** */
+    private final RowHandler<Row> leftRowHnd;
+
+    /** */
+    private final RowHandler<Row> rightRowHnd;
+
+    /** */
+    private final boolean keepRowsWithNull;
+
+    /** Output row handler. */
+    protected final RowHandler<Row> outRowHnd;
+
+    /** Right rows storage. */
+    protected Map<GroupKey<Row>, TouchedArrayList<Row>> hashStore = new 
HashMap<>(INITIAL_CAPACITY);
+
+    /** */
+    protected Iterator<Row> rightIt = Collections.emptyIterator();
+
+    /** */
+    @Nullable protected final BiPredicate<Row, Row> nonEqCond;
+
+    /**
+     * Creates hash join node.
+     *
+     * @param ctx Execution context.
+     * @param rowType Out row type.
+     * @param info Join info.
+     * @param outRowHnd Output row handler.
+     * @param keepRowsWithNull {@code True} if we need to store the row from 
right shoulder even if it contains NULL in
+     *                         any of join key position. This is required for 
joins which emit unmatched part
+     *                         of the right shoulder, such as RIGHT JOIN and 
FULL OUTER JOIN.
+     * @param nonEqCond If provided, only rows matching the predicate will be 
emitted as matched rows.
+     */
+    protected HashJoinNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        IgniteJoinInfo info,
+        RowHandler<Row> outRowHnd,
+        boolean keepRowsWithNull,
+        @Nullable BiPredicate<Row, Row> nonEqCond
+    ) {
+        super(ctx, rowType);
+
+        this.keepRowsWithNull = keepRowsWithNull;
+
+        leftRowHnd = new MappingRowHandler<>(ctx.rowHandler(), 
info.leftKeys.toIntArray());
+        rightRowHnd = new MappingRowHandler<>(ctx.rowHandler(), 
info.rightKeys.toIntArray());
+
+        this.outRowHnd = outRowHnd;
+
+        this.nonEqCond = nonEqCond;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        super.rewindInternal();
+
+        rightIt = Collections.emptyIterator();
+
+        hashStore.clear();
+    }
+
+    /** Creates certain join node. */
+    public static <RowT> HashJoinNode<RowT> create(
+        ExecutionContext<RowT> ctx,
+        RelDataType rowType,
+        RelDataType leftRowType,
+        RelDataType rightRowType,
+        JoinRelType type,
+        IgniteJoinInfo info,
+        @Nullable BiPredicate<RowT, RowT> nonEqCond
+    ) {
+        assert !info.pairs().isEmpty() && (info.isEqui() || type == 
JoinRelType.INNER || type == JoinRelType.SEMI);
+        assert nonEqCond == null || type == JoinRelType.INNER || type == 
JoinRelType.SEMI || type == JoinRelType.LEFT;
+
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
+        RowHandler<RowT> rowHnd = ctx.rowHandler();
+
+        switch (type) {
+            case INNER:
+                return new InnerHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            case LEFT:
+                return new LeftHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, rightRowType), nonEqCond);
+
+            case RIGHT:
+                return new RightHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, leftRowType));
+
+            case FULL: {
+                return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, leftRowType),
+                    rowHnd.factory(typeFactory, rightRowType), nonEqCond);
+            }
+
+            case SEMI:
+                return new SemiHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            case ANTI:
+                return new AntiHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            default:
+                throw new IllegalArgumentException("Join of type '" + type + 
"' isn't supported.");
+        }
+    }
+
+    /** */
+    protected Collection<Row> lookup(Row row) {
+        GroupKey<Row> key = GroupKey.of(row, leftRowHnd, false);
+
+        if (key == null)
+            return Collections.emptyList();
+
+        TouchedArrayList<Row> res = hashStore.get(key);
+
+        if (res == null)
+            return Collections.emptyList();
+
+        res.touched = true;
+
+        return res;
+    }
+
+    /** */
+    protected Iterator<Row> untouched() {
+        return F.flat(F.iterator(hashStore.values(), c0 -> c0, true, c1 -> 
!c1.touched));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void pushRight(Row row) throws Exception {
+        assert downstream() != null;
+        assert waitingRight > 0;
+
+        nodeMemoryTracker.onRowAdded(row);
+
+        waitingRight--;
+
+        GroupKey<Row> key = GroupKey.of(row, rightRowHnd, keepRowsWithNull);
+
+        if (key != null)
+            hashStore.computeIfAbsent(key, k -> new 
TouchedArrayList<>()).add(row);
+
+        if (waitingRight == 0)
+            rightSource().request(waitingRight = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    protected boolean leftFinished() {
+        return waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty();
+    }
+
+    /** */
+    protected boolean rightFinished() {
+        return waitingRight == NOT_WAITING && !rightIt.hasNext();
+    }
+
+    /** */
+    protected boolean checkJoinFinished() throws Exception {
+        if (requested > 0 && leftFinished() && rightFinished()) {
+            requested = 0;
+
+            hashStore.clear();
+
+            downstream().end();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** */
+    private static final class InnerHashJoin<RowT> extends HashJoinNode<RowT> {
+        /**
+         * Creates node for INNER JOIN.
+         *
+         * @param ctx Execution context.
+         * @param rowType Out row type.
+         * @param info Join info.
+         * @param outRowHnd Output row handler.
+         * @param nonEqCond If provided, only rows matching the predicate will 
be emitted as matched rows.
+         */
+        private InnerHashJoin(ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteJoinInfo info,
+            RowHandler<RowT> outRowHnd,
+            @Nullable BiPredicate<RowT, RowT> nonEqCond
+        ) {
+            super(ctx, rowType, info, outRowHnd, false, nonEqCond);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+
+                try {
+                    while (requested > 0 && (left != null || 
!leftInBuf.isEmpty())) {
+                        // Proceed with next left row, if previous was fully 
processed.
+                        if (left == null) {
+                            left = leftInBuf.remove();
+
+                            rightIt = lookup(left).iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            // Emits matched rows.
+                            while (requested > 0 && rightIt.hasNext()) {
+                                if (rescheduleJoin())
+                                    return;
+
+                                RowT right = rightIt.next();
+
+                                if (nonEqCond != null && !nonEqCond.test(left, 
right))
+                                    continue;
+
+                                --requested;
+
+                                downstream().push(outRowHnd.concat(left, 
right));
+                            }
+
+                            if (!rightIt.hasNext())
+                                left = null;
+                        }
+                        else
+                            left = null;
+                    }
+                }
+                finally {
+                    inLoop = false;
+                }
+            }
+
+            if (checkJoinFinished())
+                return;
+
+            tryToRequestInputs();
+        }
+    }
+
+    /** */
+    private static final class LeftHashJoin<RowT> extends HashJoinNode<RowT> {
+        /** Right row factory. */
+        private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+        /**
+         * Creates node for LEFT OUTER JOIN.
+         *
+         * @param ctx Execution context.
+         * @param info Join info.
+         * @param rowType Out row type.
+         * @param outRowHnd Output row handler.
+         * @param rightRowFactory Right row factory.
+         * @param nonEqCond Non-equi conditions.
+         */
+        private LeftHashJoin(
+            ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteJoinInfo info,
+            RowHandler<RowT> outRowHnd,
+            RowHandler.RowFactory<RowT> rightRowFactory,
+            @Nullable BiPredicate<RowT, RowT> nonEqCond
+        ) {
+            super(ctx, rowType, info, outRowHnd, false, nonEqCond);
+
+            this.rightRowFactory = rightRowFactory;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+
+                try {
+                    while (requested > 0 && (left != null || 
!leftInBuf.isEmpty())) {
+                        // Proceed with next left row, if previous was fully 
processed.
+                        if (left == null) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left);
+
+                            if (rightRows.isEmpty()) {
+                                requested--;
+
+                                downstream().push(outRowHnd.concat(left, 
rightRowFactory.create()));
+                            }
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            // Emit unmatched left row.
+                            while (requested > 0 && rightIt.hasNext()) {
+                                if (rescheduleJoin())
+                                    return;
+
+                                RowT right = rightIt.next();
+
+                                if (nonEqCond != null && !nonEqCond.test(left, 
right))
+                                    continue;
+
+                                 --requested;

Review Comment:
   Wrong indent: `--requested;` has one extra leading space here (compare the
   same statement in `InnerHashJoin`).



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java:
##########
@@ -0,0 +1,750 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.MappingRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/** Hash join implementor. */
+public abstract class HashJoinNode<Row> extends 
AbstractRightMaterializedJoinNode<Row> {
+    /** */
+    private static final int INITIAL_CAPACITY = 128;
+
+    /** */
+    private final RowHandler<Row> leftRowHnd;
+
+    /** */
+    private final RowHandler<Row> rightRowHnd;
+
+    /** */
+    private final boolean keepRowsWithNull;
+
+    /** Output row handler. */
+    protected final RowHandler<Row> outRowHnd;
+
+    /** Right rows storage. */
+    protected Map<GroupKey<Row>, TouchedArrayList<Row>> hashStore = new 
HashMap<>(INITIAL_CAPACITY);
+
+    /** */
+    protected Iterator<Row> rightIt = Collections.emptyIterator();
+
+    /** */
+    @Nullable protected final BiPredicate<Row, Row> nonEqCond;
+
+    /**
+     * Creates hash join node.
+     *
+     * @param ctx Execution context.
+     * @param rowType Out row type.
+     * @param info Join info.
+     * @param outRowHnd Output row handler.
+     * @param keepRowsWithNull {@code True} if we need to store the row from 
right shoulder even if it contains NULL in
+     *                         any of join key position. This is required for 
joins which emit unmatched part
+     *                         of the right shoulder, such as RIGHT JOIN and 
FULL OUTER JOIN.
+     * @param nonEqCond If provided, only rows matching the predicate will be 
emitted as matched rows.
+     */
+    protected HashJoinNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        IgniteJoinInfo info,
+        RowHandler<Row> outRowHnd,
+        boolean keepRowsWithNull,
+        @Nullable BiPredicate<Row, Row> nonEqCond
+    ) {
+        super(ctx, rowType);
+
+        this.keepRowsWithNull = keepRowsWithNull;
+
+        leftRowHnd = new MappingRowHandler<>(ctx.rowHandler(), 
info.leftKeys.toIntArray());
+        rightRowHnd = new MappingRowHandler<>(ctx.rowHandler(), 
info.rightKeys.toIntArray());
+
+        this.outRowHnd = outRowHnd;
+
+        this.nonEqCond = nonEqCond;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        super.rewindInternal();
+
+        rightIt = Collections.emptyIterator();
+
+        hashStore.clear();
+    }
+
+    /** Creates certain join node. */
+    public static <RowT> HashJoinNode<RowT> create(
+        ExecutionContext<RowT> ctx,
+        RelDataType rowType,
+        RelDataType leftRowType,
+        RelDataType rightRowType,
+        JoinRelType type,
+        IgniteJoinInfo info,
+        @Nullable BiPredicate<RowT, RowT> nonEqCond
+    ) {
+        assert !info.pairs().isEmpty() && (info.isEqui() || type == 
JoinRelType.INNER || type == JoinRelType.SEMI);
+        assert nonEqCond == null || type == JoinRelType.INNER || type == 
JoinRelType.SEMI || type == JoinRelType.LEFT;
+
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
+        RowHandler<RowT> rowHnd = ctx.rowHandler();
+
+        switch (type) {
+            case INNER:
+                return new InnerHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            case LEFT:
+                return new LeftHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, rightRowType), nonEqCond);
+
+            case RIGHT:
+                return new RightHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, leftRowType));
+
+            case FULL: {
+                return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, 
rowHnd.factory(typeFactory, leftRowType),
+                    rowHnd.factory(typeFactory, rightRowType), nonEqCond);
+            }
+
+            case SEMI:
+                return new SemiHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            case ANTI:
+                return new AntiHashJoin<>(ctx, rowType, info, rowHnd, 
nonEqCond);
+
+            default:
+                throw new IllegalArgumentException("Join of type '" + type + 
"' isn't supported.");
+        }
+    }
+
+    /** */
+    protected Collection<Row> lookup(Row row) {
+        GroupKey<Row> key = GroupKey.of(row, leftRowHnd, false);
+
+        if (key == null)
+            return Collections.emptyList();
+
+        TouchedArrayList<Row> res = hashStore.get(key);
+
+        if (res == null)
+            return Collections.emptyList();
+
+        res.touched = true;
+
+        return res;
+    }
+
+    /** */
+    protected Iterator<Row> untouched() {
+        return F.flat(F.iterator(hashStore.values(), c0 -> c0, true, c1 -> 
!c1.touched));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void pushRight(Row row) throws Exception {
+        assert downstream() != null;
+        assert waitingRight > 0;
+
+        nodeMemoryTracker.onRowAdded(row);
+
+        waitingRight--;
+
+        GroupKey<Row> key = GroupKey.of(row, rightRowHnd, keepRowsWithNull);
+
+        if (key != null)
+            hashStore.computeIfAbsent(key, k -> new 
TouchedArrayList<>()).add(row);
+
+        if (waitingRight == 0)
+            rightSource().request(waitingRight = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    protected boolean leftFinished() {
+        return waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty();
+    }
+
+    /** */
+    protected boolean rightFinished() {
+        return waitingRight == NOT_WAITING && !rightIt.hasNext();
+    }
+
+    /** */
+    protected boolean checkJoinFinished() throws Exception {
+        if (requested > 0 && leftFinished() && rightFinished()) {
+            requested = 0;
+
+            hashStore.clear();
+
+            downstream().end();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** */
+    private static final class InnerHashJoin<RowT> extends HashJoinNode<RowT> {
+        /**
+         * Creates node for INNER JOIN.
+         *
+         * @param ctx Execution context.
+         * @param rowType Out row type.
+         * @param info Join info.
+         * @param outRowHnd Output row handler.
+         * @param nonEqCond If provided, only rows matching the predicate will 
be emitted as matched rows.
+         */
+        private InnerHashJoin(ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteJoinInfo info,
+            RowHandler<RowT> outRowHnd,
+            @Nullable BiPredicate<RowT, RowT> nonEqCond
+        ) {
+            super(ctx, rowType, info, outRowHnd, false, nonEqCond);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+
+                try {
+                    while (requested > 0 && (left != null || 
!leftInBuf.isEmpty())) {
+                        // Proceed with next left row, if previous was fully 
processed.
+                        if (left == null) {
+                            left = leftInBuf.remove();
+
+                            rightIt = lookup(left).iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            // Emits matched rows.
+                            while (requested > 0 && rightIt.hasNext()) {
+                                if (rescheduleJoin())
+                                    return;
+
+                                RowT right = rightIt.next();
+
+                                if (nonEqCond != null && !nonEqCond.test(left, 
right))
+                                    continue;
+
+                                --requested;
+
+                                downstream().push(outRowHnd.concat(left, 
right));
+                            }
+
+                            if (!rightIt.hasNext())
+                                left = null;
+                        }
+                        else
+                            left = null;
+                    }
+                }
+                finally {
+                    inLoop = false;
+                }
+            }
+
+            if (checkJoinFinished())
+                return;
+
+            tryToRequestInputs();
+        }
+    }
+
+    /** */
+    private static final class LeftHashJoin<RowT> extends HashJoinNode<RowT> {
+        /** Right row factory. */
+        private final RowHandler.RowFactory<RowT> rightRowFactory;
+
+        /**
+         * Creates node for LEFT OUTER JOIN.
+         *
+         * @param ctx Execution context.
+         * @param info Join info.
+         * @param rowType Out row type.
+         * @param outRowHnd Output row handler.
+         * @param rightRowFactory Right row factory.
+         * @param nonEqCond Non-equi conditions.
+         */
+        private LeftHashJoin(
+            ExecutionContext<RowT> ctx,
+            RelDataType rowType,
+            IgniteJoinInfo info,
+            RowHandler<RowT> outRowHnd,
+            RowHandler.RowFactory<RowT> rightRowFactory,
+            @Nullable BiPredicate<RowT, RowT> nonEqCond
+        ) {
+            super(ctx, rowType, info, outRowHnd, false, nonEqCond);
+
+            this.rightRowFactory = rightRowFactory;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void join() throws Exception {
+            if (waitingRight == NOT_WAITING) {
+                inLoop = true;
+
+                try {
+                    while (requested > 0 && (left != null || 
!leftInBuf.isEmpty())) {
+                        // Proceed with next left row, if previous was fully 
processed.
+                        if (left == null) {
+                            left = leftInBuf.remove();
+
+                            Collection<RowT> rightRows = lookup(left);
+
+                            if (rightRows.isEmpty()) {
+                                requested--;
+
+                                downstream().push(outRowHnd.concat(left, 
rightRowFactory.create()));
+                            }
+
+                            rightIt = rightRows.iterator();
+                        }
+
+                        if (rightIt.hasNext()) {
+                            // Emit unmatched left row.
+                            while (requested > 0 && rightIt.hasNext()) {
+                                if (rescheduleJoin())
+                                    return;
+
+                                RowT right = rightIt.next();
+
+                                if (nonEqCond != null && !nonEqCond.test(left, 
right))
+                                    continue;

Review Comment:
   For a left join, when the matched right rows are rejected by `nonEqCond`,
   we should still emit the left row with an empty right part, but only once
   per left row.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinExecutionTest.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.function.BiPredicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+
+/** */
+public class HashJoinExecutionTest extends AbstractExecutionTest {
+    /** */
+    @Test
+    public void testHashJoinRewind() {
+        ExecutionContext<Object[]> ctx = executionContext();
+
+        RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
Integer.class, String.class);
+        RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), 
Integer.class, String.class, Integer.class);
+
+        ScanNode<Object[]> deps = new ScanNode<>(
+            ctx,
+            rightType,
+            Arrays.asList(
+                new Object[] {1, "Core"},
+                new Object[] {2, "SQL"},
+                new Object[] {3, "QA"}
+            ));
+
+        ScanNode<Object[]> persons = new ScanNode<>(
+            ctx,
+            leftType,
+            Arrays.asList(
+                new Object[] {0, "Igor", 1},
+                new Object[] {1, "Roman", 2},
+                new Object[] {2, "Ivan", 5},
+                new Object[] {3, "Alexey", 1}
+            ));
+
+        HashJoinNode<Object[]> join = createJoinNode(ctx, LEFT, leftType, 
rightType, null);
+
+        join.register(F.asList(persons, deps));
+
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, join.rowType(), 
r -> new Object[] {r[2], r[3], r[1]});
+        project.register(join);
+
+        RootRewindable<Object[]> node = new RootRewindable<>(ctx, 
project.rowType());
+
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        Object[][] expected = {
+            {1, 1, "Igor"},
+            {1, 1, "Alexey"},
+            {2, 2, "Roman"},
+            {5, null, "Ivan"},
+        };
+
+        checkResults(expected, rows);
+
+        node.rewind();
+
+        assert node.hasNext();
+
+        rows.clear();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        checkResults(expected, rows);
+    }
+
+    /** */
+    @Test
+    public void testEquiJoinWithDifferentBufferSize() {
+        for (JoinRelType joinType : F.asList(INNER, LEFT, RIGHT, FULL, SEMI, 
ANTI)) {
+            validateEquiJoin(joinType, 0, 0);
+            validateEquiJoin(joinType, 0, 1);
+            validateEquiJoin(joinType, 0, 10);
+            validateEquiJoin(joinType, 1, 0);
+            validateEquiJoin(joinType, 1, 1);
+            validateEquiJoin(joinType, 1, 10);
+            validateEquiJoin(joinType, 10, 0);
+            validateEquiJoin(joinType, 10, 1);
+            validateEquiJoin(joinType, 10, 10);
+
+            int testSize = IN_BUFFER_SIZE;
+
+            validateEquiJoin(joinType, 0, testSize - 1);
+            validateEquiJoin(joinType, 0, testSize);
+            validateEquiJoin(joinType, 0, testSize + 1);
+
+            validateEquiJoin(joinType, testSize - 1, 0);
+            validateEquiJoin(joinType, testSize - 1, testSize - 1);
+            validateEquiJoin(joinType, testSize - 1, testSize);
+            validateEquiJoin(joinType, testSize - 1, testSize + 1);
+
+            validateEquiJoin(joinType, testSize, 0);
+            validateEquiJoin(joinType, testSize, testSize - 1);
+            validateEquiJoin(joinType, testSize, testSize);
+            validateEquiJoin(joinType, testSize, testSize + 1);
+
+            validateEquiJoin(joinType, testSize + 1, 0);
+            validateEquiJoin(joinType, testSize + 1, testSize - 1);
+            validateEquiJoin(joinType, testSize + 1, testSize);
+            validateEquiJoin(joinType, testSize + 1, testSize + 1);
+
+            validateEquiJoin(joinType, 2 * testSize, 0);
+            validateEquiJoin(joinType, 0, 2 * testSize);
+            validateEquiJoin(joinType, 2 * testSize, 2 * testSize);
+        }
+    }
+
+    /** */
+    @Test
+    public void testNonEquiJoinWithDifferentBufferSize() {
+        JoinRelType joinType = INNER;
+
+        validateNonEquiJoin(joinType, 0, 0);
+        validateNonEquiJoin(joinType, 0, 1);
+        validateNonEquiJoin(joinType, 0, 10);
+        validateNonEquiJoin(joinType, 1, 0);
+        validateNonEquiJoin(joinType, 1, 1);
+        validateNonEquiJoin(joinType, 1, 10);
+        validateNonEquiJoin(joinType, 10, 0);
+        validateNonEquiJoin(joinType, 10, 1);
+        validateNonEquiJoin(joinType, 10, 10);
+
+        int testSize = IN_BUFFER_SIZE;
+
+        validateNonEquiJoin(joinType, 0, testSize - 1);
+        validateNonEquiJoin(joinType, 0, testSize);
+        validateNonEquiJoin(joinType, 0, testSize + 1);
+
+        validateNonEquiJoin(joinType, testSize - 1, 0);
+        validateNonEquiJoin(joinType, testSize - 1, testSize - 1);
+        validateNonEquiJoin(joinType, testSize - 1, testSize);
+        validateNonEquiJoin(joinType, testSize - 1, testSize + 1);
+
+        validateNonEquiJoin(joinType, testSize, 0);
+        validateNonEquiJoin(joinType, testSize, testSize - 1);
+        validateNonEquiJoin(joinType, testSize, testSize);
+        validateNonEquiJoin(joinType, testSize, testSize + 1);
+
+        validateNonEquiJoin(joinType, testSize + 1, 0);
+        validateNonEquiJoin(joinType, testSize + 1, testSize - 1);
+        validateNonEquiJoin(joinType, testSize + 1, testSize);
+        validateNonEquiJoin(joinType, testSize + 1, testSize + 1);
+
+        validateNonEquiJoin(joinType, 2 * testSize, 0);
+        validateNonEquiJoin(joinType, 0, 2 * testSize);
+        validateNonEquiJoin(joinType, 2 * testSize, 2 * testSize);
+    }
+
+    /** */
+    @Test
+    public void testInnerJoinWithPostFiltration() {
+        doTestJoinWithPostFiltration(INNER, new Object[][]{{3, "Alexey", 1, 1, 
"Core"}});
+    }
+
+    /** */
+    @Test
+    public void testLeftJoinWithPostFiltration() {
+        Object[][] expected = new Object[][]{

Review Comment:
   Every row from the left side should be present in the expected result set,
   since this is a LEFT join.



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinExecutionTest.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.function.BiPredicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.calcite.rel.core.JoinRelType.ANTI;
+import static org.apache.calcite.rel.core.JoinRelType.FULL;
+import static org.apache.calcite.rel.core.JoinRelType.INNER;
+import static org.apache.calcite.rel.core.JoinRelType.LEFT;
+import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
+import static org.apache.calcite.rel.core.JoinRelType.SEMI;
+
+/** */
+public class HashJoinExecutionTest extends AbstractExecutionTest {
+    /**
+     * Checks that a LEFT hash join returns the same rows before and after the root node is rewound.
+     *
+     * <p>The {@code persons} rows (three values each) are the left input and the {@code deps} rows (two values
+     * each) are the right input. Every joined row is projected to {@code {r[2], r[3], r[1]}} and both passes are
+     * compared with the same expected rows.
+     */
+    @Test
+    public void testHashJoinRewind() {
+        ExecutionContext<Object[]> ctx = executionContext();
+
+        // The row types must match the row width of the inputs they describe: left rows (persons) carry three
+        // values, right rows (deps) carry two.
+        RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(),
+            Integer.class, String.class, Integer.class);
+        RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(),
+            Integer.class, String.class);
+
+        ScanNode<Object[]> deps = new ScanNode<>(
+            ctx,
+            rightType,
+            Arrays.asList(
+                new Object[] {1, "Core"},
+                new Object[] {2, "SQL"},
+                new Object[] {3, "QA"}
+            ));
+
+        ScanNode<Object[]> persons = new ScanNode<>(
+            ctx,
+            leftType,
+            Arrays.asList(
+                new Object[] {0, "Igor", 1},
+                new Object[] {1, "Roman", 2},
+                new Object[] {2, "Ivan", 5},
+                new Object[] {3, "Alexey", 1}
+            ));
+
+        HashJoinNode<Object[]> join = createJoinNode(ctx, LEFT, leftType, rightType, null);
+
+        // Source order matters: persons is the left input, deps is the right one.
+        join.register(F.asList(persons, deps));
+
+        // Joined row layout: indexes 0..2 come from the left row, indexes 3.. from the right row.
+        ProjectNode<Object[]> project = new ProjectNode<>(ctx, join.rowType(),
+            r -> new Object[] {r[2], r[3], r[1]});
+        project.register(join);
+
+        RootRewindable<Object[]> node = new RootRewindable<>(ctx, project.rowType());
+
+        node.register(project);
+
+        assert node.hasNext();
+
+        ArrayList<Object[]> rows = new ArrayList<>();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        // The person with the third value 5 has no counterpart on the right side, hence the null.
+        Object[][] expected = {
+            {1, 1, "Igor"},
+            {1, 1, "Alexey"},
+            {2, 2, "Roman"},
+            {5, null, "Ivan"},
+        };
+
+        checkResults(expected, rows);
+
+        // Second pass: after the rewind the very same rows are expected again.
+        node.rewind();
+
+        assert node.hasNext();
+
+        rows.clear();
+
+        while (node.hasNext())
+            rows.add(node.next());
+
+        checkResults(expected, rows);
+    }
+
+    /** */
+    @Test
+    public void testEquiJoinWithDifferentBufferSize() {
+        for (JoinRelType joinType : F.asList(INNER, LEFT, RIGHT, FULL, SEMI, 
ANTI)) {
+            validateEquiJoin(joinType, 0, 0);
+            validateEquiJoin(joinType, 0, 1);
+            validateEquiJoin(joinType, 0, 10);
+            validateEquiJoin(joinType, 1, 0);
+            validateEquiJoin(joinType, 1, 1);
+            validateEquiJoin(joinType, 1, 10);
+            validateEquiJoin(joinType, 10, 0);
+            validateEquiJoin(joinType, 10, 1);
+            validateEquiJoin(joinType, 10, 10);
+
+            int testSize = IN_BUFFER_SIZE;
+
+            validateEquiJoin(joinType, 0, testSize - 1);
+            validateEquiJoin(joinType, 0, testSize);
+            validateEquiJoin(joinType, 0, testSize + 1);
+
+            validateEquiJoin(joinType, testSize - 1, 0);
+            validateEquiJoin(joinType, testSize - 1, testSize - 1);
+            validateEquiJoin(joinType, testSize - 1, testSize);
+            validateEquiJoin(joinType, testSize - 1, testSize + 1);
+
+            validateEquiJoin(joinType, testSize, 0);
+            validateEquiJoin(joinType, testSize, testSize - 1);
+            validateEquiJoin(joinType, testSize, testSize);
+            validateEquiJoin(joinType, testSize, testSize + 1);
+
+            validateEquiJoin(joinType, testSize + 1, 0);
+            validateEquiJoin(joinType, testSize + 1, testSize - 1);
+            validateEquiJoin(joinType, testSize + 1, testSize);
+            validateEquiJoin(joinType, testSize + 1, testSize + 1);
+
+            validateEquiJoin(joinType, 2 * testSize, 0);
+            validateEquiJoin(joinType, 0, 2 * testSize);
+            validateEquiJoin(joinType, 2 * testSize, 2 * testSize);
+        }
+    }
+
+    /**
+     * Calls {@code validateNonEquiJoin} for the INNER join type with a matrix of left/right sizes: small sizes
+     * (0, 1, 10), sizes next to {@code IN_BUFFER_SIZE} (minus one, exact, plus one) and the doubled
+     * {@code IN_BUFFER_SIZE}.
+     */
+    @Test
+    public void testNonEquiJoinWithDifferentBufferSize() {
+        JoinRelType type = INNER;
+
+        int[] smallSizes = {0, 1, 10};
+
+        // Every combination of the small sizes.
+        for (int leftSize : smallSizes) {
+            for (int rightSize : smallSizes)
+                validateNonEquiJoin(type, leftSize, rightSize);
+        }
+
+        int bufSize = IN_BUFFER_SIZE;
+
+        int[] edgeSizes = {bufSize - 1, bufSize, bufSize + 1};
+
+        // Empty left side against the right sizes around the buffer boundary.
+        for (int rightSize : edgeSizes)
+            validateNonEquiJoin(type, 0, rightSize);
+
+        // Left sizes around the buffer boundary against an empty right side and the same boundary sizes.
+        for (int leftSize : edgeSizes) {
+            validateNonEquiJoin(type, leftSize, 0);
+
+            for (int rightSize : edgeSizes)
+                validateNonEquiJoin(type, leftSize, rightSize);
+        }
+
+        int doubledSize = 2 * bufSize;
+
+        validateNonEquiJoin(type, doubledSize, 0);
+        validateNonEquiJoin(type, 0, doubledSize);
+        validateNonEquiJoin(type, doubledSize, doubledSize);
+    }
+
+    /** */
+    @Test
+    public void testInnerJoinWithPostFiltration() {
+        doTestJoinWithPostFiltration(INNER, new Object[][]{{3, "Alexey", 1, 1, 
"Core"}});
+    }
+
+    /** */
+    @Test
+    public void testLeftJoinWithPostFiltration() {
+        Object[][] expected = new Object[][]{
+            {2, "Ival", 5, null, null},

Review Comment:
   `Ival`? There is no such value in the input data; something is wrong with 
the check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to