SteveStevenpoor commented on code in PR #26313:
URL: https://github.com/apache/flink/pull/26313#discussion_r2094106304


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -0,0 +1,830 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.MultiJoinCondition;
+import 
org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.MultiJoinStateView;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.MultiJoinStateViews;
+import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Streaming multi-way join operator which supports inner join and left outer 
join, right joins are
+ * transformed into left joins by the optimizer. It only supports a 
combination of joins that joins
+ * on at least one common column due to partitioning. It eliminates the 
intermediate state necessary
+ * for a chain of multiple binary joins. In other words, it reduces the total 
amount of state
+ * necessary for chained joins. As of time complexity, it performs better for 
the worst binary joins
+ * cases, where the number of records in the intermediate state is large. 
Binary joins perform
+ * better if they are optimally ordered, updates come mostly for the table on 
the right and the
+ * query uses primary keys (the intermediate state for a specific join key is 
small).
+ *
+ * <p>Performs the multi-way join logic recursively. This method drives the 
join process by
+ * traversing through the input streams (represented by `depth`) and their 
corresponding states. It
+ * attempts to find matching combinations of rows across all inputs based on 
the defined join
+ * conditions.
+ *
+ * <p><b>Core Idea:</b> The method explores a conceptual "join tree". Each 
level (`depth`)
+ * corresponds to an input stream. At each level, it iterates through the 
records stored in the
+ * state for that input. For each state record, it tentatively adds it to the 
`currentRows` array
+ * and, if the relevant join condition passes ({@link #matchesCondition(int, 
RowData[])}),
+ * recursively calls itself to process the next level (`depth + 1`). When the 
recursion reaches the
+ * level corresponding to the triggering input record ({@link 
#isInputLevel(int, int)}), it
+ * incorporates the `input` record itself into `currentRows` (again, subject 
to condition checks).
+ * Finally, when the maximum depth is reached ({@link #isMaxDepth(int)}), it 
evaluates the final,
+ * overall `multiJoinCondition` on the fully assembled `currentRows`.
+ *
+ * <p><b>Two-Phase Execution (`JoinPhase`):</b> The recursion operates in two 
distinct phases,
+ * crucial for correctly handling LEFT joins:
+ *
+ * <ol>
+ *   <li><b>{@link JoinPhase#CALCULATE_MATCHES}:</b> This initial phase 
traverses the state
+ *       <i>without</i> the actual input record (unless {@link 
#isInputLevel(int, int)} is true,
+ *       where it switches phases). Its primary purpose is to calculate the 
`associations` counts
+ *       for LEFT joins. This determines if rows from the "left" side found 
any matches on their
+ *       respective "right" sides based on the {@link #joinConditions}. No 
results are emitted in
+ *       this phase.
+ *   <li><b>{@link JoinPhase#EMIT_RESULTS}:</b> This phase is triggered when 
the recursion reaches
+ *       the level of the input record (`depth == inputId`) or continues from 
there. It incorporates
+ *       the actual `input` record and proceeds with the recursion. When the 
base case (checked via
+ *       {@link #isMaxDepth(int)}) is reached, it evaluates the join 
conditions and emits the
+ *       resulting joined row via the {@link #collector}.
+ * </ol>
+ *
+ * <p><b>LEFT Join Specifics:</b> LEFT joins require special handling to 
ensure rows from the left
+ * side are emitted even if they have no matching rows on the right side.
+ *
+ * <ul>
+ *   <li><b>Condition Checks:</b>
+ *       <ul>
+ *         <li>At each step `d > 0`, the specific {@code joinConditions[d]} is 
evaluated using the
+ *             rows accumulated so far (up to `currentRows[d]`). If this 
condition fails for a
+ *             combination (from state or the input record), that recursive 
path is pruned via
+ *             {@link #matchesCondition(int, RowData[])}.
+ *         <li>At the maximum depth (base case), the final {@code 
multiJoinCondition} is evaluated
+ *             on the complete `currentRows` array to determine if the overall 
joined row is valid.
+ *       </ul>
+ *   <li><b>Association Tracking ({@code associations} array):</b> {@code 
associations[d-1]} counts
+ *       how many records from subsequent inputs (depth `d` onwards) have 
matched the current row at
+ *       {@code currentRows[d-1]} based on the outer join conditions. This 
count is primarily
+ *       updated during the {@code CALCULATE_MATCHES} phase.
+ *   <li><b>Null Padding:</b> If, after processing all state records for a 
LEFT join's right side
+ *       (depth `d`), no matches were found (`!matched`) AND the corresponding 
left row also had no
+ *       associations ({@link #hasNoAssociations(int, int[])}), it indicates 
the left row needs to
+ *       be padded with nulls for the right side. This triggers {@link 
#processWithNullPadding(int,
+ *       RowData, int, RowData[], int[], JoinPhase)}, which places a null row 
at `currentRows[d]`
+ *       and continues the recursion.
+ *   <li><b>Input Record Handling (Upserts/Retractions):</b> When processing 
the actual `input`
+ *       record at its native depth (`inputId`) in a LEFT join scenario:
+ *       <ul>
+ *         <li>If the input is an INSERT/UPDATE_AFTER and its preceding 
left-side row had no matches
+ *             found during the `CALCULATE_MATCHES` phase (checked via {@link
+ *             #hasNoAssociations(int, int[])}), a retraction (`DELETE`) may 
be emitted first for
+ *             any previously padded result ({@link 
#handleRetractBeforeInput}).
+ *         <li>If the input is a DELETE/UPDATE_BEFORE and its preceding 
left-side row had no
+ *             matches, an insertion (`INSERT`) may be emitted for the new 
padded result (this also
+ *             implicitly checks via {@link #hasNoAssociations(int, int[])} in 
the corresponding
+ *             `if` condition in `processInputRecord`), ({@link 
#handleInsertAfterInput}).
+ *       </ul>
+ * </ul>
+ *
+ * <p><b>Base Case (Maximum Depth):</b> When {@link #isMaxDepth(int)} is true, 
all potential
+ * contributing rows are in `currentRows`.
+ *
+ * <ul>
+ *   <li>The final {@code multiJoinCondition} is evaluated on the complete 
`currentRows` array.
+ *   <li>If the conditions pass and the phase is {@code EMIT_RESULTS}, the 
combined row is
+ *       constructed and emitted using {@link #emitRow(RowKind, RowData[])}.
+ * </ul>
+ *
+ * <hr>
+ *
+ * <h3>Example Walkthrough (A LEFT JOIN B INNER JOIN C)</h3>
+ *
+ * <p>Inputs: A(idx=0), B(idx=1), C(idx=2)
+ *
+ * <p>Join: {@code A LEFT JOIN B ON A.id = B.id INNER JOIN C ON B.id = C.id}
+ *
+ * <p>Conditions:
+ *
+ * <ul>
+ *   <li>{@code joinConditions[1]}: {@code A.id == B.id} (LEFT JOIN condition)
+ *   <li>{@code joinConditions[2]}: {@code B.id == C.id} (INNER JOIN condition)
+ *   <li>{@code multiJoinCondition}: {@code (A.id == B.id) && (B.id == C.id)} 
(Overall condition)
+ * </ul>
+ *
+ * <p>Initial State:
+ *
+ * <ul>
+ *   <li>StateA: {@code { a1(1, 100) }}
+ *   <li>StateB: {@code { }}
+ *   <li>StateC: {@code { c1(50, 501), c2(60, 601) }}
+ * </ul>
+ *
+ * <p><b>=== Event 1: Input +b1(1, 50) arrives at Input B (inputId=1) ===</b>
+ *
+ * <pre><code>
+ * Output: +I[a1(1,100), b1(1,50), c1(50,501)].
+ * No INSERT for null padding emitted due to inner join with C. If this was
+ * A LEFT JOIN B LEFT JOIN C instead of an inner join, we'd also retract this 
first -D[a1(1,100), NULL, NULL]).
+ *
+ * [Depth][currentRows]
+ * [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, +b1, 1, [_,_,_], 
[0,0,0], CALCULATE_MATCHES)
+ * [Depth 0][_, _, _] Phase: CALCULATE_MATCHES
+ * [Depth 0][_, _, _]  Process StateA: { a1 }
+ * [Depth 0][_, _, _]   Record a1:
+ * [Depth 0][a1, _, _]     currentRows = [a1, _, _]
+ * [Depth 0][a1, _, _]     isLeftJoin(0): false
+ * [Depth 0][a1, _, _]     Recurse:
+ * [Depth 1][a1, _, _]       Call: recursiveMultiJoin(1, +b1, 1, [a1,_,_], 
[0,0,0], CALCULATE_MATCHES)
+ *
+ * [Depth 1][a1, _, _]       Phase: CALCULATE_MATCHES
+ * [Depth 1][a1, _, _]       isLeftJoin(1): true (A LEFT B)
+ * [Depth 1][a1, _, _]        Process StateB: {} -> Empty. 'matched' = false.
+ * [Depth 1][a1, _, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !matched 
&& hasNoAssociations(1, [0,0,0]) -> true
+ * [Depth 1][a1, _, _] DO_NULL_PAD Call processWithNullPadding(1, +b1, 1, 
[a1,_,_], [0,0,0], CALCULATE_MATCHES)
+ * [Depth 1][a1, nullB, _]     Set currentRows = [a1, nullB, _]
+ * [Depth 1][a1, nullB, _]     Recurse to next depth:
+ * [Depth 2][a1, nullB, _]       Call: recursiveMultiJoin(2, +b1, 1, 
[a1,nullB,_], [0,0,0], CALCULATE_MATCHES)
+ *
+ * [Depth 2][a1, nullB, _]       Phase: CALCULATE_MATCHES
+ * [Depth 2][a1, nullB, _]       isLeftJoin(2): false
+ * [Depth 2][a1, nullB, _]        Process StateC: { c1, c2 }
+ * [Depth 2][a1, nullB, c1]        Record c1: currentRows = [a1, nullB, c1]. 
Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB.id != c1.id). Continue 
loop.
+ * [Depth 2][a1, nullB, c2]        Record c2: currentRows = [a1, nullB, c2]. 
Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB.id != c2.id). Continue 
loop.
+ * [Depth 2][a1, nullB, _]       StateC loop finishes. 'matched' = false.
+ * [Depth 2][a1, nullB, _]       Return false.
+ * [Depth 1][a1, _, _]         Return from processWithNullPadding: false. 
(Restores currentRows[1] to _ implicitly)
+ * [Depth 1][a1, _, _]       'matched' from null padding is false.
+ * [Depth 1][a1, _, _] INPUT_LVL? isInputLevel(1, 1): true -> Process the 
input record +b1 itself.
+ * [Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, +b1, 1, [a1,_,_], 
[0,0,0], false) -------> *** PHASE SWITCHES TO EMIT_RESULTS ***
+ * [Depth 1][a1, _, _]           isLeftJoin(1): true
+ * [Depth 1][a1, _, _] RETRACT?    Check Retract: isUpsert(+b1) && 
isLeftJoin(1) && !matched -> true && true && true -> true
+ * [Depth 1][a1, _, _] DO_RETRACT  Call handleRetractBeforeInput(1, +b1, 1, 
[a1,_,_], [0,0,0])
+ * [Depth 1][a1, nullB, _]         Set currentRows = [a1, nullB, _]
+ * [Depth 1][a1, nullB, _]         input becomes temp -b1_temp
+ * [Depth 1][a1, nullB, _]         Recurse:
+ * [Depth 2][a1, nullB, _]           Call: recursiveMultiJoin(2, -b1_temp, 1, 
[a1,nullB,_], [0,0,0], EMIT_RESULTS)
+ * [Depth 2][a1, nullB, _]           Phase: EMIT_RESULTS
+ * [Depth 2][a1, nullB, _]            Process StateC: { c1, c2 }
+ * [Depth 2][a1, nullB, c1]            Record c1: currentRows = [a1, nullB, 
c1]. Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB). Continue.
+ * [Depth 2][a1, nullB, c2]            Record c2: currentRows = [a1, nullB, 
c2]. Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB). Continue.
+ * [Depth 2][a1, nullB, _]           StateC loop returns false.
+ * [Depth 2][a1, nullB, _]           Return false.
+ * [Depth 1][a1, nullB, _]         handleRetractBeforeInput returns nothing. 
*** EMIT NOTHING, inner join does not match ***
+ * [Depth 1][a1, +b1, _]         Restore input to +b1. Set currentRows = [a1, 
+b1, _].
+ * [Depth 1][a1, +b1, _]         Check matchesCondition(1, [a1,+b1]) (a1.id == 
b1.id -> 1==1) -> true.
+ * [Depth 1][a1, +b1, _] ASSOC_UPD   Update Associations: 
updateAssociationCount(1, associations, EMIT_RESULTS, +b1) -> 
associations[0]++. associations = [1, 0, 0].
+ * [Depth 1][a1, +b1, _]         Recurse:
+ * [Depth 2][a1, +b1, _]           Call: recursiveMultiJoin(2, +b1, 1, 
[a1,+b1,_], [1,0,0], EMIT_RESULTS)
+ *
+ * [Depth 2][a1, +b1, _]           Phase: EMIT_RESULTS
+ * [Depth 2][a1, +b1, _]           isLeftJoin(2): false
+ * [Depth 2][a1, +b1, _]            Process StateC: { c1, c2 }
+ * [Depth 2][a1, +b1, c1]            Record c1: currentRows = [a1, +b1, c1]. 
Check matchesCondition(2, [a1,+b1,c1]) (b1.id == c1.id -> 50==50) -> true. 
Recurse:
+ * [Depth 3][a1, +b1, c1]              Call: recursiveMultiJoin(3, +b1, 1, 
[a1,+b1,c1], [1,0,0], EMIT_RESULTS)
+ * [Depth 3][a1, +b1, c1]              Phase: EMIT_RESULTS
+ * [Depth 3][a1, +b1, c1]              isMaxDepth(3): true
+ * [Depth 3][a1, +b1, c1]              Evaluate 
multiJoinCondition([a1,+b1,c1]): (a1.id==b1.id && b1.id==c1.id) -> (1==1 && 
50==50) -> true.
+ * [Depth 3][a1, +b1, c1] *** EMIT ***  emitRow(INSERT, [a1, b1, c1]) // *** 
EMIT OUTPUT: +I[a1(1,100), b1(1,50), c1(50,501)] ***
+ * [Depth 3][a1, +b1, c1]              Return true.
+ * [Depth 2][a1, +b1, c2]            Record c2: currentRows = [a1, +b1, c2]. 
Check matchesCondition(2, [a1,+b1,c2]) (b1.id == c2.id -> 50==60) -> false. 
Continue loop.
+ * [Depth 2][a1, +b1, _]           StateC loop returns true ('matched' = true 
because c1 matched).
+ * [Depth 2][a1, +b1, _]           Return true.
+ * [Depth 1][a1, +b1, _]         Return from processInputRecord: true.
+ * [Depth 1][a1, +b1, _] INSERT?     Check Insert: isRetraction(+b1) is false. 
Skip handleInsertAfterInput.
+ * [Depth 1][a1, +b1, _]         Return true.
+ * [Depth 1][a1, _, _]     Return from Depth 1: true. (Restores currentRows[1] 
to _ implicitly)
+ * [Depth 0][a1, _, _]   Return from Depth 0: true.
+ * [Depth 0][_, _, _] End StateA loop. Return true. (Restores currentRows[0] 
to _ implicitly)
+ *
+ * --- End Event 1 ---
+ * Add record to StateB: +b1(1, 50) -> StateB becomes { b1(1, 50) }.
+ * StateB is now { b1(1, 50) }.
+ * Output: +I[a1(1,100), b1(1,50), c1(50,501)].
+ * No INSERT for null padding emitted due to inner join with C.
+ * If this was A LEFT JOIN B LEFT JOIN C instead of a inner join, we'd have 
retracted this first -D[a1(1,100), NULL, NULL].
+ * Note: The example shows detailed recursive calls. `recursiveMultiJoin` 
calls might return intermediate boolean `matched` values used internally, but 
the final output is the key outcome.
+ * </code></pre>
+ *
+ * <p><b>=== Event 2: Input delete -b1(1, 50) arrives at Input B (inputId=1) 
===</b> State
+ *
+ * <pre><code>
+ * Before: StateB = { b1(1, 50) }
+ * Output: -D[a1, b1, c1].
+ * No INSERT for null padding emitted due to inner join with C.
+ * If the query was A LEFT JOIN B LEFT JOIN C, we'd also emit a null padded 
row -I[a1(1,100), NULL, NULL].
+ *
+ * [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, -b1, 1, [_,_,_], 
[0,0,0], CALCULATE_MATCHES)
+ * [Depth 0][_, _, _] Phase: CALCULATE_MATCHES
+ * [Depth 0][_, _, _]  Process StateA: { a1 }
+ * [Depth 0][_, _, _]   Record a1:
+ * [Depth 0][a1, _, _]     currentRows = [a1, _, _]
+ * [Depth 0][a1, _, _]     Recurse:
+ * [Depth 1][a1, _, _]       Call: recursiveMultiJoin(1, -b1, 1, [a1,_,_], 
[0,0,0], CALCULATE_MATCHES)
+ *
+ * [Depth 1][a1, _, _]       Phase: CALCULATE_MATCHES
+ * [Depth 1][a1, _, _]       isLeftJoin(1): true
+ * [Depth 1][a1, _, _]        Process StateB: { b1 } // State contains b1 from 
Event 1
+ * [Depth 1][a1, b1, _]        Record b1: currentRows = [a1, b1, _]
+ * [Depth 1][a1, b1, _]        Check matchesCondition(1, [a1, b1]) -> (a1.id 
== b1.id -> 1==1) -> true. Match found.
+ * [Depth 1][a1, b1, _] ASSOC_UPD     Update Associations: 
updateAssociationCount(1, associations, CALCULATE_MATCHES, +b1) -> 
associations[0]++. associations = [1, 0, 0].
+ * [Depth 1][a1, b1, _]          associations[1] = 0 // Reset for next level
+ * [Depth 1][a1, b1, _]          Recurse:
+ * [Depth 2][a1, b1, _]            Call: recursiveMultiJoin(2, -b1, 1, [a1, 
b1, _], [1, 0, 0], CALCULATE_MATCHES)
+ * [Depth 2][a1, b1, _]            Phase: CALCULATE_MATCHES
+ * [Depth 2][a1, b1, _]            isLeftJoin(2): false
+ * [Depth 2][a1, b1, _]             Process StateC: { c1, c2 }
+ * [Depth 2][a1, b1, c1]              Record c1: currentRows = [a1, b1, c1]. 
Check matchesCondition(2, [a1,b1,c1]) -> (50==50) -> true. Recurse:
+ * [Depth 3][a1, b1, c1]                Call: recursiveMultiJoin(3, -b1, 1, 
[a1,b1,c1], [1,0,0], CALCULATE_MATCHES)
+ * [Depth 3][a1, b1, c1]                Phase: CALCULATE_MATCHES
+ * [Depth 3][a1, b1, c1]                isMaxDepth(3): true. Evaluate 
multiJoinCondition([a1,b1,c1]) -> (1==1 && 50==50) -> true. Return true.
+ * [Depth 2][a1, b1, c2]              Record c2: currentRows = [a1, b1, c2]. 
Check matchesCondition(2, [a1,b1,c2]) -> (50==60) -> false. Continue loop.
+ * [Depth 2][a1, b1, _]            StateC loop returns true (c1 matched).
+ * [Depth 2][a1, b1, _]            Return true.
+ * [Depth 1][a1, b1, _]        StateB loop finishes. matched = true.
+ * [Depth 1][a1, b1, _] NULL_PAD?    Check Null Padding: isLeftJoin(1) && 
!matched -> false. Skip null padding.
+ * [Depth 1][a1, b1, _] INPUT_LVL?   isInputLevel(1, 1): true -> Process input 
record -b1.
+ * [Depth 1][a1, _, _] PROC_INPUT   Call processInputRecord(1, -b1, 1, 
[a1,_,_], [1,0,0], true) -- PHASE SWITCHES TO EMIT_RESULTS
+ * [Depth 1][a1, _, _]            isLeftJoin(1): true
+ * [Depth 1][a1, _, _] RETRACT?     Check Retract: isUpsert(-b1) is false. 
Skip handleRetractBeforeInput.
+ * [Depth 1][a1, -b1, _]         Set currentRows = [a1, -b1, _].
+ * [Depth 1][a1, -b1, _]         Check matchesCondition(1, [a1,-b1]) (a1.id == 
b1.id -> 1==1) -> true. Match found.
+ * [Depth 1][a1, -b1, _] ASSOC_UPD    Update Associations: 
updateAssociationCount(1, associations, EMIT_RESULTS, -b1) -> 
associations[0]--. associations = [0, 0, 0].
+ * [Depth 1][a1, -b1, _]         Recurse:
+ * [Depth 2][a1, -b1, _]           Call: recursiveMultiJoin(2, -b1, 1, [a1, 
-b1, _], [0, 0, 0], EMIT_RESULTS)
+ * [Depth 2][a1, -b1, _]           Phase: EMIT_RESULTS
+ * [Depth 2][a1, -b1, _]            Process StateC: { c1, c2 }
+ * [Depth 2][a1, -b1, c1]            Record c1: currentRows = [a1, -b1, c1]. 
Check matchesCondition(2, [a1,-b1,c1]) -> (b1.id==c1.id -> 50==50) -> true. 
Recurse:
+ * [Depth 3][a1, -b1, c1]              Call: recursiveMultiJoin(3, -b1, 1, 
[a1, -b1, c1], [0, 0, 0], EMIT_RESULTS)
+ * [Depth 3][a1, -b1, c1]              Phase: EMIT_RESULTS
+ * [Depth 3][a1, -b1, c1]              isMaxDepth(3): true. Evaluate 
multiJoinCondition([a1,-b1,c1]) -> (1==1 && 50==50) -> true.
+ * [Depth 3][a1, -b1, c1] *** EMIT *** emitRow(DELETE, [a1, b1, c1]) // *** 
EMIT OUTPUT: -D[a1(1,100), b1(1,50), c1(50,501)] ***
+ * [Depth 3][a1, -b1, c1]              Return true.
+ * [Depth 2][a1, -b1, c2]            Record c2: currentRows = [a1, -b1, c2]. 
Check matchesCondition(2, [a1,-b1,c2]) -> (b1.id==c2.id -> 50==60) -> false. 
Continue loop.
+ * [Depth 2][a1, -b1, _]           StateC loop returns true (c1 matched).
+ * [Depth 2][a1, -b1, _]           Return true. matched_input = true.
+ * [Depth 1][a1, -b1, _] INSERT?      Check Insert: isRetraction(-b1) && 
isLeftJoin(1) && hasNoAssociations(1, [0,0,0]) -> true && true && true. -> true
+ * [Depth 1][a1, -b1, _] DO_INSERT    Call handleInsertAfterInput(1, -b1, 1, 
[a1,-b1,_], [0,0,0]) -- EMIT NULL PADDING INSERT?
+ * [Depth 1][a1, -b1, _]             // Attempts to emit the padded row [a1, 
nullB, ...] combined with state from C
+ * [Depth 1][a1, nullB, _]           currentRows = [a1, nullB, _]
+ * [Depth 1][a1, nullB, _]           input becomes temp +b1_temp (Kind.INSERT)
+ * [Depth 1][a1, nullB, _]           Recurse:
+ * [Depth 2][a1, nullB, _]             Call: recursiveMultiJoin(2, +b1_temp, 
1, [a1, nullB, _], [0, 0, 0], EMIT_RESULTS)
+ * [Depth 2][a1, nullB, _]             Phase: EMIT_RESULTS
+ * [Depth 2][a1, nullB, _]             isLeftJoinAtDepth(2) is false (B INNER 
JOIN C).
+ * [Depth 2][a1, nullB, _]              Process StateC: { c1, c2 }
+ * [Depth 2][a1, nullB, c1]              Record c1: currentRows = [a1, nullB, 
c1]. Check matchesCondition(2, [a1,nullB,c1]) fails (nullB). Continue.
+ * [Depth 2][a1, nullB, c2]              Record c2: currentRows = [a1, nullB, 
c2]. Check matchesCondition(2, [a1,nullB,c2]) fails (nullB). Continue.
+ * [Depth 2][a1, nullB, _]             NULL_PAD? isLeftJoin && !matched && 
hasNoAssociations(depth, associations) -> not left join, false.
+ * [Depth 2][a1, nullB, _]             INPUT_LVL? isInputLevel(depth, inputId) 
-> false
+ * [Depth 2][a1, nullB, _]             *** EMIT NOTHING since the outer inner 
join does not match. ***
+ * [Depth 2][a1, nullB, _]             StateC loop returns false.
+ * [Depth 2][a1, nullB, _]             No call to processWithNullPadding as 
isLeftJoinAtDepth(2) is false.
+ * [Depth 2][a1, nullB, _]             Return false.
+ * [Depth 1][a1, nullB, _]           No row emitted because multiJoinCondition 
failed for all combinations with StateC.
+ * [Depth 1][a1, -b1, _]           handleInsertAfterInput restores input kind 
(-b1), returns false. (Restores currentRows[1])
+ * [Depth 1][a1, -b1, _]         processInputRecord returns true (because 
matched_input was true before handleInsertAfterInput).
+ * [Depth 1][a1, _, _]       Return from Depth 1: true. (Restores 
currentRows[1] implicitly)
+ * [Depth 0][a1, _, _]   Return from Depth 0: true.
+ * [Depth 0][_, _, _] End StateA loop. Return true. (Restores currentRows[0] 
to _ implicitly)
+ *
+ * --- End Event 2 ---
+ * Add record to StateB: -b1(1, 50) -> StateB becomes {}.
+ * Output: -D[a1, b1, c1].
+ * No INSERT for null padding emitted due to inner join with C.
+ * </code></pre>
+ */
+public class StreamingMultiJoinOperator extends 
AbstractStreamOperatorV2<RowData>
+        implements MultipleInputStreamOperator<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    /** List of supported join types. */
+    public enum JoinType {
+        INNER,
+        LEFT
+    }
+
+    private final List<JoinInputSideSpec> inputSpecs;
+    private final List<JoinType> joinTypes;
+    private final List<InternalTypeInfo<RowData>> inputTypes;
+    private final MultiJoinCondition multiJoinCondition;
+    private final long[] stateRetentionTime;
+    private final List<Input<RowData>> typedInputs;
+    private final MultiJoinCondition[] joinConditions;
+    private final JoinKeyExtractor keyExtractor;
+
+    private transient List<MultiJoinStateView> stateHandlers;
+    private transient TimestampedCollector<RowData> collector;
+    private transient List<RowData> nullRows;
+
+    /** Represents the different phases of the join process. */
+    private enum JoinPhase {
+        /** Phase where we calculate match counts (associations) without 
emitting results. */
+        CALCULATE_MATCHES,
+        /** Phase where we emit the actual join results. */
+        EMIT_RESULTS
+    }
+
+    public StreamingMultiJoinOperator(
+            StreamOperatorParameters<RowData> parameters,
+            List<InternalTypeInfo<RowData>> inputTypes,
+            List<JoinInputSideSpec> inputSpecs,
+            List<JoinType> joinTypes,
+            MultiJoinCondition multiJoinCondition,
+            long[] stateRetentionTime,
+            MultiJoinCondition[] joinConditions,
+            JoinKeyExtractor keyExtractor) {
+        super(parameters, inputSpecs.size());
+        this.inputTypes = inputTypes;
+        this.inputSpecs = inputSpecs;
+        this.joinTypes = joinTypes;
+        this.multiJoinCondition = multiJoinCondition;
+        this.stateRetentionTime = stateRetentionTime;
+        this.joinConditions = joinConditions;
+        this.keyExtractor = keyExtractor;
+        this.typedInputs = new ArrayList<>(inputSpecs.size());
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeCollector();
+        initializeNullRows();
+        initializeStateHandlers();
+    }
+
+    @Override
+    public void close() throws Exception {
+        closeConditions();
+        super.close();
+    }
+
+    public void processElement(int inputId, StreamRecord<RowData> element) 
throws Exception {
+        RowData input = element.getValue();
+        if (input == null) {
+            return;
+        }
+
+        performMultiJoin(input, inputId);
+        addRecordToState(input, inputId);
+    }
+
+    private void performMultiJoin(RowData input, int inputId) throws Exception 
{
+        int[] associations = createInitialAssociations();
+        RowData[] currentRows = new RowData[inputSpecs.size()];
+
+        recursiveMultiJoin(
+                0, input, inputId, currentRows, associations, 
JoinPhase.CALCULATE_MATCHES);
+    }
+
+    /**
+     * See {@link StreamingMultiJoinOperator} for a detailed explanation of 
the recursive join and
+     * examples.
+     *
+     * @param depth The current depth of the recursion, representing the input 
stream index (0 to

Review Comment:
   Thanks for pointing me the section. I agree with you that a depth search 
first across join inputs does align with a recursive structure.
   
   That said, I’ve actually implemented a similar multi-join operator with an 
iterative approach in our internal fork, and it worked fine in performance and 
the logic remained quite readable. Also there is a possibility that in the 
future we will want to change traversal order. In my mind it would be more 
simple with maps.
   
   I totally understand that this a matter of preference, but I just wanted to 
share my experience in case it can help. If you're interested in revisiting 
this direction, I can help you.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to