gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936334758


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the
+ * dependent upstream. It enforces that the input source is executed after the dependent input is
+ * finished. Everything passed from the inputs is forwarded to the output, though typically the
+ * dependent input should not send anything.
+ *
+ * <p>The operator must be chained with the source, which is generally ensured by the {@link
+ * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer can not
+ * work as expected.
+ *
+ * <p>The operator is used only for dynamic filtering at present.
+ */
+public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN>
+        implements MultipleInputStreamOperator<IN> {
+
+    public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) {
+        super(parameters, 2);
+    }
+
+    @Override
+    public List<Input> getInputs() {
+        return Arrays.asList(new ForwardingInput<>(output), new ForwardingInput<>(output));
+    }
+
+    private static class ForwardingInput<IN> implements Input<IN> {

Review Comment:
   Although this operator is only used in batch mode, the handling of watermarks, watermark statuses, and latency markers here is logically incorrect.
   
   We might change the Input to be a subclass of `AbstractInput`, which already handles these events correctly.
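
To illustrate why forwarding watermarks directly from each input is a problem, here is a minimal, self-contained sketch. It assumes that a multi-input operator should emit only the minimum watermark across its inputs, which is my understanding of what delegating to the owning operator through `AbstractInput` achieves. `CombinedWatermarkTracker` is a hypothetical stand-in written against the plain JDK; it is not a Flink class, and it ignores watermark status (idleness) and latency markers.

```java
import java.util.Arrays;

/** Hypothetical stand-in (not a Flink class) that combines watermarks from several inputs. */
final class CombinedWatermarkTracker {
    // Latest watermark seen on each input; Long.MIN_VALUE means "none yet".
    private final long[] perInput;
    private long lastEmitted = Long.MIN_VALUE;

    CombinedWatermarkTracker(int numberOfInputs) {
        perInput = new long[numberOfInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one input (0-based index).
     *
     * @return the new combined watermark if the minimum across all inputs advanced,
     *         or null if nothing should be emitted downstream
     */
    Long report(int inputIndex, long watermark) {
        perInput[inputIndex] = Math.max(perInput[inputIndex], watermark);
        long combined = Arrays.stream(perInput).min().getAsLong();
        if (combined > lastEmitted) {
            lastEmitted = combined;
            return combined;
        }
        return null;
    }

    public static void main(String[] args) {
        CombinedWatermarkTracker tracker = new CombinedWatermarkTracker(2);
        System.out.println(tracker.report(0, 10)); // input 1 has reported nothing yet
        System.out.println(tracker.report(1, 5));  // the minimum across inputs advances
    }
}
```

With a naive per-input forwarder, the first call above would already push watermark 10 downstream even though the second input is still behind it.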



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {

Review Comment:
   If there is a global failover, all tasks would be restarted, and we might have a problem here since the flag would not be cleared. We might make the provider of this operator coordinator extend `RecreateOnResetOperatorCoordinator.Provider`.
   
   If the DPP task is restarted, it is also possible that the filtering data changes. In this case, we have to ensure that the filtering data sent to the source coordinator and to the join operators is the same. However, because of speculative execution, we cannot easily know which version is the chosen one. Thus for now we may need to ask users to disable DPP in this case. In the long run, we may consider restarting sources in some way once the data has changed.
   
   In summary, we may
   1. Use `RecreateOnResetOperatorCoordinator` to clear the state on global failover.
   2. When repeated data is received, compare it with the previous data; if they differ, throw an exception for now.
   3. Create a follow-up issue to optimize this behavior in the future (restarting sources, or updating the partitions if the source is not fully finished).
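
Points 1 and 2 above could look roughly like the following self-contained sketch. It assumes the filtering data can be compared as serialized bytes. `FilteringDataGuard`, `accept`, and `reset` are hypothetical names used only for illustration and are not part of the PR; `reset()` merely stands in for the coordinator being recreated by `RecreateOnResetOperatorCoordinator` on global failover.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of the PR) modelling "keep the first data, verify repeats". */
final class FilteringDataGuard {
    // Serialized filtering data from the first notification; null until one arrives.
    private byte[] firstData;

    /**
     * @return true if this is the first notification and should be forwarded to listeners,
     *         false if it is an identical repeat (e.g. from a speculative attempt) to drop
     * @throws IllegalStateException if a repeat carries different data than the first one
     */
    boolean accept(byte[] serializedData) {
        if (firstData == null) {
            firstData = serializedData.clone();
            return true;
        }
        if (!Arrays.equals(firstData, serializedData)) {
            throw new IllegalStateException(
                    "Filtering data changed between attempts; sources and joins could disagree.");
        }
        return false;
    }

    /** Stand-in for recreating the coordinator on global failover: forget the previous data. */
    void reset() {
        firstData = null;
    }

    public static void main(String[] args) {
        FilteringDataGuard guard = new FilteringDataGuard();
        System.out.println(guard.accept(new byte[] {1, 2})); // first notification is forwarded
        System.out.println(guard.accept(new byte[] {1, 2})); // identical repeat is dropped
    }
}
```

Storing the data itself rather than a boolean flag is what makes the comparison in point 2 possible; the cost is holding one extra copy of the filtering data in the coordinator.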
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org