dawidwys commented on code in PR #26313:
URL: https://github.com/apache/flink/pull/26313#discussion_r2039702575


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -0,0 +1,474 @@
+package org.apache.flink.table.runtime.operators.join.stream;
+
+// TODO Gustavo Confirm we should create a private custom enum for join types 
instead of using
+// Calcite's JoinRelType
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.MultiJoinCondition;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
+import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Streaming multi-way join operator which supports inner join and 
left/right/full outer join. It
+ * eliminates the intermediate state necessary for a chain of multiple binary 
joins. In other words,
+ * it considerable reduces the total amount of state necessary for chained 
joins. As of time
+ * complexity, it performs better in the worst cases where the number of 
records in the intermediate
+ * state is large but worst than reorded binary joins when the number of 
records in the intermediate
+ * state is small.
+ */
+public class StreamingMultiJoinOperator extends 
AbstractStreamOperatorV2<RowData>
+        implements MultipleInputStreamOperator<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StreamingMultiJoinOperator.class);
+    private static final long serialVersionUID = 1L;
+
+    private final List<JoinInputSideSpec> inputSpecs;
+    private final List<JoinRelType> joinTypes;
+    private final List<InternalTypeInfo<RowData>> inputTypes;
+    private final MultiJoinCondition multiJoinCondition;
+    private final boolean[] filterNulls;
+    private final long[] stateRetentionTime;
+    private final List<Input> inputs;
+    private final boolean isFullOuterJoin;
+    private final MultiJoinCondition[] outerJoinConditions;
+
+    private transient List<JoinRecordStateView> stateHandlers;
+    private transient ValueState<Long> cleanupTimeState;
+    private transient TimestampedCollector<RowData> collector;
+    private transient List<RowData> nullRows;
+
+    /** Represents the different phases of the join process. */
+    private enum JoinPhase {
+        /** Phase where we calculate match counts (associations) without 
emitting results */
+        CALCULATE_MATCHES,
+        /** Phase where we emit the actual join results */
+        EMIT_RESULTS
+    }
+
+    /**
+     * Constructor for the streaming multi-way join operator.
+     *
+     * @param parameters Operator parameters
+     * @param inputTypes Types for each input
+     * @param inputSpecs Specifications for each input
+     * @param joinTypes Types of joins between inputs
+     * @param multiJoinCondition Condition for the multi-way join
+     * @param filterNulls Whether to filter nulls for each join key
+     * @param stateRetentionTime Retention time for state
+     * @param isFullOuterJoin Whether this is a full outer join
+     * @param outerJoinConditions Conditions for outer joins
+     */
+    public StreamingMultiJoinOperator(
+            StreamOperatorParameters<RowData> parameters,
+            List<InternalTypeInfo<RowData>> inputTypes,
+            List<JoinInputSideSpec> inputSpecs,
+            List<JoinRelType> joinTypes,
+            MultiJoinCondition multiJoinCondition,
+            boolean[] filterNulls,
+            long[] stateRetentionTime,
+            boolean isFullOuterJoin,
+            MultiJoinCondition[] outerJoinConditions) {
+        super(parameters, inputSpecs.size());
+        this.inputTypes = inputTypes;
+        this.inputSpecs = inputSpecs;
+        this.joinTypes = joinTypes;
+        this.multiJoinCondition = multiJoinCondition;
+        this.filterNulls = filterNulls;
+        this.stateRetentionTime = stateRetentionTime;
+        this.isFullOuterJoin = isFullOuterJoin;
+        this.outerJoinConditions = outerJoinConditions;
+        this.inputs = new ArrayList<>(inputSpecs.size());
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeCollector();
+        initializeNullRows();
+        initializeStateHandlers();
+        initializeCleanupState();
+    }
+
+    @Override
+    public void close() throws Exception {
+        closeConditions();
+        super.close();
+    }
+
+    public void processElement(int inputId, StreamRecord<RowData> element) 
throws Exception {
+        RowData input = element.getValue();
+        long timestamp = element.getTimestamp();
+        processElement(inputId, input, timestamp);
+    }
+
+    private void processElement(int inputId, RowData input, long timestamp) 
throws Exception {
+        inputId = inputId - 1; // Convert to 0-based index

Review Comment:
   It's considered a bad practice to reassign the parameters.
   
   When I read code and see a method parameter being used, I assume it has 
not been modified. When you do modify it, it forces me to be extra cautious.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java:
##########
@@ -0,0 +1,474 @@
+package org.apache.flink.table.runtime.operators.join.stream;
+
+// TODO Gustavo Confirm we should create a private custom enum for join types 
instead of using
+// Calcite's JoinRelType
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.MultiJoinCondition;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews;
+import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Streaming multi-way join operator which supports inner join and 
left/right/full outer join. It
+ * eliminates the intermediate state necessary for a chain of multiple binary 
joins. In other words,
+ * it considerable reduces the total amount of state necessary for chained 
joins. As of time
+ * complexity, it performs better in the worst cases where the number of 
records in the intermediate
+ * state is large but worst than reorded binary joins when the number of 
records in the intermediate
+ * state is small.
+ */
+public class StreamingMultiJoinOperator extends 
AbstractStreamOperatorV2<RowData>
+        implements MultipleInputStreamOperator<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(StreamingMultiJoinOperator.class);
+    private static final long serialVersionUID = 1L;
+
+    private final List<JoinInputSideSpec> inputSpecs;
+    private final List<JoinRelType> joinTypes;
+    private final List<InternalTypeInfo<RowData>> inputTypes;
+    private final MultiJoinCondition multiJoinCondition;
+    private final boolean[] filterNulls;
+    private final long[] stateRetentionTime;

Review Comment:
   I don't think we need a separate state retention time for each input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to