godfreyhe commented on code in PR #20359:
URL: https://github.com/apache/flink/pull/20359#discussion_r939500620


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.util.Util;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Resolve and validate Hints, currently only join hints are supported.
+ *
+ * <p>Here the duplicated join hints will not be checked.
+ */
+public class JoinHintResolver extends RelShuttleImpl {
+    private final Set<RelHint> allHints = new HashSet<>();
+    private final Set<RelHint> validHints = new HashSet<>();
+
+    /** Transforms a relational expression into another relational expression. 
*/
+    public List<RelNode> resolve(List<RelNode> roots) {
+        List<RelNode> resolvedRoots =
+                roots.stream().map(node -> 
node.accept(this)).collect(Collectors.toList());
+        validateHints();
+        return resolvedRoots;
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        return visitBiRel(join);
+    }
+
+    private RelNode visitBiRel(BiRel biRel) {
+        Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
+        Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
+
+        Set<RelHint> existentKVHints = new HashSet<>();
+
+        List<RelHint> newHints =

Review Comment:
   We can use a for-loop here instead of the stream style. The current format
   is hard to read.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -75,4 +90,88 @@ public static Map<String, String> mergeTableOptions(
         newProps.putAll(hints);
         return Collections.unmodifiableMap(newProps);
     }
+
+    public static Optional<String> getTableAlias(RelNode node) {
+        if (node instanceof Hintable) {
+            Hintable aliasNode = (Hintable) node;
+            List<String> aliasNames =
+                    aliasNode.getHints().stream()
+                            .filter(h -> 
h.hintName.equalsIgnoreCase(FlinkHints.HINT_ALIAS))
+                            .flatMap(h -> h.listOptions.stream())
+                            .collect(Collectors.toList());
+            if (aliasNames.size() > 0) {
+                return Optional.of(aliasNames.get(0));
+            } else {
+                if (canTransposeToTableScan(node)) {
+                    return getTableAlias(node.getInput(0));
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    public static boolean canTransposeToTableScan(RelNode node) {
+        // TODO support look up join
+        return node instanceof LogicalProject // computed column on table
+                || node instanceof LogicalFilter;
+    }
+
+    /** Returns the qualified name of a table scan, otherwise returns empty. */
+    public static Optional<String> getTableName(RelOptTable table) {
+        if (table == null) {
+            return Optional.empty();
+        }
+
+        String tableName;
+        if (table instanceof TableSourceTable) {

Review Comment:
   Just use the original table names.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -75,4 +90,88 @@ public static Map<String, String> mergeTableOptions(
         newProps.putAll(hints);
         return Collections.unmodifiableMap(newProps);
     }
+
+    public static Optional<String> getTableAlias(RelNode node) {
+        if (node instanceof Hintable) {
+            Hintable aliasNode = (Hintable) node;
+            List<String> aliasNames =
+                    aliasNode.getHints().stream()
+                            .filter(h -> 
h.hintName.equalsIgnoreCase(FlinkHints.HINT_ALIAS))
+                            .flatMap(h -> h.listOptions.stream())
+                            .collect(Collectors.toList());
+            if (aliasNames.size() > 0) {
+                return Optional.of(aliasNames.get(0));
+            } else {
+                if (canTransposeToTableScan(node)) {
+                    return getTableAlias(node.getInput(0));
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    public static boolean canTransposeToTableScan(RelNode node) {
+        // TODO support look up join
+        return node instanceof LogicalProject // computed column on table
+                || node instanceof LogicalFilter;
+    }
+
+    /** Returns the qualified name of a table scan, otherwise returns empty. */
+    public static Optional<String> getTableName(RelOptTable table) {
+        if (table == null) {
+            return Optional.empty();
+        }
+
+        String tableName;
+        if (table instanceof TableSourceTable) {
+            tableName =
+                    ((TableSourceTable) table)
+                            .contextResolvedTable()
+                            .getIdentifier()
+                            .asSummaryString();
+        } else if (table instanceof LegacyTableSourceTable) {
+            tableName = ((LegacyTableSourceTable<?>) 
table).tableIdentifier().asSummaryString();
+        } else if (table instanceof DataStreamTable) {
+            tableName = StringUtils.join(((DataStreamTable<?>) 
table).getNames(), '.');
+        } else {
+            throw new TableException(
+                    String.format(
+                            "Could not get the table name with the unknown 
table class `%s`",
+                            table.getClass().getCanonicalName()));
+        }
+
+        return Optional.of(tableName);
+    }
+
+    public static String stringifyHints(List<RelHint> hints) {
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for (RelHint h : hints) {
+            if (h.hintName.equalsIgnoreCase(FlinkHints.HINT_ALIAS)) {
+                continue;
+            }
+            if (!first) {
+                sb.append(", ");
+            }
+            sb.append(h.hintName);
+            if (h.listOptions.size() > 0) {
+                sb.append("(").append(String.join(", ", 
h.listOptions)).append(")");
+            } else if (h.kvOptions.size() > 0) {
+                String mapStr =
+                        h.kvOptions.entrySet().stream()
+                                .map(e -> e.getKey() + "=" + e.getValue())
+                                .collect(Collectors.joining(", "));
+                sb.append("(").append(mapStr).append(")");

Review Comment:
   nit: use `Collectors.joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix)`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.util.Util;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Resolve and validate Hints, currently only join hints are supported.
+ *
+ * <p>Here the duplicated join hints will not be checked.
+ */
+public class JoinHintResolver extends RelShuttleImpl {
+    private final Set<RelHint> allHints = new HashSet<>();
+    private final Set<RelHint> validHints = new HashSet<>();
+
+    /** Transforms a relational expression into another relational expression. 
*/
+    public List<RelNode> resolve(List<RelNode> roots) {
+        List<RelNode> resolvedRoots =
+                roots.stream().map(node -> 
node.accept(this)).collect(Collectors.toList());
+        validateHints();
+        return resolvedRoots;
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        return visitBiRel(join);
+    }
+
+    private RelNode visitBiRel(BiRel biRel) {
+        Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
+        Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
+
+        Set<RelHint> existentKVHints = new HashSet<>();
+
+        List<RelHint> newHints =
+                ((Hintable) biRel)
+                        .getHints().stream()
+                                .flatMap(
+                                        h -> {
+                                            if 
(JoinStrategy.isJoinStrategy(h.hintName)) {
+                                                
allHints.add(trimInheritPath(h));
+                                                // if the hint is valid
+                                                List<String> newOptions =
+                                                        getNewJoinHintOptions(
+                                                                leftName,
+                                                                rightName,
+                                                                h.listOptions,
+                                                                h.hintName);
+
+                                                // check whether the join 
hints options are valid
+                                                boolean isValidOption =
+                                                        
JoinStrategy.validOptions(
+                                                                h.hintName, 
newOptions);
+                                                if (isValidOption) {
+                                                    
validHints.add(trimInheritPath(h));
+                                                    // if the hint defines 
more than one args, only
+                                                    // retain the first one
+                                                    return Stream.of(
+                                                            
RelHint.builder(h.hintName)
+                                                                    
.hintOptions(
+                                                                            
singletonList(
+                                                                               
     newOptions.get(
+                                                                               
             0)))
+                                                                    .build());
+                                                } else {
+                                                    // invalid hint
+                                                    return Stream.of();
+                                                }
+                                            } else {
+                                                //                             
                   //
+                                                // filter alias hints
+                                                //                             
                   if
+                                                // 
(h.hintName.equals(FlinkHints.HINT_ALIAS)) {
+                                                //
+                                                //  return Stream.of();
+                                                //                             
                   }
+                                                if 
(existentKVHints.contains(h)) {
+                                                    return Stream.of();
+                                                } else {
+                                                    existentKVHints.add(h);
+                                                    return Stream.of(h);
+                                                }
+                                            }
+                                        })
+                                .collect(Collectors.toList());
+        RelNode newNode = super.visitChildren(biRel);
+
+        List<RelHint> oldJoinHints =
+                ((Hintable) biRel)
+                        .getHints().stream()
+                                // ignore the alias hint
+                                .filter(hint -> 
JoinStrategy.isJoinStrategy(hint.hintName))
+                                .collect(Collectors.toList());
+        if (!oldJoinHints.isEmpty()) {
+            // replace the table name as LEFT or RIGHT
+            return ((Hintable) newNode).withHints(newHints);
+        }
+        // has no hints, return original node directly.
+        return newNode;
+    }
+
+    private List<String> getNewJoinHintOptions(
+            Optional<String> leftName,
+            Optional<String> rightName,
+            List<String> listOptions,
+            String hintName) {
+        return listOptions.stream()
+                .map(
+                        option -> {
+                            if (leftName.isPresent()
+                                    && rightName.isPresent()
+                                    && matchIdentifier(option, leftName.get())
+                                    && matchIdentifier(option, 
rightName.get())) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Ambitious option: %s in hint: 
%s, the input "
+                                                        + "relations are: %s, 
%s",
+                                                option, hintName, leftName, 
rightName));
+                            } else if (leftName.isPresent()
+                                    && matchIdentifier(option, 
leftName.get())) {
+                                return JoinStrategy.LEFT_INPUT;
+                            } else if (rightName.isPresent()
+                                    && matchIdentifier(option, 
rightName.get())) {
+                                return JoinStrategy.RIGHT_INPUT;
+                            } else {
+                                return "";
+                            }
+                        })
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toList());
+    }
+
+    private void validateHints() {
+        Set<RelHint> invalidHints = new HashSet<>(allHints);
+        invalidHints.removeAll(validHints);
+        if (!invalidHints.isEmpty()) {
+            String errorMsg =
+                    invalidHints.stream()
+                            .map(
+                                    hint ->
+                                            "\n`"
+                                                    + hint.hintName
+                                                    + "("
+                                                    + 
StringUtils.join(hint.listOptions, ", ")
+                                                    + ")`")
+                            .reduce("", (msg, hintMsg) -> msg + hintMsg);

Review Comment:
   Could we replace `reduce` with `collect(Collectors.joining())`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -75,4 +90,88 @@ public static Map<String, String> mergeTableOptions(
         newProps.putAll(hints);
         return Collections.unmodifiableMap(newProps);
     }
+
+    public static Optional<String> getTableAlias(RelNode node) {
+        if (node instanceof Hintable) {
+            Hintable aliasNode = (Hintable) node;
+            List<String> aliasNames =
+                    aliasNode.getHints().stream()
+                            .filter(h -> 
h.hintName.equalsIgnoreCase(FlinkHints.HINT_ALIAS))
+                            .flatMap(h -> h.listOptions.stream())
+                            .collect(Collectors.toList());
+            if (aliasNames.size() > 0) {
+                return Optional.of(aliasNames.get(0));
+            } else {
+                if (canTransposeToTableScan(node)) {
+                    return getTableAlias(node.getInput(0));
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    public static boolean canTransposeToTableScan(RelNode node) {
+        // TODO support look up join
+        return node instanceof LogicalProject // computed column on table
+                || node instanceof LogicalFilter;
+    }
+
+    /** Returns the qualified name of a table scan, otherwise returns empty. */
+    public static Optional<String> getTableName(RelOptTable table) {
+        if (table == null) {
+            return Optional.empty();
+        }
+
+        String tableName;
+        if (table instanceof TableSourceTable) {
+            tableName =
+                    ((TableSourceTable) table)
+                            .contextResolvedTable()
+                            .getIdentifier()
+                            .asSummaryString();
+        } else if (table instanceof LegacyTableSourceTable) {
+            tableName = ((LegacyTableSourceTable<?>) 
table).tableIdentifier().asSummaryString();
+        } else if (table instanceof DataStreamTable) {

Review Comment:
   We can ignore `DataStreamTable`, which is deprecated and only used for
   testing.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashJoinRule.scala:
##########
@@ -50,23 +47,10 @@ class BatchPhysicalHashJoinRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: Join = call.rel(0)
-    val joinInfo = join.analyzeCondition
-    // join keys must not be empty
-    if (joinInfo.pairs().isEmpty) {
-      return false
-    }
-
     val tableConfig = unwrapTableConfig(call)
-    val isShuffleHashJoinEnabled = !isOperatorDisabled(tableConfig, 
OperatorType.ShuffleHashJoin)
-    val isBroadcastHashJoinEnabled =
-      !isOperatorDisabled(tableConfig, OperatorType.BroadcastHashJoin)
-
-    val leftSize = binaryRowRelNodeSize(join.getLeft)
-    val rightSize = binaryRowRelNodeSize(join.getRight)
-    val (isBroadcast, _) = canBroadcast(join.getJoinType, leftSize, rightSize, 
tableConfig)
 
-    // TODO use shuffle hash join if isBroadcast is true and 
isBroadcastHashJoinEnabled is false ?
-    if (isBroadcast) isBroadcastHashJoinEnabled else isShuffleHashJoinEnabled
+    canUseJoinStrategy(join, tableConfig, JoinStrategy.BROADCAST) ||

Review Comment:
   We can get the TableConfig from the RelNode using `unwrapTableConfig(join)`,
   so `tableConfig` is not a necessary parameter.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.util.Util;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Resolve and validate Hints, currently only join hints are supported.
+ *
+ * <p>Here the duplicated join hints will not be checked.
+ */
+public class JoinHintResolver extends RelShuttleImpl {

Review Comment:
   Do we have any test that verifies the plan after the hints are resolved? I
   think it's a very important test for checking hint propagation.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -75,4 +90,88 @@ public static Map<String, String> mergeTableOptions(
         newProps.putAll(hints);
         return Collections.unmodifiableMap(newProps);
     }
+
+    public static Optional<String> getTableAlias(RelNode node) {
+        if (node instanceof Hintable) {
+            Hintable aliasNode = (Hintable) node;
+            List<String> aliasNames =
+                    aliasNode.getHints().stream()
+                            .filter(h -> 
h.hintName.equalsIgnoreCase(FlinkHints.HINT_ALIAS))
+                            .flatMap(h -> h.listOptions.stream())
+                            .collect(Collectors.toList());
+            if (aliasNames.size() > 0) {
+                return Optional.of(aliasNames.get(0));
+            } else {
+                if (canTransposeToTableScan(node)) {
+                    return getTableAlias(node.getInput(0));
+                }
+            }

Review Comment:
   nit: use `else if` instead of nesting the `if` inside the `else` block.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.util.Util;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Resolve and validate Hints, currently only join hints are supported.
+ *
+ * <p>Here the duplicated join hints will not be checked.
+ */
+public class JoinHintResolver extends RelShuttleImpl {
+    private final Set<RelHint> allHints = new HashSet<>();
+    private final Set<RelHint> validHints = new HashSet<>();
+
+    /** Transforms a relational expression into another relational expression. 
*/
+    public List<RelNode> resolve(List<RelNode> roots) {
+        List<RelNode> resolvedRoots =
+                roots.stream().map(node -> 
node.accept(this)).collect(Collectors.toList());
+        validateHints();
+        return resolvedRoots;
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        return visitBiRel(join);
+    }
+
+    private RelNode visitBiRel(BiRel biRel) {
+        Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
+        Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
+
+        Set<RelHint> existentKVHints = new HashSet<>();
+
+        List<RelHint> newHints =
+                ((Hintable) biRel)
+                        .getHints().stream()
+                                .flatMap(
+                                        h -> {
+                                            if 
(JoinStrategy.isJoinStrategy(h.hintName)) {
+                                                
allHints.add(trimInheritPath(h));
+                                                // if the hint is valid
+                                                List<String> newOptions =
+                                                        getNewJoinHintOptions(
+                                                                leftName,
+                                                                rightName,
+                                                                h.listOptions,
+                                                                h.hintName);
+
+                                                // check whether the join 
hints options are valid
+                                                boolean isValidOption =
+                                                        
JoinStrategy.validOptions(
+                                                                h.hintName, 
newOptions);
+                                                if (isValidOption) {
+                                                    
validHints.add(trimInheritPath(h));
+                                                    // if the hint defines 
more than one args, only
+                                                    // retain the first one
+                                                    return Stream.of(
+                                                            
RelHint.builder(h.hintName)
+                                                                    
.hintOptions(
+                                                                            
singletonList(
+                                                                               
     newOptions.get(
+                                                                               
             0)))
+                                                                    .build());
+                                                } else {
+                                                    // invalid hint
+                                                    return Stream.of();
+                                                }
+                                            } else {
+                                                //                             
                   //
+                                                // filter alias hints
+                                                //                             
                   if
+                                                // 
(h.hintName.equals(FlinkHints.HINT_ALIAS)) {
+                                                //
+                                                //  return Stream.of();
+                                                //                             
                   }
+                                                if 
(existentKVHints.contains(h)) {
+                                                    return Stream.of();
+                                                } else {
+                                                    existentKVHints.add(h);
+                                                    return Stream.of(h);
+                                                }
+                                            }
+                                        })
+                                .collect(Collectors.toList());
+        RelNode newNode = super.visitChildren(biRel);
+
+        List<RelHint> oldJoinHints =
+                ((Hintable) biRel)
+                        .getHints().stream()
+                                // ignore the alias hint
+                                .filter(hint -> 
JoinStrategy.isJoinStrategy(hint.hintName))
+                                .collect(Collectors.toList());
+        if (!oldJoinHints.isEmpty()) {
+            // replace the table name as LEFT or RIGHT
+            return ((Hintable) newNode).withHints(newHints);
+        }
+        // has no hints, return original node directly.
+        return newNode;
+    }
+
+    private List<String> getNewJoinHintOptions(
+            Optional<String> leftName,
+            Optional<String> rightName,
+            List<String> listOptions,
+            String hintName) {
+        return listOptions.stream()
+                .map(
+                        option -> {
+                            if (leftName.isPresent()
+                                    && rightName.isPresent()
+                                    && matchIdentifier(option, leftName.get())
+                                    && matchIdentifier(option, 
rightName.get())) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Ambitious option: %s in hint: 
%s, the input "
+                                                        + "relations are: %s, 
%s",
+                                                option, hintName, leftName, 
rightName));
+                            } else if (leftName.isPresent()
+                                    && matchIdentifier(option, 
leftName.get())) {
+                                return JoinStrategy.LEFT_INPUT;
+                            } else if (rightName.isPresent()
+                                    && matchIdentifier(option, 
rightName.get())) {
+                                return JoinStrategy.RIGHT_INPUT;
+                            } else {
+                                return "";
+                            }
+                        })
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toList());
+    }
+
+    private void validateHints() {
+        Set<RelHint> invalidHints = new HashSet<>(allHints);
+        invalidHints.removeAll(validHints);
+        if (!invalidHints.isEmpty()) {
+            String errorMsg =
+                    invalidHints.stream()
+                            .map(
+                                    hint ->
+                                            "\n`"
+                                                    + hint.hintName
+                                                    + "("
+                                                    + 
StringUtils.join(hint.listOptions, ", ")
+                                                    + ")`")
+                            .reduce("", (msg, hintMsg) -> msg + hintMsg);
+            throw new ValidationException(
+                    String.format(
+                            "The options of following hints cannot match the 
name of "
+                                    + "input tables or views: %s",
+                            errorMsg));
+        }
+    }
+
+    private RelHint trimInheritPath(RelHint hint) {
+        RelHint.Builder builder = RelHint.builder(hint.hintName);
+        if (hint.listOptions.isEmpty()) {
+            return builder.hintOptions(hint.kvOptions).build();
+        } else {
+            return builder.hintOptions(hint.listOptions).build();
+        }
+    }
+
+    private Optional<String> extractAliasOrTableName(RelNode node) {
+        // check whether the input relation is converted from a view
+        Optional<String> aliasName = FlinkHints.getTableAlias(node);
+        if (aliasName.isPresent()) {
+            return aliasName;
+        }
+        // otherwise, the option may be a table name
+        Optional<TableScan> tableScan = getTableScan(node);
+        if (tableScan.isPresent()) {
+            Optional<String> tableName = 
FlinkHints.getTableName(tableScan.get().getTable());
+            if (tableName.isPresent()) {
+                return tableName;
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private Optional<TableScan> getTableScan(RelNode node) {
+        if (node instanceof TableScan) {
+            return Optional.of((TableScan) node);
+        } else {
+            if (FlinkHints.canTransposeToTableScan(node)) {
+                return getTableScan(trimHep(node.getInput(0)));
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
+
+    private RelNode trimHep(RelNode node) {
+        if (node instanceof HepRelVertex) {
+            return ((HepRelVertex) node).getCurrentRel();
+        } else if (node instanceof RelSubset) {
+            RelSubset subset = ((RelSubset) node);
+            return Util.first(subset.getBest(), subset.getOriginal());
+        } else {

Review Comment:
   Why should we consider HepRelVertex and RelSubset here? They only exist in
   the optimization phase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to