lincoln-lil commented on code in PR #27734:
URL: https://github.com/apache/flink/pull/27734#discussion_r2905027596


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DeltaJoinAssociation.java:
##########
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A class to store the join association between source (stream side) and dest 
(lookup side) for
+ * delta join.
+ */
+public class DeltaJoinAssociation {
+
+    /** The info of the binary input. */
+    private final Map<Integer, BinaryInputInfo> binaryInputInfos;
+
+    /**
+     * The join association from one binary input to one binary input.
+     *
+     * <p>For example, the join tree is like:
+     *
+     * <pre>{@code
+     *    DeltaJoin
+     *   /        \
+     * #0 A      #1 B
+     * }</pre>
+     *
+     * <p>This map will contain '<0, <1, association>>' and '<1, <0, 
association>>'.
+     */
+    private final Map<Integer, Map<Integer, Association>> 
binary2BinaryJoinAssociation;
+
+    /**
+     * The join association from multi binary inputs to one binary input.
+     *
+     * <p>For example, the join tree is like:
+     *
+     * <pre>{@code
+     *       DeltaJoin
+     *      /        \
+     *   DeltaJoin  #2 C
+     *    /      \
+     * #0 A     #1 B
+     * }</pre>
+     *
+     * <p>This map will contain '<[0, 1], <2, association>>'.
+     */
+    private final Map<Set<Integer>, Map<Integer, Association>>
+            compositeBinary2BinaryJoinAssociation;
+
+    private final DeltaJoinTree joinTree;
+
+    private DeltaJoinAssociation(
+            Map<Integer, BinaryInputInfo> binaryInputInfos,
+            Map<Integer, Map<Integer, Association>> 
binary2BinaryJoinAssociation,
+            Map<Set<Integer>, Map<Integer, Association>> 
compositeBinary2BinaryJoinAssociation,
+            DeltaJoinTree joinTree) {
+        this.binaryInputInfos = binaryInputInfos;
+        this.binary2BinaryJoinAssociation = binary2BinaryJoinAssociation;
+        this.compositeBinary2BinaryJoinAssociation = 
compositeBinary2BinaryJoinAssociation;
+        this.joinTree = joinTree;
+
+        // check binaryInputInfos
+        // should begin with 0 to binaryInputInfos.size() - 1
+        Preconditions.checkArgument(
+                IntStream.range(0, binaryInputInfos.size())
+                        .allMatch(binaryInputInfos::containsKey));
+
+        // check binary2BinaryJoinAssociation
+        // all source and dest should be in binaryInputTables
+        int totalInputBinaryTableSize = binaryInputInfos.size();
+        Preconditions.checkArgument(
+                binary2BinaryJoinAssociation.keySet().stream()
+                        .allMatch(source -> source >= 0 && source < 
totalInputBinaryTableSize));
+        Preconditions.checkArgument(
+                binary2BinaryJoinAssociation.values().stream()
+                        .flatMap(v -> v.keySet().stream())
+                        .allMatch(target -> target >= 0 && target < 
totalInputBinaryTableSize));
+        Preconditions.checkArgument(
+                binary2BinaryJoinAssociation.values().stream().allMatch(v -> 
v.size() == 1),
+                "Currently, each binary input can only be directly associated 
with at most one binary input.");
+
+        // check compositeBinary2BinaryJoinAssociation
+        // all source and dest should be in binaryInputTables
+        Preconditions.checkArgument(
+                compositeBinary2BinaryJoinAssociation.keySet().stream()
+                        .allMatch(s -> s.size() > 1));
+        Preconditions.checkArgument(
+                compositeBinary2BinaryJoinAssociation.keySet().stream()
+                        .allMatch(
+                                s ->
+                                        s.stream()
+                                                .allMatch(
+                                                        i ->
+                                                                i >= 0
+                                                                        && i
+                                                                               
 < totalInputBinaryTableSize)));
+        Preconditions.checkArgument(
+                compositeBinary2BinaryJoinAssociation.values().stream()
+                        .flatMap(v -> v.keySet().stream())
+                        .allMatch(target -> target >= 0 && target < 
totalInputBinaryTableSize));
+    }
+
+    public static DeltaJoinAssociation create(
+            StreamPhysicalTableSourceScan binaryInputTable, @Nullable 
RexProgram calcOnTable) {
+        DeltaJoinTree.BinaryInputNode inputNode =
+                new DeltaJoinTree.BinaryInputNode(
+                        0,
+                        
FlinkTypeFactory.toLogicalRowType(binaryInputTable.getRowType()),
+                        calcOnTable);
+        return new DeltaJoinAssociation(
+                Collections.singletonMap(0, 
BinaryInputInfo.of(binaryInputTable, calcOnTable)),
+                new HashMap<>(),
+                new HashMap<>(),
+                new DeltaJoinTree(inputNode));
+    }
+
+    public static DeltaJoinAssociation create(
+            FlinkJoinType joinType,
+            RexNode joinCondition,
+            int[] leftJoinKey,
+            int[] rightJoinKey,
+            StreamPhysicalTableSourceScan leftBinaryInputTable,
+            @Nullable RexProgram calcOnLeftTable,
+            StreamPhysicalTableSourceScan rightBinaryInputTable,
+            @Nullable RexProgram calcOnRightTable,
+            Association left2RightAssociation,
+            Association right2LeftAssociation) {
+        Map<Integer, BinaryInputInfo> binaryInputTables = new HashMap<>();
+        binaryInputTables.put(0, BinaryInputInfo.of(leftBinaryInputTable, 
calcOnLeftTable));
+        binaryInputTables.put(1, BinaryInputInfo.of(rightBinaryInputTable, 
calcOnRightTable));
+
+        Map<Integer, Map<Integer, Association>> allJoinAssociation = new 
HashMap<>();
+        allJoinAssociation.put(0, new HashMap<>());
+        allJoinAssociation.get(0).put(1, left2RightAssociation);
+        allJoinAssociation.put(1, new HashMap<>());
+        allJoinAssociation.get(1).put(0, right2LeftAssociation);
+
+        DeltaJoinTree.BinaryInputNode leftInputNode =
+                new DeltaJoinTree.BinaryInputNode(
+                        0,
+                        
FlinkTypeFactory.toLogicalRowType(leftBinaryInputTable.getRowType()),
+                        calcOnLeftTable);
+        DeltaJoinTree.BinaryInputNode rightInputNode =
+                new DeltaJoinTree.BinaryInputNode(
+                        1,
+                        
FlinkTypeFactory.toLogicalRowType(rightBinaryInputTable.getRowType()),
+                        calcOnRightTable);
+        DeltaJoinTree.JoinNode root =
+                new DeltaJoinTree.JoinNode(
+                        joinType,
+                        joinCondition,
+                        leftJoinKey,
+                        rightJoinKey,
+                        leftInputNode,
+                        rightInputNode,
+                        null);
+
+        return new DeltaJoinAssociation(
+                binaryInputTables, allJoinAssociation, new HashMap<>(), new 
DeltaJoinTree(root));
+    }
+
+    public DeltaJoinAssociation merge(
+            DeltaJoinAssociation other,
+            FlinkJoinType joinType,
+            RexNode condition,
+            int[] leftJoinKey,
+            int[] rightJoinKey,
+            @Nullable RexProgram calcOnLeftBottomDeltaJoin,
+            @Nullable RexProgram calcOnRightBottomDeltaJoin) {
+        int shift = this.getBinaryInputCount();
+        Map<Integer, BinaryInputInfo> newBinaryInputInfos = new 
HashMap<>(this.binaryInputInfos);
+        newBinaryInputInfos.putAll(
+                other.binaryInputInfos.entrySet().stream()
+                        .collect(Collectors.toMap(e -> e.getKey() + shift, 
Map.Entry::getValue)));
+
+        Map<Integer, Map<Integer, Association>> newAllJoinAssociation =
+                new HashMap<>(this.binary2BinaryJoinAssociation);
+        for (Map.Entry<Integer, Map<Integer, Association>> entryOnEachSource :
+                other.binary2BinaryJoinAssociation.entrySet()) {
+            Map<Integer, Association> newAssociation = new HashMap<>();
+            for (Map.Entry<Integer, Association> entryOnEachDest :
+                    entryOnEachSource.getValue().entrySet()) {
+                newAssociation.put(entryOnEachDest.getKey() + shift, 
entryOnEachDest.getValue());
+            }
+
+            newAllJoinAssociation.put(entryOnEachSource.getKey() + shift, 
newAssociation);
+        }
+
+        Map<Set<Integer>, Map<Integer, Association>> 
newCompositeBinary2BinaryJoinAssociation =
+                new HashMap<>(this.compositeBinary2BinaryJoinAssociation);
+        for (Map.Entry<Set<Integer>, Map<Integer, Association>> 
entryOnEachComposite :
+                other.compositeBinary2BinaryJoinAssociation.entrySet()) {
+            Set<Integer> newComposite =
+                    entryOnEachComposite.getKey().stream()
+                            .map(i -> i + shift)
+                            .collect(Collectors.toSet());
+            Map<Integer, Association> newAssociation = new HashMap<>();
+            for (Map.Entry<Integer, Association> entryOnEachDest :
+                    entryOnEachComposite.getValue().entrySet()) {
+                newAssociation.put(entryOnEachDest.getKey() + shift, 
entryOnEachDest.getValue());
+            }
+            newCompositeBinary2BinaryJoinAssociation.put(newComposite, 
newAssociation);
+        }
+
+        DeltaJoinTree.Node thisRootNode = this.joinTree.root;
+        if (calcOnLeftBottomDeltaJoin != null) {
+            Preconditions.checkState(thisRootNode instanceof 
DeltaJoinTree.JoinNode);
+            thisRootNode =
+                    ((DeltaJoinTree.JoinNode) thisRootNode)
+                            .addCalcOnJoinNode(calcOnLeftBottomDeltaJoin);
+        }
+
+        DeltaJoinTree otherTree = other.joinTree.shiftInputIndex(shift);
+        DeltaJoinTree.Node otherRootNode = otherTree.root;
+        if (calcOnRightBottomDeltaJoin != null) {
+            Preconditions.checkState(otherRootNode instanceof 
DeltaJoinTree.JoinNode);
+            otherRootNode =
+                    ((DeltaJoinTree.JoinNode) otherRootNode)
+                            .addCalcOnJoinNode(calcOnRightBottomDeltaJoin);
+        }
+
+        DeltaJoinTree newTree =
+                new DeltaJoinTree(
+                        new DeltaJoinTree.JoinNode(
+                                joinType,
+                                condition,
+                                leftJoinKey,
+                                rightJoinKey,
+                                thisRootNode,
+                                otherRootNode,
+                                null));
+
+        return new DeltaJoinAssociation(
+                newBinaryInputInfos,
+                newAllJoinAssociation,
+                newCompositeBinary2BinaryJoinAssociation,
+                newTree);
+    }
+
+    public int getBinaryInputCount() {
+        return binaryInputInfos.size();
+    }
+
+    public DeltaJoinTree getJoinTree() {
+        return joinTree;
+    }
+
+    public void addJoinAssociation(int sourceOrdinal, int destOrdinal, 
Association association) {
+        binary2BinaryJoinAssociation
+                .computeIfAbsent(sourceOrdinal, k -> new HashMap<>())
+                .put(destOrdinal, association);
+    }
+
+    public void addJoinAssociation(
+            Set<Integer> sourceOrdinals, int destOrdinal, Association 
association) {
+        if (sourceOrdinals.size() == 1) {
+            addJoinAssociation(sourceOrdinals.iterator().next(), destOrdinal, 
association);
+            return;
+        }
+        compositeBinary2BinaryJoinAssociation
+                .compute(

Review Comment:
   Can this be simplified to `.computeIfAbsent(sourceOrdinals, k -> new HashMap<>())`?
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java:
##########
@@ -152,24 +197,308 @@ protected RelDataType deriveRowType() {
     }
 
     @Override
-    public com.google.common.collect.ImmutableList<RelHint> getHints() {
-        return hints;
+    public RelWriter explainTerms(RelWriter pw) {
+        pw =
+                pw.input("left", left)
+                        .input("right", right)
+                        .item("joinType", 
JoinTypeUtil.getFlinkJoinType(joinType).toString())
+                        .item(
+                                "where",
+                                getExpressionString(
+                                        condition,
+                                        JavaScalaConversionUtil.toScala(
+                                                        
this.getRowType().getFieldNames())
+                                                .toList(),
+                                        
JavaScalaConversionUtil.toScala(Optional.empty()),
+                                        
RelExplainUtil.preferExpressionFormat(pw),
+                                        
RelExplainUtil.preferExpressionDetail(pw)));
+
+        switch (deltaJoinPattern) {
+            case BINARY:
+                break;

Review Comment:
   nit: this makes the digest different from the others. Maybe we can use a unified format (or did you just want to make it shorter)?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -270,10 +279,548 @@ public static boolean isJoinTypeSupported(FlinkJoinType 
flinkJoinType) {
     }
 
     /**
-     * get the lookup key from the join keys.
+     * Try to build lookup chain for delta join to do lookup.
+     *
+     * <p>Take the following join tree as example. Each leaf table has columns 
named with its
+     * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, 
c1), D(d0, d1, d2).
+     *
+     * <pre>{@code
+     *                     Top
+     *            (a1 = c1 and b2 = d2)
+     *            /                       \
+     *       Bottom1                      Bottom2
+     *      (a0 = b0)                    (c0 = d0)
+     *    /         \                    /       \
+     * A(a0,a1)  B(b0,b1,b2)        C(c0,c1)  D(d0,d1,d2)
+     *
+     * }</pre>
+     *
+     * <p>If Bottom1 is treated as stream side and Bottom2 is treated as 
lookup side, the lookup
+     * chain will be like this:
+     *
+     * <p>use A + B to lookup C with (a1 = c1) -> use C to lookup D with (c0 = 
d0).
+     */
+    public static DeltaJoinLookupChain 
buildLookupChainAndUpdateTopJoinAssociation(
+            JoinSpec topJoinSpec,
+            List<IntPair> joinKeysForLeftToRight,
+            DeltaJoinAssociation joinAssociationOnLeft,
+            DeltaJoinAssociation joinAssociationOnRight,
+            RelNode topLeftSide,
+            RelNode topRightSide,
+            // if true, left is treated as stream side with bottom1;
+            // otherwise right is treated as stream side with bottom1
+            boolean leftIsStreamSide,
+            DeltaJoinAssociation topJoinAssociation,
+            @Nullable RexProgram calcOnLookupSide,
+            FlinkTypeFactory typeFactory) {
+
+        Preconditions.checkArgument(
+                !joinKeysForLeftToRight.isEmpty(),
+                "There must be at least one equality condition on the join 
condition.");
+
+        DeltaJoinAssociation joinAssociationInBottom2 =
+                leftIsStreamSide ? joinAssociationOnRight : 
joinAssociationOnLeft;
+        IntPair[] joinKeysForBottom1To2 =
+                leftIsStreamSide
+                        ? joinKeysForLeftToRight.toArray(new IntPair[0])
+                        : reverseIntPairs(joinKeysForLeftToRight.toArray(new 
IntPair[0]));
+
+        IntPair[] joinKeysFromBottom1To2TransposedFromCalc =
+                getJoinKeyPassThroughCalc(joinKeysForBottom1To2, 
calcOnLookupSide);
+
+        List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+                joinKeyOfDifferentBinaryTablesOnBottom2 =
+                        splitJoinKeysOfDifferentBinaryTablesOnLookupSide(
+                                joinKeysFromBottom1To2TransposedFromCalc,
+                                joinAssociationInBottom2,
+                                typeFactory);
+
+        String joinKeyErrorMessage =
+                joinKeysToString(
+                        joinKeysForLeftToRight,
+                        topLeftSide.getRowType().getFieldNames(),
+                        topRightSide.getRowType().getFieldNames());
+
+        Tuple2<LookupBinaryInputInfo, DeltaJoinSpec> pickedBinaryInput =
+                pickAnyBinaryTableOnLookupSideToLookup(
+                        topJoinSpec,
+                        joinAssociationInBottom2,
+                        joinKeyOfDifferentBinaryTablesOnBottom2,
+                        joinKeyErrorMessage);
+        DeltaJoinSpec pickedBinaryInputDeltaJoinSpec = pickedBinaryInput.f1;
+
+        int[] streamSideBinaryInputOrdinalsWithOffset =
+                leftIsStreamSide
+                        ? 
joinAssociationOnLeft.getAllBinaryInputOrdinals().stream()
+                                .mapToInt(i -> i)
+                                .toArray()
+                        : joinAssociationOnRight
+                                .getAllBinaryInputOrdinalsWithOffset(
+                                        
joinAssociationOnLeft.getBinaryInputCount())
+                                .stream()
+                                .mapToInt(i -> i)
+                                .toArray();
+
+        final FlinkJoinType stream2LookupSideJoinType =
+                leftIsStreamSide
+                        ? topJoinSpec.getJoinType()
+                        : swapJoinType(topJoinSpec.getJoinType());
+        int lookupSideBinaryOrdinalShift =
+                leftIsStreamSide ? joinAssociationOnLeft.getBinaryInputCount() 
: 0;
+        int pickedBinaryInputOrdinal = pickedBinaryInput.f0.binaryInputOrdinal;
+        int pickedBinaryInputOrdinalOnTopJoin =
+                pickedBinaryInputOrdinal + lookupSideBinaryOrdinalShift;
+        int totalLookupCount = joinAssociationInBottom2.getBinaryInputCount();
+
+        topJoinAssociation.addJoinAssociation(
+                Arrays.stream(streamSideBinaryInputOrdinalsWithOffset)
+                        .boxed()
+                        .collect(Collectors.toSet()),
+                pickedBinaryInputOrdinalOnTopJoin,
+                DeltaJoinAssociation.Association.of(
+                        stream2LookupSideJoinType, 
pickedBinaryInputDeltaJoinSpec));
+
+        return buildLookupChain(
+                streamSideBinaryInputOrdinalsWithOffset,
+                pickedBinaryInput.f0,
+                pickedBinaryInputDeltaJoinSpec,
+                joinAssociationInBottom2,
+                lookupSideBinaryOrdinalShift,
+                stream2LookupSideJoinType,
+                totalLookupCount);
+    }
+
+    /**
+     * Split the join keys on the lookup side into different binary inputs.
+     *
+     * <p>If the lookup side has multi calc between top join and scan, the 
returned join keys will
+     * transpose all these calc.
+     *
+     * <p>Take the following join tree as example. Each leaf table has columns 
named with its
+     * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, 
c1), D(d0, d1, d2).
+     *
+     * <pre>{@code
+     *                    Top
+     *            (a1 = c1 and b2 = d2)
+     *         /                        \
+     *       Bottom1                   Bottom2
+     *      (a0 = b0)                  (c0 = d0)
+     *     /       \                    /      \
+     * A(a0,a1)  B(b0,b1,b2)        C(c0,c1)  D(d0,d1,d2)
+     *
+     * }</pre>
+     *
+     * <p>If Bottom1 is stream side, the result will be {@code [(C, <a1, c1>), 
(D, <b2, d2>)]}.
+     *
+     * <p>If there are no join keys on one binary table {@code i}, the result 
will contain {@code i}
+     * with empty list.
+     */
+    private static List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, 
IntPair[]>>
+            splitJoinKeysOfDifferentBinaryTablesOnLookupSide(
+                    IntPair[] joinKeysForStreamSide2LookupSide,
+                    DeltaJoinAssociation joinAssociationInLookupSide,
+                    FlinkTypeFactory typeFactory) {
+        SplitJoinKeyVisitor visitor =
+                new SplitJoinKeyVisitor(typeFactory, 
joinKeysForStreamSide2LookupSide);
+
+        visitor.visit(joinAssociationInLookupSide.getJoinTree());
+        LinkedHashMap<Integer, IntPair[]> splitResult = visitor.result;
+        Preconditions.checkState(
+                splitResult.size() == 
joinAssociationInLookupSide.getBinaryInputCount());
+        List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>> result = 
new ArrayList<>();
+        for (Map.Entry<Integer, IntPair[]> inputOrdWithJoinKey : 
splitResult.entrySet()) {
+            int inputOrd = inputOrdWithJoinKey.getKey();
+            IntPair[] joinKey = inputOrdWithJoinKey.getValue();
+            result.add(
+                    
Tuple2.of(joinAssociationInLookupSide.getBinaryInputInfo(inputOrd), joinKey));
+        }
+        return result;
+    }
+
+    private static Tuple2<LookupBinaryInputInfo, DeltaJoinSpec>
+            pickAnyBinaryTableOnLookupSideToLookup(
+                    JoinSpec topJoinSpec,
+                    DeltaJoinAssociation joinAssociationInLookupSide,
+                    List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, 
IntPair[]>>
+                            joinKeyOfDifferentBinaryTablesOnLookupSide,
+                    String joinKeyErrorMessage) {
+        LookupBinaryInputInfo pickedBinaryInputInfo =
+                pickAnyBinaryTableOnLookupSideToLookup(
+                        joinKeyOfDifferentBinaryTablesOnLookupSide, 
joinKeyErrorMessage);
+
+        Map<Integer, LookupJoinUtil.FunctionParam> 
lookupKeysOnThisLookupBinaryInput =
+                pickedBinaryInputInfo.lookupKeysOnThisBinaryInput;
+        DeltaJoinAssociation.BinaryInputInfo pickedBinaryInput =
+                pickedBinaryInputInfo.binaryInputInfo;
+
+        // begin to build lookup chain for bottom1 to lookup C or D in bottom2
+        int lookupCount = joinAssociationInLookupSide.getBinaryInputCount();
+        DeltaJoinSpec deltaJoinSpec =
+                buildDeltaJoinSpecForStreamSide2PickedLookupBinaryInput(
+                        topJoinSpec,
+                        pickedBinaryInput,
+                        lookupKeysOnThisLookupBinaryInput,
+                        lookupCount);
+
+        return Tuple2.of(pickedBinaryInputInfo, deltaJoinSpec);
+    }
+
+    /** Pick any binary input as lookup input to do first lookup. */
+    private static LookupBinaryInputInfo 
pickAnyBinaryTableOnLookupSideToLookup(
+            List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+                    joinKeyOfDifferentLookupBinaryTables,
+            String joinKeyErrorMessage) {
+        // select all binary tables on bottom2 that can be looked up
+        // for example if [c1] is an index on C and [d2] is an index on D, the 
result will be:
+        // [<C, <c1, a1>>, <D, <d2, b2>>]
+        List<Tuple2<Integer, IntPair[]>> pickedBinaryTablesToLookup =
+                new ArrayList<>(
+                        pickBinaryTablesThatCanLookup(
+                                joinKeyOfDifferentLookupBinaryTables, 
joinKeyErrorMessage));
+        Preconditions.checkState(!pickedBinaryTablesToLookup.isEmpty());
+
+        // pick the first (leftest) binary input to lookup
+        // TODO consider query hint specified by user
+        Tuple2<Integer, IntPair[]> pickedBinaryTable = 
pickedBinaryTablesToLookup.get(0);
+
+        DeltaJoinAssociation.BinaryInputInfo pickedBinaryInputInfo =
+                
joinKeyOfDifferentLookupBinaryTables.get(pickedBinaryTable.f0).f0;
+        Map<Integer, LookupJoinUtil.FunctionParam> 
lookupKeysOnThisLookupBinaryInput =
+                analyzerDeltaJoinLookupKeys(pickedBinaryTable.f1);
+
+        return LookupBinaryInputInfo.of(
+                pickedBinaryTable.f0, pickedBinaryInputInfo, 
lookupKeysOnThisLookupBinaryInput);
+    }
+
+    /**
+     * Pick the tables that can be looked up by the given join keys.
+     *
+     * <p>If the table can be picked, that means the join keys contain one of 
its indexes.
+     *
+     * @return the f0 of the list element is the picked table's idx, the f1 of 
the list element is
+     *     its lookup keys.
+     */
+    private static List<Tuple2<Integer, IntPair[]>> 
pickBinaryTablesThatCanLookup(
+            List<Tuple2<DeltaJoinAssociation.BinaryInputInfo, IntPair[]>>
+                    joinKeyOnDifferentBinaryInputs,
+            String joinKeyErrorMsg) {
+        List<Tuple2<Integer, IntPair[]>> result = new ArrayList<>();
+
+        for (int i = 0; i < joinKeyOnDifferentBinaryInputs.size(); i++) {
+            DeltaJoinAssociation.BinaryInputInfo binaryInput =
+                    joinKeyOnDifferentBinaryInputs.get(i).f0;
+            if (joinKeyOnDifferentBinaryInputs.get(i).f1.length == 0) {
+                continue;
+            }
+            IntPair[] joinKeys = joinKeyOnDifferentBinaryInputs.get(i).f1;
+
+            if (isTableScanSupported(binaryInput.tableScan, 
getTargetOrdinals(joinKeys))) {
+                result.add(Tuple2.of(i, joinKeys));
+            }
+        }
+
+        if (!result.isEmpty()) {
+            return result;
+        }
+
+        // should not happen because we have validated before
+        List<TableSourceTable> allTables =
+                joinKeyOnDifferentBinaryInputs.stream()
+                        .map(t -> t.f0.tableScan.tableSourceTable())
+                        .collect(Collectors.toList());
+
+        String errorMsg =
+                String.format(
+                        "The join key [%s] does not include all primary keys 
nor "
+                                + "all fields from an index of any table on 
the other side.\n"
+                                + "All indexes about tables on the other side 
are:\n\n%s",
+                        joinKeyErrorMsg, 
allTableIndexDetailMessageToString(allTables));
+        throw new TableException(
+                "This is a bug and should not happen. Please file an issue. 
The detail message is:\n"
+                        + errorMsg);
+    }
+
+    /**
+     * Build the delta join spec for stream side to picked lookup binary input.
+     *
+     * <p>Take the following join tree as example. Each leaf table has columns 
named with its
+     * lowercase letter and a number, e.g., A(a0, a1), B(b0, b1, b2), C(c0, 
c1), D(d0, d1, d2).
+     *
+     * <pre>{@code
+     *                       Top
+     *              (a1 = c1 and b2 = d2)
+     *              /                      \
+     *        Bottom1                     Bottom2
+     *      (a0 = b0)                    (c0 = d0)
+     *    /         \                    /      \
+     * A(a0,a1)  B(b0,b1,b2)        C(c0,c1)  D(d0,d1,d2)
+     *
+     * }</pre>
+     *
+     * <p>If Bottom1 is stream side, and choose to lookup C first. Then this 
function is used to
+     * build the delta join spec for Bottom1 to lookup C.
+     */
+    private static DeltaJoinSpec 
buildDeltaJoinSpecForStreamSide2PickedLookupBinaryInput(
+            JoinSpec topJoinSpec,
+            DeltaJoinAssociation.BinaryInputInfo pickedLookupBinaryInput,
+            Map<Integer, LookupJoinUtil.FunctionParam> 
lookupKeysOnPickedLookupBinaryInput,
+            int totalLookupCount) {
+
+        // TODO:
+        //  1. split remaining join condition into the pre-filter and 
remaining parts
+        //  2. supported Constant functionParam
+        Optional<RexNode> nonEquivCondition = 
topJoinSpec.getNonEquiCondition();
+
+        // ignore non-equiv conditions for cascaded lookup here and do final 
filter
+        // in operator later
+        // TODO split the non-equiv deterministic conditions on each inputs by
+        //  RelOptUtil.classifyFilters
+        Optional<RexNode> remainingCondition =

Review Comment:
   Is there any check or guarantee elsewhere that the conditions being ignored here are still applied later?
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DeltaJoinTree.java:
##########
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.combineOutputRowType;
+import static 
org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.splitProjectionAndFilter;
+
+/**
+ * A delta join tree used to describe the relationships among one or more 
joins in the input.
+ *
+ * <p>Each node in the tree will have two types:
+ *
+ * <ol>
+ *   <li>{@link BinaryInputNode}: the leaf node of the tree, which represents 
that it is a source
+ *       for delta join.
+ *   <li>{@link JoinNode}: the non-leaf node of the tree, which represents a 
join between two
+ *       inputs.
+ * </ol>
+ *
+ * <p>Take the following sql pattern as an example:
+ *
+ * <pre>{@code
+ *              DeltaJoin
+ *           /            \
+ *       Calc3             \
+ *        /                 \
+ *   DeltaJoin           DeltaJoin
+ *     /    \             /     \
+ *  Calc1    \          /      Calc2
+ *   /        \       /           \
+ * #0 A     #1 B    #2 C          #3 D
+ * }</pre>
+ *
+ * <p>The tree converted from the above sql pattern is:
+ *
+ * <pre>{@code
+ *                     Join
+ *              /                 \
+ *   Join with Calc3              Join
+ *     /             \        /            \
+ * #0 with Calc1     #1     #2           #3 with Calc2
+ * }</pre>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DeltaJoinTree {
+
+    public static final String FIELD_NAME_ROOT = "root";
+
+    @JsonProperty(FIELD_NAME_ROOT)
+    public final Node root;
+
+    @JsonCreator
+    public DeltaJoinTree(@JsonProperty(FIELD_NAME_ROOT) Node root) {
+        this(root, true);
+    }
+
+    /**
+     * Construct a delta join tree.
+     *
+     * @param root the root node of the delta join tree
+     * @param shouldValidate whether the delta join tree should be validated
+     */
+    private DeltaJoinTree(Node root, boolean shouldValidate) {
+        this.root = root;
+
+        if (shouldValidate) {
+            List<Integer> allInputOrdinals = getAllInputOrdinals(root);
+            Preconditions.checkArgument(
+                    allInputOrdinals.equals(
+                            IntStream.range(0, allInputOrdinals.size())
+                                    .boxed()
+                                    .collect(Collectors.toList())));
+        }
+    }
+
+    /** Shift all input ordinals in {@link BinaryInputNode} in the join with 
the given shift. */
+    public DeltaJoinTree shiftInputIndex(int shift) {
+        return new DeltaJoinTree(shiftInputIndexInternal(root, shift), false);
+    }
+
+    private Node shiftInputIndexInternal(Node node, int shift) {
+        if (node instanceof BinaryInputNode) {
+            BinaryInputNode binaryInputNode = (BinaryInputNode) node;
+            return new BinaryInputNode(
+                    binaryInputNode.inputOrdinal + shift, 
binaryInputNode.rowType, node.rexProgram);
+        }
+        JoinNode joinNode = (JoinNode) node;
+        Node newLeft = shiftInputIndexInternal(joinNode.left, shift);
+        Node newRight = shiftInputIndexInternal(joinNode.right, shift);
+        return new JoinNode(
+                joinNode.joinType,
+                joinNode.condition,
+                joinNode.leftJoinKey,
+                joinNode.rightJoinKey,
+                newLeft,
+                newRight,
+                node.rexProgram);
+    }
+
+    /**
+     * Get the output row type of the delta join tree on the given input 
ordinals.
+     *
+     * <p>In the {@link BinaryInputNode}, we are concerned with its {@link
+     * BinaryInputNode#inputOrdinal}. In the {@link JoinNode}, we focus on the 
{@link
+     * BinaryInputNode#inputOrdinal} of all the {@link BinaryInputNode} within 
its input.
+     *
+     * <p>Take the following delta join tree as an example:
+     *
+     * <pre>{@code
+     *                      Join
+     *              /                 \
+     *  Join with Calc3               Join
+     *     /             \        /            \
+     * #0 with Calc1     #1     #2           #3 with Calc2
+     * }</pre>
+     *
+     * <p>When {@code caresInputOrdinals = [0, 1]} is given, the output row 
type is the row type of
+     * Calc3.
+     */
+    public RowType getOutputRowTypeOnNode(int[] caresInputOrdinals, 
FlinkTypeFactory typeFactory) {
+        Preconditions.checkArgument(caresInputOrdinals.length > 0);
+        return getOutputTypeOnNodeInternal(
+                
Arrays.stream(caresInputOrdinals).boxed().collect(Collectors.toSet()),
+                root,
+                typeFactory);
+    }
+
+    private RowType getOutputTypeOnNodeInternal(
+            Set<Integer> caresInputOrdinals, Node node, FlinkTypeFactory 
typeFactory) {
+        Set<Integer> allInputOrdinalsInThisSubTree = 
node.getAllInputOrdinals();
+        
Preconditions.checkArgument(allInputOrdinalsInThisSubTree.containsAll(caresInputOrdinals));
+        if (allInputOrdinalsInThisSubTree.equals(caresInputOrdinals)) {
+            return node.getRowTypeAfterCalc(typeFactory);
+        }
+
+        Preconditions.checkArgument(node instanceof JoinNode);
+        JoinNode joinNode = (JoinNode) node;
+        if 
(joinNode.left.getAllInputOrdinals().containsAll(caresInputOrdinals)) {
+            return getOutputTypeOnNodeInternal(caresInputOrdinals, 
joinNode.left, typeFactory);
+        }
+        Preconditions.checkArgument(
+                
joinNode.right.getAllInputOrdinals().containsAll(caresInputOrdinals));
+        return getOutputTypeOnNodeInternal(caresInputOrdinals, joinNode.right, 
typeFactory);
+    }
+
+    private static List<Integer> getAllInputOrdinals(Node node) {
+        List<Integer> collector = new ArrayList<>();
+        collectAllInputOrdinals(node, collector);
+        return collector;
+    }
+
+    private static void collectAllInputOrdinals(Node node, List<Integer> 
collector) {
+        if (node instanceof BinaryInputNode) {
+            collector.add(((BinaryInputNode) node).inputOrdinal);
+            return;
+        }
+        JoinNode joinNode = (JoinNode) node;
+        collectAllInputOrdinals(joinNode.left, collector);
+        collectAllInputOrdinals(joinNode.right, collector);
+    }
+
+    /**
+     * An abstract node for {@link BinaryInputNode} and {@link JoinNode}.
+     *
+     * <p>The {@link #projection} and {@link #filter} represent a calc on 
this {@link Node}. If
+     * they are null, that means there is no calc on this {@link Node}.
+     *
+     * <p>The {@link #rowTypeAfterCalc} represents the row type after the 
calc. If it is null, that
+     * means there is no calc on this {@link Node}.
+     */
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "type")
+    @JsonSubTypes({
+        @JsonSubTypes.Type(value = BinaryInputNode.class),
+        @JsonSubTypes.Type(value = JoinNode.class),
+    })
+    public abstract static class Node {
+
+        public static final String FIELD_NAME_PROJECTION = "projection";
+        public static final String FIELD_NAME_FILTER = "filter";
+        public static final String FIELD_NAME_ROW_TYPE_AFTER_CALC = 
"rowTypeAfterCalc";
+
+        @JsonProperty(FIELD_NAME_PROJECTION)
+        @Nullable
+        public final List<RexNode> projection;
+
+        @JsonProperty(FIELD_NAME_FILTER)
+        @Nullable
+        public final RexNode filter;
+
+        @JsonProperty(FIELD_NAME_ROW_TYPE_AFTER_CALC)
+        @Nullable
+        public final RowType rowTypeAfterCalc;
+
+        @JsonIgnore @Nullable public final RexProgram rexProgram;
+
+        private Node(@Nullable RexProgram rexProgram) {
+            this.rexProgram = rexProgram;
+            Tuple2<Optional<List<RexNode>>, Optional<RexNode>> 
projectAndFilter =
+                    splitProjectionAndFilter(rexProgram);
+            this.projection = projectAndFilter.f0.orElse(null);
+            this.filter = projectAndFilter.f1.orElse(null);
+            if (this.projection != null) {
+                Preconditions.checkArgument(rexProgram != null);
+                rowTypeAfterCalc = 
FlinkTypeFactory.toLogicalRowType(rexProgram.getOutputRowType());
+            } else {
+                rowTypeAfterCalc = null;
+            }
+        }
+
+        /** A constructor used for restoring. */
+        private Node(
+                @Nullable List<RexNode> projection,
+                @Nullable RexNode filter,
+                @Nullable RowType rowTypeAfterCalc) {
+            this.rexProgram = null;
+            this.projection = projection;
+            this.filter = filter;
+            this.rowTypeAfterCalc = rowTypeAfterCalc;
+        }
+
+        @JsonIgnore
+        public abstract RowType getRowTypeBeforeCalc(FlinkTypeFactory 
typeFactory);
+
+        @JsonIgnore
+        public RowType getRowTypeAfterCalc(FlinkTypeFactory typeFactory) {
+            if (projection == null) {
+                return getRowTypeBeforeCalc(typeFactory);
+            }
+            Preconditions.checkState(null != rowTypeAfterCalc);
+            return rowTypeAfterCalc;
+        }
+
+        @Nullable
+        @JsonIgnore
+        public List<RexNode> getProjection() {
+            return projection;
+        }
+
+        @Nullable
+        @JsonIgnore
+        public RexNode getFilter() {
+            return filter;
+        }
+
+        @JsonIgnore
+        public abstract Set<Integer> getAllInputOrdinals();
+    }
+
+    /**
+     * A leaf {@link Node} in this tree. It represents a source used for delta 
join to scan and
+     * lookup.
+     *
+     * <p>The {@link #inputOrdinal} is the index of the source in the {@link 
DeltaJoinAssociation}.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @JsonTypeName("BinaryInputNode")
+    public static class BinaryInputNode extends Node {
+        public static final String FIELD_NAME_INPUT_ORDINAL = "inputOrdinal";
+        public static final String FIELD_NAME_ROW_TYPE = "rowType";
+
+        @JsonProperty(FIELD_NAME_INPUT_ORDINAL)
+        public final int inputOrdinal;
+
+        @JsonProperty(FIELD_NAME_ROW_TYPE)
+        public final RowType rowType;
+
+        public BinaryInputNode(
+                int inputOrdinal, RowType rowTypeBeforeCalc, @Nullable 
RexProgram rexProgram) {
+            super(rexProgram);
+            this.inputOrdinal = inputOrdinal;
+            this.rowType = rowTypeBeforeCalc;
+        }
+
+        @JsonCreator
+        public BinaryInputNode(
+                @JsonProperty(FIELD_NAME_INPUT_ORDINAL) int inputOrdinal,
+                @JsonProperty(FIELD_NAME_PROJECTION) @Nullable List<RexNode> 
projection,
+                @JsonProperty(FIELD_NAME_FILTER) @Nullable RexNode filter,
+                @JsonProperty(FIELD_NAME_ROW_TYPE_AFTER_CALC) @Nullable 
RowType rowTypeAfterCalc,
+                @JsonProperty(FIELD_NAME_ROW_TYPE) RowType rowType) {
+            super(projection, filter, rowTypeAfterCalc);
+            this.inputOrdinal = inputOrdinal;
+            this.rowType = rowType;
+        }
+
+        @Override
+        @JsonIgnore
+        public RowType getRowTypeBeforeCalc(FlinkTypeFactory typeFactory) {
+            return rowType;
+        }
+
+        @Override
+        @JsonIgnore
+        public Set<Integer> getAllInputOrdinals() {
+            return Collections.singleton(inputOrdinal);
+        }
+    }
+
+    /** A {@link Node} in the tree representing a join operation between two 
inputs. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @JsonTypeName("JoinNode")
+    public static class JoinNode extends Node {
+        public static final String FIELD_NAME_JOIN_TYPE = "joinType";
+        public static final String FIELD_NAME_CONDITION = "condition";
+        public static final String FIELD_NAME_LEFT_JOIN_KEY = "leftJoinKey";
+        public static final String FIELD_NAME_RIGHT_JOIN_KEY = "rightJoinKey";
+        public static final String FIELD_NAME_LEFT = "left";
+        public static final String FIELD_NAME_RIGHT = "right";
+
+        @JsonProperty(FIELD_NAME_JOIN_TYPE)
+        public final FlinkJoinType joinType;
+
+        @JsonProperty(FIELD_NAME_CONDITION)
+        public final RexNode condition;
+
+        @JsonProperty(FIELD_NAME_LEFT_JOIN_KEY)
+        public final int[] leftJoinKey;
+
+        @JsonProperty(FIELD_NAME_RIGHT_JOIN_KEY)
+        public final int[] rightJoinKey;
+
+        @JsonProperty(FIELD_NAME_LEFT)
+        public final Node left;
+
+        @JsonProperty(FIELD_NAME_RIGHT)
+        public final Node right;
+
+        public JoinNode(
+                FlinkJoinType joinType,
+                RexNode condition,
+                int[] leftJoinKey,
+                int[] rightJoinKey,
+                Node left,
+                Node right,
+                @Nullable RexProgram rexProgram) {
+            super(rexProgram);
+            this.joinType = joinType;

Review Comment:
   We'd better also add a joinType check here because there's no assurance that 
a compiled plan is immutable.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala:
##########
@@ -1101,6 +1131,570 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlan(stmt)
   }
 
+  @Test
+  def testLHS1(): Unit = {
+    //       DT
+    //     /    \
+    //    DT     C
+    //  /    \
+    // A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_ab
+        |    join no_delete_src3 C
+        |     on a1 = c1 and a2 = c2 and c1 <> cast(a0 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testLHS2(): Unit = {
+    //       DT
+    //     /    \
+    //    DT     C
+    //  /    \
+    // A      B
+    // when records from C come, lookup chain is:
+    // C -> B -> A
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_ab
+        |    join no_delete_src3 C
+        |     on b1 = c1 and b2 = c2 and c1 <> cast(b2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiLHS1(): Unit = {
+    //          DT
+    //        /    \
+    //       DT     D
+    //     /    \
+    //    DT     C
+    //  /    \
+    // A      B
+    // when records from C come, lookup chain is:
+    // C -> B -> A
+    // when records from D come, lookup chain is:
+    // D -> B -> A -> C
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_abc as
+                      |  select C.*, tmp_ab.*
+                      |  from tmp_ab
+                      |    join no_delete_src3 C
+                      |     on b1 = c1 and b2 = c2 and c1 <> cast(b2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_abc
+        |    join no_delete_src4 D
+        |     on b1 = d1 and b2 = d2 and d1 <> cast(b2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiLHS2(): Unit = {
+    //          DT
+    //        /    \
+    //       DT     D
+    //     /    \
+    //    DT     C
+    //  /    \
+    // A      B
+    // when records from C come, lookup chain is:
+    // C -> B -> A
+    // when records from D come, lookup chain is:
+    // D -> A -> B -> C
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_abc as
+                      |  select C.*, tmp_ab.*
+                      |  from tmp_ab
+                      |    join no_delete_src3 C
+                      |     on b1 = c1 and b2 = c2 and c1 <> cast(b2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_abc
+        |    join no_delete_src4 D
+        |     on a1 = d1 and a2 = d2 and d1 <> cast(a2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiLHS3(): Unit = {
+    //          DT
+    //        /    \
+    //       DT     D
+    //     /    \
+    //    DT     C
+    //  /    \
+    // A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    // when records from D come, lookup chain is:
+    // D -> C -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_abc as
+                      |  select C.*, tmp_ab.*
+                      |  from tmp_ab
+                      |    join no_delete_src3 C
+                      |     on a1 = c1 and a2 = c2 and c1 <> cast(a2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, c0, c2, c1 from tmp_abc
+        |    join no_delete_src4 D
+        |     on c1 = d1 and c2 = d2 and d1 <> cast(c2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testRHS1(): Unit = {
+    //       DT
+    //     /    \
+    //    C      DT
+    //         /    \
+    //        A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from no_delete_src3 C
+        |    join tmp_ab
+        |     on a1 = c1 and a2 = c2 and c1 <> cast(a0 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testRHS2(): Unit = {
+    //       DT
+    //     /    \
+    //    C      DT
+    //         /    \
+    //        A      B
+    // when records from C come, lookup chain is:
+    // C -> B -> A
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from no_delete_src3 C
+        |    join tmp_ab
+        |     on b1 = c1 and b2 = c2 and c1 <> cast(b2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiRHS1(): Unit = {
+    //      DT
+    //    /    \
+    //   D     DT
+    //       /    \
+    //      C     DT
+    //          /    \
+    //         A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    // when records from D come, lookup chain is:
+    // D -> B -> A -> C
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_abc as
+                      |  select C.*, tmp_ab.*
+                      |  from tmp_ab
+                      |    join no_delete_src3 C
+                      |     on a1 = c1 and a2 = c2 and c1 <> cast(a2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from no_delete_src4 D
+        |    join tmp_abc
+        |     on b1 = d1 and b2 = d2 and d1 <> cast(b2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiRHS2(): Unit = {
+    //      DT
+    //    /    \
+    //   D     DT
+    //       /    \
+    //      C     DT
+    //          /    \
+    //         A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    // when records from D come, lookup chain is:
+    // D -> A -> B -> C
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_abc as
+                      |  select C.*, tmp_ab.*
+                      |  from tmp_ab
+                      |    join no_delete_src3 C
+                      |     on a1 = c1 and a2 = c2 and c1 <> cast(a2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from no_delete_src4 D
+        |    join tmp_abc
+        |     on a1 = d1 and a2 = d2 and d1 <> cast(a2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testMultiRHS3(): Unit = {
+    //      DT
+    //    /    \
+    //   D     DT
+    //       /    \
+    //      C     DT
+    //          /    \
+    //         A      B
+    // when records from C come, lookup chain is:
+    // C -> A -> B
+    // when records from D come, lookup chain is:
+    // D -> C -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+    tEnv.executeSql("""
+                      |create temporary view tmp_cab as
+                      |  select C.*, tmp_ab.*
+                      |  from no_delete_src3 C
+                      |    join tmp_ab
+                      |     on a1 = c1 and a2 = c2 and c1 <> cast(a2 as double)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, c0, c2, c1 from no_delete_src4 D
+        |    join tmp_cab
+        |     on c1 = d1 and c2 = d2 and d1 <> cast(c2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testBushy1(): Unit = {
+    //        DT-3
+    //      /      \
+    //   DT-1      DT-2
+    //  /    \    /    \
+    // A      B  C      D
+    // when records from DT-1 come, lookup chain is:
+    // DT-1 -> C -> D
+    // when records from DT-2 come, lookup chain is:
+    // DT-2 -> B -> A
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    tEnv.executeSql("""
+                      |create temporary view tmp_cd as
+                      |  select D.*, C.*
+                      |  from no_delete_src3 C
+                      |    join no_delete_src4 D
+                      |     on c1 = d1 and c2 = d2 and c1 <> cast(d2 as double)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_ab
+        |    join tmp_cd
+        |     on b1 = c1 and b2 = c2 and c1 <> cast(b2 as double)
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testBushy2(): Unit = {
+    //        DT-3
+    //      /      \
+    //   DT-1      DT-2
+    //  /    \    /    \
+    // A      B  C      D
+    // when records from DT-1 come, lookup chain is:
+    // DT-1 -> D -> C
+    // when records from DT-2 come, lookup chain is:
+    // DT-2 -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select B.*, A.*
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    tEnv.executeSql("""
+                      |create temporary view tmp_cd as
+                      |  select D.*, C.*
+                      |  from no_delete_src3 C
+                      |    join no_delete_src4 D
+                      |     on c1 = d1 and c2 = d2 and c1 <> cast(d2 as double)
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1 from tmp_ab
+        |    join tmp_cd
+        |     on a1 = d1 and a2 = d2
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testCalcBetweenDeltaJoin(): Unit = {
+    //        DT-3
+    //      /      \
+    //   DT-1      DT-2
+    //  /    \    /    \
+    // A      B  C      D
+    // when records from DT-1 come, lookup chain is:
+    // DT-1 -> D -> C
+    // when records from DT-2 come, lookup chain is:
+    // DT-2 -> A -> B
+    tEnv.executeSql("""
+                      |create temporary view tmp_ab as
+                      |  select b0 + 1 as b0, b1, b2, A.*, a3 + b0 as mix_ab
+                      |  from no_delete_src1 A
+                      |    join no_delete_src2 B
+                      |     on a1 = b1 and a2 = b2 and a0 <> cast(b1 as int)
+                      |""".stripMargin)
+
+    tEnv.executeSql("""
+                      |create temporary view tmp_cd as
+                      |  select D.*, c0, c1 - 1.0 as c1, c2, c1 + d1 as mix_cd
+                      |  from no_delete_src3 C
+                      |    join no_delete_src4 D
+                      |     on c1 = d1 and c2 = d2 and c1 <> cast(d2 as double)
+                      |""".stripMargin)
+
+    tEnv.executeSql("""
+                      |alter table snk_for_cdc_src add (mix_ab_cd int)
+                      |""".stripMargin)
+    util.verifyRelPlanInsert(
+      """
+        |insert into snk_for_cdc_src
+        |  select a0, a1, a2, a3, b0, b2, b1, mix_ab + cast(mix_cd as int) 
from tmp_ab
+        |    join tmp_cd
+        |     on a1 = d1 and a2 = d2
+        |""".stripMargin
+    )
+  }
+
+  @Test
+  def testTopJoinCouldNotBeConvertedIntoDeltaJoin(): Unit = {

Review Comment:
   Can we add outer joins that can't be converted as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to