godfreyhe commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r935060142


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =

Review Comment:
   If this config option is `Experimental`, I think we can move it into
`InternalConfigOptions` and add the `Documentation.Section` annotation.
   
   Is this configuration an absolute or a relative value?
`table.exec.resource.hash-join.memory` is just a memory weight (a relative
value), so an absolute value is impractical here.
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinOperator.java:
##########
@@ -62,488 +36,44 @@
 public class SortMergeJoinOperator extends TableStreamOperator<RowData>
         implements TwoInputStreamOperator<RowData, RowData, RowData>, BoundedMultiInput {
 
-    private final double externalBufferMemRatio;
-    private final FlinkJoinType type;
-    private final boolean leftIsSmaller;
-    private final boolean[] filterNulls;
-
-    // generated code to cook
-    private GeneratedJoinCondition condFuncCode;
-    private GeneratedProjection projectionCode1;
-    private GeneratedProjection projectionCode2;
-    private GeneratedNormalizedKeyComputer computer1;
-    private GeneratedRecordComparator comparator1;
-    private GeneratedNormalizedKeyComputer computer2;
-    private GeneratedRecordComparator comparator2;
-    private GeneratedRecordComparator genKeyComparator;
+    private final SortMergeJoinFunction sortMergeJoinFunction;

Review Comment:
   It would be better to put the refactoring (no new feature) in an independent
commit and apply the feature changes in a separate commit. That would make the
review easier.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+        long sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;
+
+        // Due to hash join maybe fallback to sort merge join, so here managed memory choose the
+        // large one
+        long managedMemory =

Review Comment:
   nit: use `Math.max()` here.
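
A minimal sketch of the `Math.max()` form, reusing the variable names from the hunk above. The wrapper class and method are hypothetical and exist only to make the snippet self-contained:

```java
// Hypothetical wrapper; only the Math.max call is the point of the nit.
final class ManagedMemoryChoice {

    /** Returns the larger of the two memory budgets, in bytes. */
    static long chooseManagedMemory(long hashJoinManagedMemory, long sortMergeJoinManagedMemory) {
        // Math.max instead of comparing the two values by hand.
        return Math.max(hashJoinManagedMemory, sortMergeJoinManagedMemory);
    }
}
```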



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -144,15 +165,34 @@ public void open() throws Exception {
 
     @Override
     public void processElement1(StreamRecord<RowData> element) throws Exception {
-        checkState(!buildEnd, "Should not build ended.");
-        this.table.putBuildRow(element.getValue());
+        // If the data size spilled to disk more than spilledDataThresholdInBytes during build hash
+        // table, fallback to sort merge join early
+        if (!fallbackSMJInBuild) {

Review Comment:
   It's better to avoid checking for the fallback on every record, which will
cause performance degradation. We can check it at the partition level instead,
when a partition needs to spill to disk.
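
A rough sketch of the partition-level check, under stated assumptions: `SpillAwareTableSketch`, `onPartitionSpilled` and `isFallbackRequested` are hypothetical names for illustration, not methods of Flink's hash table. The point is only that the threshold comparison lives on the spill path rather than on the per-record path:

```java
/** Toy stand-in for a spilling hash table; not Flink's actual hash table class. */
final class SpillAwareTableSketch {
    private final long spillThresholdBytes;
    private long spilledBytes;
    private boolean fallbackRequested;

    SpillAwareTableSketch(long spillThresholdBytes) {
        this.spillThresholdBytes = spillThresholdBytes;
    }

    /** Hot path, called once per record: no threshold comparison here. */
    void putBuildRow(Object row) {
        // Insert into an in-memory partition (omitted in this sketch).
    }

    /** Cold path, called only when a partition is written to disk. */
    void onPartitionSpilled(long partitionBytes) {
        spilledBytes += partitionBytes;
        if (spilledBytes > spillThresholdBytes) {
            fallbackRequested = true;
        }
    }

    /** The operator can query this after a spill instead of on every record. */
    boolean isFallbackRequested() {
        return fallbackRequested;
    }
}
```

With a threshold of 100 bytes, spilling a 60-byte partition leaves the flag unset, and spilling a further 50 bytes sets it.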



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();
+        LOG.info(
+                "Spill all in memory partitions to disk successfully, fallback to sort merge join.");
+        // initialize sort merge join function
+        initialSortMergeJoinFunction();
+        fallbackSMJInBuild = true;
+
+        // read build side data of all spilled partitions
+        for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
+            // process build side only
+            RowIterator<BinaryRowData> buildSideIter = table.getSpilledPartitionBuildSideIter(p);
+            while (buildSideIter.advanceNext()) {
+                processSortMergeJoinElement1(buildSideIter.getRow());
+            }
+        }
+
+        // close the HashTable
+        closeHashTable();
+
+        // process current record lastly
+        processSortMergeJoinElement1(rowData);
+    }
+
+    /**
+     * If here also exists partitions which spilled to disk more than three time when hash join end,
+     * means that the key in these partitions is very skewed, so fallback to sort merge join
+     * algorithm to process it.
+     */
+    private void fallbackSMJProcessPartition() throws Exception {
+        if (!table.getPartitionsPendingForSMJ().isEmpty()) {
+            // initialize sort merge join operator
+            LOG.info("Fallback to sort merge join.");
+            initialSortMergeJoinFunction();
+            fallbackSMJInProbe = true;
+
+            for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
+                // process build side
+                RowIterator<BinaryRowData> buildSideIter =
+                        table.getSpilledPartitionBuildSideIter(p);
+                while (buildSideIter.advanceNext()) {
+                    processSortMergeJoinElement1(buildSideIter.getRow());
+                }
+
+                // process probe side
+                ProbeIterator probeIter = table.getSpilledPartitionProbeSideIter(p);
+                BinaryRowData probeNext;
+                while ((probeNext = probeIter.next()) != null) {
+                    processSortMergeJoinElement2(probeNext);
+                }
+            }
+
+            // close the HashTable
+            closeHashTable();
+
+            // finish build and probe
+            sortMergeJoinFunction.endInput(1);
+            sortMergeJoinFunction.endInput(2);
+            LOG.info("Finish sort merge join.");

Review Comment:
   This log message is misleading: only the fallback of the hash partitions is
finished here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }

Review Comment:
   Please extract these into a util class. If we use the max of the HJ memory
and the SMJ memory, how should users define the value of
`table.exec.hash-join.spill-threshold`?
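
One possible shape for such a util, as a sketch only: `JoinMemorySketch` and its methods are hypothetical names, and the nested enum is a stand-in for `FlinkJoinType` so that the snippet compiles on its own. The arithmetic mirrors the hunk above:

```java
/** Hypothetical helper; the nested enum stands in for Flink's FlinkJoinType. */
final class JoinMemorySketch {

    enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    /** Mirrors the hunk: two external buffers for FULL, otherwise one. */
    static int externalBufferNum(JoinType joinType) {
        return joinType == JoinType.FULL ? 2 : 1;
    }

    /** Managed memory for sort merge join, in bytes: external buffers plus two sorters. */
    static long sortMergeJoinManagedMemory(
            JoinType joinType, long externalBufferMemory, long sortMemory) {
        return externalBufferMemory * externalBufferNum(joinType) + sortMemory * 2;
    }
}
```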
   
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();

Review Comment:
   Is it possible to avoid writing the in-memory data to disk, and instead
write it directly into the sorter and then read the data on disk into the
sorter?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/JoinOperatorUtil.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** Utility for {@link SortMergeJoinOperator}. */
+public class JoinOperatorUtil {

Review Comment:
   If this util is used for SMJ, why don't we rename it to
`SortMergeJoinOperatorUtil`?
   



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/JoinUtil.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.FULL;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.INNER;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.RIGHT;
+
+/** Utility for join. */
+public class JoinUtil {
+
+    public static FlinkJoinType getJoinType(boolean leftOuter, boolean rightOuter) {
+        if (leftOuter && rightOuter) {

Review Comment:
   How are SEMI and ANTI represented here?
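
One way this could look, purely as an illustration: the extra `semi` and `anti` flags and the class name are assumptions, not part of the PR, and the nested enum stands in for `FlinkJoinType`:

```java
/** Hypothetical variant of the test helper; the enum stands in for FlinkJoinType. */
final class JoinTypeSketch {

    enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    /** Maps the four flags to a join type; semi and anti are mutually exclusive. */
    static JoinType getJoinType(
            boolean leftOuter, boolean rightOuter, boolean semi, boolean anti) {
        if (semi && anti) {
            throw new IllegalArgumentException("A join cannot be both SEMI and ANTI.");
        }
        if (semi) {
            return JoinType.SEMI;
        }
        if (anti) {
            return JoinType.ANTI;
        }
        if (leftOuter && rightOuter) {
            return JoinType.FULL;
        }
        if (leftOuter) {
            return JoinType.LEFT;
        }
        if (rightOuter) {
            return JoinType.RIGHT;
        }
        return JoinType.INNER;
    }
}
```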



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.