gustavodemorais commented on code in PR #26313: URL: https://github.com/apache/flink/pull/26313#discussion_r2096216903
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java: ########## @@ -0,0 +1,830 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.stream; + +import org.apache.flink.streaming.api.operators.AbstractInput; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.MultiJoinCondition; +import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor; +import org.apache.flink.table.runtime.operators.join.stream.state.MultiJoinStateView; +import 
org.apache.flink.table.runtime.operators.join.stream.state.MultiJoinStateViews; +import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.types.RowKind; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Streaming multi-way join operator which supports inner join and left outer join, right joins are + * transformed into left joins by the optimizer. It only supports a combination of joins that joins + * on at least one common column due to partitioning. It eliminates the intermediate state necessary + * for a chain of multiple binary joins. In other words, it reduces the total amount of state + * necessary for chained joins. As of time complexity, it performs better for the worst binary joins + * cases, where the number of records in the intermediate state is large. Binary joins perform + * better if they are optimally ordered, updates come mostly for the table on the right and the + * query uses primary keys (the intermediate state for a specific join key is small). + * + * <p>Performs the multi-way join logic recursively. This method drives the join process by + * traversing through the input streams (represented by `depth`) and their corresponding states. It + * attempts to find matching combinations of rows across all inputs based on the defined join + * conditions. + * + * <p><b>Core Idea:</b> The method explores a conceptual "join tree". Each level (`depth`) + * corresponds to an input stream. At each level, it iterates through the records stored in the + * state for that input. For each state record, it tentatively adds it to the `currentRows` array + * and, if the relevant join condition passes ({@link #matchesCondition(int, RowData[])}), + * recursively calls itself to process the next level (`depth + 1`). 
When the recursion reaches the + * level corresponding to the triggering input record ({@link #isInputLevel(int, int)}), it + * incorporates the `input` record itself into `currentRows` (again, subject to condition checks). + * Finally, when the maximum depth is reached ({@link #isMaxDepth(int)}), it evaluates the final, + * overall `multiJoinCondition` on the fully assembled `currentRows`. + * + * <p><b>Two-Phase Execution (`JoinPhase`):</b> The recursion operates in two distinct phases, + * crucial for correctly handling LEFT joins: + * + * <ol> + * <li><b>{@link JoinPhase#CALCULATE_MATCHES}:</b> This initial phase traverses the state + * <i>without</i> the actual input record (unless {@link #isInputLevel(int, int)} is true, + * where it switches phases). Its primary purpose is to calculate the `associations` counts + * for LEFT joins. This determines if rows from the "left" side found any matches on their + * respective "right" sides based on the {@link #joinConditions}. No results are emitted in + * this phase. + * <li><b>{@link JoinPhase#EMIT_RESULTS}:</b> This phase is triggered when the recursion reaches + * the level of the input record (`depth == inputId`) or continues from there. It incorporates + * the actual `input` record and proceeds with the recursion. When the base case (checked via + * {@link #isMaxDepth(int)}) is reached, it evaluates the join conditions and emits the + * resulting joined row via the {@link #collector}. + * </ol> + * + * <p><b>LEFT Join Specifics:</b> LEFT joins require special handling to ensure rows from the left + * side are emitted even if they have no matching rows on the right side. + * + * <ul> + * <li><b>Condition Checks:</b> + * <ul> + * <li>At each step `d > 0`, the specific {@code joinConditions[d]} is evaluated using the + * rows accumulated so far (up to `currentRows[d]`). 
If this condition fails for a + * combination (from state or the input record), that recursive path is pruned via + * {@link #matchesCondition(int, RowData[])}. + * <li>At the maximum depth (base case), the final {@code multiJoinCondition} is evaluated + * on the complete `currentRows` array to determine if the overall joined row is valid. + * </ul> + * <li><b>Association Tracking ({@code associations} array):</b> {@code associations[d-1]} counts + * how many records from subsequent inputs (depth `d` onwards) have matched the current row at + * {@code currentRows[d-1]} based on the outer join conditions. This count is primarily + * updated during the {@code CALCULATE_MATCHES} phase. + * <li><b>Null Padding:</b> If, after processing all state records for a LEFT join's right side + * (depth `d`), no matches were found (`!matched`) AND the corresponding left row also had no + * associations ({@link #hasNoAssociations(int, int[])}), it indicates the left row needs to + * be padded with nulls for the right side. This triggers {@link #processWithNullPadding(int, + * RowData, int, RowData[], int[], JoinPhase)}, which places a null row at `currentRows[d]` + * and continues the recursion. + * <li><b>Input Record Handling (Upserts/Retractions):</b> When processing the actual `input` + * record at its native depth (`inputId`) in a LEFT join scenario: + * <ul> + * <li>If the input is an INSERT/UPDATE_AFTER and its preceding left-side row had no matches + * found during the `CALCULATE_MATCHES` phase (checked via {@link + * #hasNoAssociations(int, int[])}), a retraction (`DELETE`) may be emitted first for + * any previously padded result ({@link #handleRetractBeforeInput}). 
+ * <li>If the input is a DELETE/UPDATE_BEFORE and its preceding left-side row had no + * matches, an insertion (`INSERT`) may be emitted for the new padded result (this also + * implicitly checks via {@link #hasNoAssociations(int, int[])} in the corresponding + * `if` condition in `processInputRecord`), ({@link #handleInsertAfterInput}). + * </ul> + * </ul> + * + * <p><b>Base Case (Maximum Depth):</b> When {@link #isMaxDepth(int)} is true, all potential + * contributing rows are in `currentRows`. + * + * <ul> + * <li>The final {@code multiJoinCondition} is evaluated on the complete `currentRows` array. + * <li>If the conditions pass and the phase is {@code EMIT_RESULTS}, the combined row is + * constructed and emitted using {@link #emitRow(RowKind, RowData[])}. + * </ul> + * + * <hr> + * + * <h3>Example Walkthrough (A LEFT JOIN B INNER JOIN C)</h3> + * + * <p>Inputs: A(idx=0), B(idx=1), C(idx=2) + * + * <p>Join: {@code A LEFT JOIN B ON A.id = B.id INNER JOIN C ON B.id = C.id} + * + * <p>Conditions: + * + * <ul> + * <li>{@code joinConditions[1]}: {@code A.id == B.id} (LEFT JOIN condition) + * <li>{@code joinConditions[2]}: {@code B.id == C.id} (INNER JOIN condition) + * <li>{@code multiJoinCondition}: {@code (A.id == B.id) && (B.id == C.id)} (Overall condition) + * </ul> + * + * <p>Initial State: + * + * <ul> + * <li>StateA: {@code { a1(1, 100) }} + * <li>StateB: {@code { }} + * <li>StateC: {@code { c1(50, 501), c2(60, 601) }} + * </ul> + * + * <p><b>=== Event 1: Input +b1(1, 50) arrives at Input B (inputId=1) ===</b> + * + * <pre><code> + * Output: +I[a1(1,100), b1(1,50), c1(50,501)]. + * No INSERT for null padding emitted due to inner join with C. If this was + * A LEFT JOIN B LEFT JOIN C instead of an inner join, we'd also retract this first -D[a1(1,100), NULL, NULL]). 
+ * + * [Depth][currentRows] + * [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, +b1, 1, [_,_,_], [0,0,0], CALCULATE_MATCHES) + * [Depth 0][_, _, _] Phase: CALCULATE_MATCHES + * [Depth 0][_, _, _] Process StateA: { a1 } + * [Depth 0][_, _, _] Record a1: + * [Depth 0][a1, _, _] currentRows = [a1, _, _] + * [Depth 0][a1, _, _] isLeftJoin(0): false + * [Depth 0][a1, _, _] Recurse: + * [Depth 1][a1, _, _] Call: recursiveMultiJoin(1, +b1, 1, [a1,_,_], [0,0,0], CALCULATE_MATCHES) + * + * [Depth 1][a1, _, _] Phase: CALCULATE_MATCHES + * [Depth 1][a1, _, _] isLeftJoin(1): true (A LEFT B) + * [Depth 1][a1, _, _] Process StateB: {} -> Empty. 'matched' = false. + * [Depth 1][a1, _, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !matched && hasNoAssociations(1, [0,0,0]) -> true + * [Depth 1][a1, _, _] DO_NULL_PAD Call processWithNullPadding(1, +b1, 1, [a1,_,_], [0,0,0], CALCULATE_MATCHES) + * [Depth 1][a1, nullB, _] Set currentRows = [a1, nullB, _] + * [Depth 1][a1, nullB, _] Recurse to next depth: + * [Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, +b1, 1, [a1,nullB,_], [0,0,0], CALCULATE_MATCHES) + * + * [Depth 2][a1, nullB, _] Phase: CALCULATE_MATCHES + * [Depth 2][a1, nullB, _] isLeftJoin(2): false + * [Depth 2][a1, nullB, _] Process StateC: { c1, c2 } + * [Depth 2][a1, nullB, c1] Record c1: currentRows = [a1, nullB, c1]. Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB.id != c1.id). Continue loop. + * [Depth 2][a1, nullB, c2] Record c2: currentRows = [a1, nullB, c2]. Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB.id != c2.id). Continue loop. + * [Depth 2][a1, nullB, _] StateC loop finishes. 'matched' = false. + * [Depth 2][a1, nullB, _] Return false. + * [Depth 1][a1, _, _] Return from processWithNullPadding: false. (Restores currentRows[1] to _ implicitly) + * [Depth 1][a1, _, _] 'matched' from null padding is false. + * [Depth 1][a1, _, _] INPUT_LVL? isInputLevel(1, 1): true -> Process the input record +b1 itself. 
+ * [Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, +b1, 1, [a1,_,_], [0,0,0], false) -------> *** PHASE SWITCHES TO EMIT_RESULTS *** + * [Depth 1][a1, _, _] isLeftJoin(1): true + * [Depth 1][a1, _, _] RETRACT? Check Retract: isUpsert(+b1) && isLeftJoin(1) && !matched -> true && true && true -> true + * [Depth 1][a1, _, _] DO_RETRACT Call handleRetractBeforeInput(1, +b1, 1, [a1,_,_], [0,0,0]) + * [Depth 1][a1, nullB, _] Set currentRows = [a1, nullB, _] + * [Depth 1][a1, nullB, _] input becomes temp -b1_temp + * [Depth 1][a1, nullB, _] Recurse: + * [Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, -b1_temp, 1, [a1,nullB,_], [0,0,0], EMIT_RESULTS) + * [Depth 2][a1, nullB, _] Phase: EMIT_RESULTS + * [Depth 2][a1, nullB, _] Process StateC: { c1, c2 } + * [Depth 2][a1, nullB, c1] Record c1: currentRows = [a1, nullB, c1]. Check matchesCondition(2, [a1,nullB,c1]) -> fails (nullB). Continue. + * [Depth 2][a1, nullB, c2] Record c2: currentRows = [a1, nullB, c2]. Check matchesCondition(2, [a1,nullB,c2]) -> fails (nullB). Continue. + * [Depth 2][a1, nullB, _] StateC loop returns false. + * [Depth 2][a1, nullB, _] Return false. + * [Depth 1][a1, nullB, _] handleRetractBeforeInput returns nothing. *** EMIT NOTHING, inner join does not match *** + * [Depth 1][a1, +b1, _] Restore input to +b1. Set currentRows = [a1, +b1, _]. + * [Depth 1][a1, +b1, _] Check matchesCondition(1, [a1,+b1]) (a1.id == b1.id -> 1==1) -> true. + * [Depth 1][a1, +b1, _] ASSOC_UPD Update Associations: updateAssociationCount(1, associations, EMIT_RESULTS, +b1) -> associations[0]++. associations = [1, 0, 0]. + * [Depth 1][a1, +b1, _] Recurse: + * [Depth 2][a1, +b1, _] Call: recursiveMultiJoin(2, +b1, 1, [a1,+b1,_], [1,0,0], EMIT_RESULTS) + * + * [Depth 2][a1, +b1, _] Phase: EMIT_RESULTS + * [Depth 2][a1, +b1, _] isLeftJoin(2): false + * [Depth 2][a1, +b1, _] Process StateC: { c1, c2 } + * [Depth 2][a1, +b1, c1] Record c1: currentRows = [a1, +b1, c1]. 
Check matchesCondition(2, [a1,+b1,c1]) (b1.id == c1.id -> 50==50) -> true. Recurse: + * [Depth 3][a1, +b1, c1] Call: recursiveMultiJoin(3, +b1, 1, [a1,+b1,c1], [1,0,0], EMIT_RESULTS) + * [Depth 3][a1, +b1, c1] Phase: EMIT_RESULTS + * [Depth 3][a1, +b1, c1] isMaxDepth(3): true + * [Depth 3][a1, +b1, c1] Evaluate multiJoinCondition([a1,+b1,c1]): (a1.id==b1.id && b1.id==c1.id) -> (1==1 && 50==50) -> true. + * [Depth 3][a1, +b1, c1] *** EMIT *** emitRow(INSERT, [a1, b1, c1]) // *** EMIT OUTPUT: +I[a1(1,100), b1(1,50), c1(50,501)] *** + * [Depth 3][a1, +b1, c1] Return true. + * [Depth 2][a1, +b1, c2] Record c2: currentRows = [a1, +b1, c2]. Check matchesCondition(2, [a1,+b1,c2]) (b1.id == c2.id -> 50==60) -> false. Continue loop. + * [Depth 2][a1, +b1, _] StateC loop returns true ('matched' = true because c1 matched). + * [Depth 2][a1, +b1, _] Return true. + * [Depth 1][a1, +b1, _] Return from processInputRecord: true. + * [Depth 1][a1, +b1, _] INSERT? Check Insert: isRetraction(+b1) is false. Skip handleInsertAfterInput. + * [Depth 1][a1, +b1, _] Return true. + * [Depth 1][a1, _, _] Return from Depth 1: true. (Restores currentRows[1] to _ implicitly) + * [Depth 0][a1, _, _] Return from Depth 0: true. + * [Depth 0][_, _, _] End StateA loop. Return true. (Restores currentRows[0] to _ implicitly) + * + * --- End Event 1 --- + * Add record to StateB: +b1(1, 50) -> StateB becomes { b1(1, 50) }. + * StateB is now { b1(1, 50) }. + * Output: +I[a1(1,100), b1(1,50), c1(50,501)]. + * No INSERT for null padding emitted due to inner join with C. + * If this was A LEFT JOIN B LEFT JOIN C instead of a inner join, we'd have retracted this first -D[a1(1,100), NULL, NULL]. + * Note: The example shows detailed recursive calls. `recursiveMultiJoin` calls might return intermediate boolean `matched` values used internally, but the final output is the key outcome. 
+ * </code></pre> + * + * <p><b>=== Event 2: Input delete -b1(1, 50) arrives at Input B (inputId=1) ===</b> State + * + * <pre><code> + * Before: StateB = { b1(1, 50) } + * Output: -D[a1, b1, c1]. + * No INSERT for null padding emitted due to inner join with C. + * If the query was A LEFT JOIN B LEFT JOIN C, we'd also emit a null padded row -I[a1(1,100), NULL, NULL]. + * + * [Depth 0][_, _, _] Initial Call: recursiveMultiJoin(0, -b1, 1, [_,_,_], [0,0,0], CALCULATE_MATCHES) + * [Depth 0][_, _, _] Phase: CALCULATE_MATCHES + * [Depth 0][_, _, _] Process StateA: { a1 } + * [Depth 0][_, _, _] Record a1: + * [Depth 0][a1, _, _] currentRows = [a1, _, _] + * [Depth 0][a1, _, _] Recurse: + * [Depth 1][a1, _, _] Call: recursiveMultiJoin(1, -b1, 1, [a1,_,_], [0,0,0], CALCULATE_MATCHES) + * + * [Depth 1][a1, _, _] Phase: CALCULATE_MATCHES + * [Depth 1][a1, _, _] isLeftJoin(1): true + * [Depth 1][a1, _, _] Process StateB: { b1 } // State contains b1 from Event 1 + * [Depth 1][a1, b1, _] Record b1: currentRows = [a1, b1, _] + * [Depth 1][a1, b1, _] Check matchesCondition(1, [a1, b1]) -> (a1.id == b1.id -> 1==1) -> true. Match found. + * [Depth 1][a1, b1, _] ASSOC_UPD Update Associations: updateAssociationCount(1, associations, CALCULATE_MATCHES, +b1) -> associations[0]++. associations = [1, 0, 0]. + * [Depth 1][a1, b1, _] associations[1] = 0 // Reset for next level + * [Depth 1][a1, b1, _] Recurse: + * [Depth 2][a1, b1, _] Call: recursiveMultiJoin(2, -b1, 1, [a1, b1, _], [1, 0, 0], CALCULATE_MATCHES) + * [Depth 2][a1, b1, _] Phase: CALCULATE_MATCHES + * [Depth 2][a1, b1, _] isLeftJoin(2): false + * [Depth 2][a1, b1, _] Process StateC: { c1, c2 } + * [Depth 2][a1, b1, c1] Record c1: currentRows = [a1, b1, c1]. Check matchesCondition(2, [a1,b1,c1]) -> (50==50) -> true. Recurse: + * [Depth 3][a1, b1, c1] Call: recursiveMultiJoin(3, -b1, 1, [a1,b1,c1], [1,0,0], CALCULATE_MATCHES) + * [Depth 3][a1, b1, c1] Phase: CALCULATE_MATCHES + * [Depth 3][a1, b1, c1] isMaxDepth(3): true. 
Evaluate multiJoinCondition([a1,b1,c1]) -> (1==1 && 50==50) -> true. Return true. + * [Depth 2][a1, b1, c2] Record c2: currentRows = [a1, b1, c2]. Check matchesCondition(2, [a1,b1,c2]) -> (50==60) -> false. Continue loop. + * [Depth 2][a1, b1, _] StateC loop returns true (c1 matched). + * [Depth 2][a1, b1, _] Return true. + * [Depth 1][a1, b1, _] StateB loop finishes. matched = true. + * [Depth 1][a1, b1, _] NULL_PAD? Check Null Padding: isLeftJoin(1) && !matched -> false. Skip null padding. + * [Depth 1][a1, b1, _] INPUT_LVL? isInputLevel(1, 1): true -> Process input record -b1. + * [Depth 1][a1, _, _] PROC_INPUT Call processInputRecord(1, -b1, 1, [a1,_,_], [1,0,0], true) -- PHASE SWITCHES TO EMIT_RESULTS + * [Depth 1][a1, _, _] isLeftJoin(1): true + * [Depth 1][a1, _, _] RETRACT? Check Retract: isUpsert(-b1) is false. Skip handleRetractBeforeInput. + * [Depth 1][a1, -b1, _] Set currentRows = [a1, -b1, _]. + * [Depth 1][a1, -b1, _] Check matchesCondition(1, [a1,-b1]) (a1.id == b1.id -> 1==1) -> true. Match found. + * [Depth 1][a1, -b1, _] ASSOC_UPD Update Associations: updateAssociationCount(1, associations, EMIT_RESULTS, -b1) -> associations[0]--. associations = [0, 0, 0]. + * [Depth 1][a1, -b1, _] Recurse: + * [Depth 2][a1, -b1, _] Call: recursiveMultiJoin(2, -b1, 1, [a1, -b1, _], [0, 0, 0], EMIT_RESULTS) + * [Depth 2][a1, -b1, _] Phase: EMIT_RESULTS + * [Depth 2][a1, -b1, _] Process StateC: { c1, c2 } + * [Depth 2][a1, -b1, c1] Record c1: currentRows = [a1, -b1, c1]. Check matchesCondition(2, [a1,-b1,c1]) -> (b1.id==c1.id -> 50==50) -> true. Recurse: + * [Depth 3][a1, -b1, c1] Call: recursiveMultiJoin(3, -b1, 1, [a1, -b1, c1], [0, 0, 0], EMIT_RESULTS) + * [Depth 3][a1, -b1, c1] Phase: EMIT_RESULTS + * [Depth 3][a1, -b1, c1] isMaxDepth(3): true. Evaluate multiJoinCondition([a1,-b1,c1]) -> (1==1 && 50==50) -> true. 
+ * [Depth 3][a1, -b1, c1] *** EMIT *** emitRow(DELETE, [a1, b1, c1]) // *** EMIT OUTPUT: -D[a1(1,100), b1(1,50), c1(50,501)] *** + * [Depth 3][a1, -b1, c1] Return true. + * [Depth 2][a1, -b1, c2] Record c2: currentRows = [a1, -b1, c2]. Check matchesCondition(2, [a1,-b1,c2]) -> (b1.id==c2.id -> 50==60) -> false. Continue loop. + * [Depth 2][a1, -b1, _] StateC loop returns true (c1 matched). + * [Depth 2][a1, -b1, _] Return true. matched_input = true. + * [Depth 1][a1, -b1, _] INSERT? Check Insert: isRetraction(-b1) && isLeftJoin(1) && hasNoAssociations(1, [0,0,0]) -> true && true && true. -> true + * [Depth 1][a1, -b1, _] DO_INSERT Call handleInsertAfterInput(1, -b1, 1, [a1,-b1,_], [0,0,0]) -- EMIT NULL PADDING INSERT? + * [Depth 1][a1, -b1, _] // Attempts to emit the padded row [a1, nullB, ...] combined with state from C + * [Depth 1][a1, nullB, _] currentRows = [a1, nullB, _] + * [Depth 1][a1, nullB, _] input becomes temp +b1_temp (Kind.INSERT) + * [Depth 1][a1, nullB, _] Recurse: + * [Depth 2][a1, nullB, _] Call: recursiveMultiJoin(2, +b1_temp, 1, [a1, nullB, _], [0, 0, 0], EMIT_RESULTS) + * [Depth 2][a1, nullB, _] Phase: EMIT_RESULTS + * [Depth 2][a1, nullB, _] isLeftJoinAtDepth(2) is false (B INNER JOIN C). + * [Depth 2][a1, nullB, _] Process StateC: { c1, c2 } + * [Depth 2][a1, nullB, c1] Record c1: currentRows = [a1, nullB, c1]. Check matchesCondition(2, [a1,nullB,c1]) fails (nullB). Continue. + * [Depth 2][a1, nullB, c2] Record c2: currentRows = [a1, nullB, c2]. Check matchesCondition(2, [a1,nullB,c2]) fails (nullB). Continue. + * [Depth 2][a1, nullB, _] NULL_PAD? isLeftJoin && !matched && hasNoAssociations(depth, associations) -> not left join, false. + * [Depth 2][a1, nullB, _] INPUT_LVL? isInputLevel(depth, inputId) -> false + * [Depth 2][a1, nullB, _] *** EMIT NOTHING since the outer inner join does not match. *** + * [Depth 2][a1, nullB, _] StateC loop returns false. 
+ * [Depth 2][a1, nullB, _] No call to processWithNullPadding as isLeftJoinAtDepth(2) is false. + * [Depth 2][a1, nullB, _] Return false. + * [Depth 1][a1, nullB, _] No row emitted because multiJoinCondition failed for all combinations with StateC. + * [Depth 1][a1, -b1, _] handleInsertAfterInput restores input kind (-b1), returns false. (Restores currentRows[1]) + * [Depth 1][a1, -b1, _] processInputRecord returns true (because matched_input was true before handleInsertAfterInput). + * [Depth 1][a1, _, _] Return from Depth 1: true. (Restores currentRows[1] implicitly) + * [Depth 0][a1, _, _] Return from Depth 0: true. + * [Depth 0][_, _, _] End StateA loop. Return true. (Restores currentRows[0] to _ implicitly) + * + * --- End Event 2 --- + * Add record to StateB: -b1(1, 50) -> StateB becomes {}. + * Output: -D[a1, b1, c1]. + * No INSERT for null padding emitted due to inner join with C. + * </code></pre> + */ +public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData> + implements MultipleInputStreamOperator<RowData> { + private static final long serialVersionUID = 1L; + + /** List of supported join types. */ + public enum JoinType { + INNER, + LEFT + } + + private final List<JoinInputSideSpec> inputSpecs; + private final List<JoinType> joinTypes; + private final List<InternalTypeInfo<RowData>> inputTypes; + private final MultiJoinCondition multiJoinCondition; + private final long[] stateRetentionTime; + private final List<Input<RowData>> typedInputs; + private final MultiJoinCondition[] joinConditions; + private final JoinKeyExtractor keyExtractor; + + private transient List<MultiJoinStateView> stateHandlers; + private transient TimestampedCollector<RowData> collector; + private transient List<RowData> nullRows; + + /** Represents the different phases of the join process. */ + private enum JoinPhase { + /** Phase where we calculate match counts (associations) without emitting results. 
*/ + CALCULATE_MATCHES, + /** Phase where we emit the actual join results. */ + EMIT_RESULTS + } + + public StreamingMultiJoinOperator( + StreamOperatorParameters<RowData> parameters, + List<InternalTypeInfo<RowData>> inputTypes, + List<JoinInputSideSpec> inputSpecs, + List<JoinType> joinTypes, + MultiJoinCondition multiJoinCondition, + long[] stateRetentionTime, + MultiJoinCondition[] joinConditions, + JoinKeyExtractor keyExtractor) { + super(parameters, inputSpecs.size()); + this.inputTypes = inputTypes; + this.inputSpecs = inputSpecs; + this.joinTypes = joinTypes; + this.multiJoinCondition = multiJoinCondition; + this.stateRetentionTime = stateRetentionTime; + this.joinConditions = joinConditions; + this.keyExtractor = keyExtractor; + this.typedInputs = new ArrayList<>(inputSpecs.size()); + } + + @Override + public void open() throws Exception { + super.open(); + initializeCollector(); + initializeNullRows(); + initializeStateHandlers(); + } + + @Override + public void close() throws Exception { + closeConditions(); + super.close(); + } + + public void processElement(int inputId, StreamRecord<RowData> element) throws Exception { + RowData input = element.getValue(); + if (input == null) { + return; + } + + performMultiJoin(input, inputId); + addRecordToState(input, inputId); + } + + private void performMultiJoin(RowData input, int inputId) throws Exception { + int[] associations = createInitialAssociations(); + RowData[] currentRows = new RowData[inputSpecs.size()]; + + recursiveMultiJoin( + 0, input, inputId, currentRows, associations, JoinPhase.CALCULATE_MATCHES); + } + + /** + * See {@link StreamingMultiJoinOperator} for a detailed explanation of the recursive join and + * examples. + * + * @param depth The current depth of the recursion, representing the input stream index (0 to Review Comment: Hey Steve, thanks for your input. I totally understand where you are coming from. 
My preference is to stick with the recursive approach for now: I don't see any technical limitations, and I think the code will look cleaner, even though I believe both are good approaches. We still have a lot of missing tasks before the multi join reaches a running state, so right now I'd rather address those missing pieces. We can always reevaluate this in a second iteration or as part of another ticket. As long as the layout of the operator doesn't change, changing the implementation later shouldn't be a problem, if necessary.
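For readers following the thread, here is a minimal sketch of the depth-wise recursion described in the operator's Javadoc. It is not code from the PR. It assumes plain `int[]` rows in in-memory lists instead of Flink state, handles inner joins only, and leaves out the two phases, association counts, and null padding. The names `RecursiveJoinSketch` and `DepthCondition` are hypothetical; `DepthCondition` stands in for `joinConditions[depth]`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecursiveJoinSketch {

    /** Hypothetical stand-in for joinConditions[depth]; sees rows[0..depth]. */
    public interface DepthCondition {
        boolean matches(int depth, int[][] rows);
    }

    /** Joins all inputs held in `state`, one list of rows per input. */
    public static List<String> join(List<List<int[]>> state, DepthCondition condition) {
        List<String> out = new ArrayList<>();
        recurse(0, state, new int[state.size()][], condition, out);
        return out;
    }

    private static void recurse(
            int depth,
            List<List<int[]>> state,
            int[][] currentRows,
            DepthCondition condition,
            List<String> out) {
        // Base case: every input contributed a row, so emit the combination.
        if (depth == state.size()) {
            out.add(Arrays.deepToString(currentRows));
            return;
        }
        for (int[] row : state.get(depth)) {
            currentRows[depth] = row;
            // Depth 0 has no join condition; deeper levels prune on mismatch.
            if (depth == 0 || condition.matches(depth, currentRows)) {
                recurse(depth + 1, state, currentRows, condition, out);
            }
        }
        // Restore the slot before returning to the caller's level.
        currentRows[depth] = null;
    }

    public static void main(String[] args) {
        // Rows from the Javadoc example: a1(1,100), b1(1,50), c1(50,501), c2(60,601).
        List<List<int[]>> state = List.of(
                List.of(new int[] {1, 100}),
                List.of(new int[] {1, 50}),
                List.of(new int[] {50, 501}, new int[] {60, 601}));
        // Depth 1: A.id == B.id; depth 2: B's second field == C's first field.
        DepthCondition condition = (depth, rows) ->
                depth == 1 ? rows[0][0] == rows[1][0] : rows[1][1] == rows[2][0];
        System.out.println(join(state, condition));
    }
}
```

Because callers only see `join(...)`, the private `recurse` method could later be replaced by an explicit-stack loop without touching them, which mirrors the point above about keeping the operator layout stable.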