gaoyunhaii commented on code in PR #20374: URL: https://github.com/apache/flink/pull/20374#discussion_r936334758
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.util.Arrays; +import java.util.List; + +/** + * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the + * dependent upstream. It enforces that the input source is executed after the dependent input is + * finished. 
Everything passed from the inputs is forwarded to the output, though typically the + * dependent input should not send anything. + * + * <p>The operator must be chained with the source, which is generally ensured by the {@link + * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer can not + * work as expected. + * + * <p>The operator is used only for dynamic filtering at present. + */ +public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN> + implements MultipleInputStreamOperator<IN> { + + public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) { + super(parameters, 2); + } + + @Override + public List<Input> getInputs() { + return Arrays.asList(new ForwardingInput<>(output), new ForwardingInput<>(output)); + } + + private static class ForwardingInput<IN> implements Input<IN> {

Review Comment: Although this operator is logically used only in batch mode, the handling of watermarks, watermark statuses, and latency markers here is not correct. We might make the input a subclass of `AbstractInput`, which already handles these events correctly.

########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.CoordinatorStore; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator + * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators. 
+ */ +public class DynamicFilteringDataCollectorOperatorCoordinator + implements OperatorCoordinator, CoordinationRequestHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class); + + private final CoordinatorStore coordinatorStore; + private final List<String> dynamicFilteringDataListenerIDs; + + private boolean hasReceivedFilteringData; + + public DynamicFilteringDataCollectorOperatorCoordinator( + Context context, List<String> dynamicFilteringDataListenerIDs) { + this.coordinatorStore = checkNotNull(context.getCoordinatorStore()); + this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs); + } + + @Override + public void start() throws Exception {} + + @Override + public void close() throws Exception {} + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) + throws Exception { + // Since there might be speculative execution, once the dynamic filter collectors operator + // has been executed for multiple attempts, we only keep the first notification. + if (hasReceivedFilteringData) {

Review Comment: If there is a global failover, all tasks are restarted, and we might have a problem here because the flag would not be cleared. We might make the provider of this operator coordinator extend `RecreateOnResetOperatorCoordinator.Provider`.

If the DPP task is restarted, it is also possible that the filtering data changes. In that case, we have to ensure that the filtering data sent to the source coordinator and to the join operators is the same. However, with speculative execution we cannot easily know which version is the chosen one, so for now we may need to ask users to disable DPP in this case. In the long run, we may consider restarting the sources in some way once the data has changed.

In summary, we may:
1. Use `RecreateOnResetOperatorCoordinator` to clear the state on global failover.
2. When repeated data is received, compare it with the previously received data and, if the two differ, throw an exception for now.
3. Create a follow-up issue to optimize this behavior in the future (restarting the sources, or updating the partitions if the source is not fully finished).
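
To illustrate the first two points of the summary, here is a minimal sketch that deliberately avoids the Flink API. `FilteringDataGuard` and `RecreateOnResetHolder` are hypothetical stand-ins invented for this example, and the filtering data is modeled as a plain byte array, which is an assumption. In the real PR the recreation would presumably come from `RecreateOnResetOperatorCoordinator`, not from a hand-written holder.

```java
import java.util.Arrays;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for the coordinator's bookkeeping; it is not a Flink class.
 * The serialized filtering data is modeled as a plain byte array (an assumption).
 */
final class FilteringDataGuard {
    // Data accepted from the first notification; null until something arrives.
    private byte[] firstData;

    /**
     * Returns true when the data is the first notification and should be forwarded,
     * false when it is an identical repeat (for example from a speculative attempt).
     * Throws when a repeat carries different data (point 2 of the summary).
     */
    boolean accept(byte[] data) {
        if (firstData == null) {
            firstData = data.clone();
            return true;
        }
        if (!Arrays.equals(firstData, data)) {
            throw new IllegalStateException(
                    "Filtering data changed between attempts; cannot continue.");
        }
        return false;
    }
}

/**
 * Hypothetical holder that mimics recreate-on-reset (point 1 of the summary):
 * a reset discards the old guard and builds a fresh one, so no state from
 * before a global failover survives.
 */
final class RecreateOnResetHolder {
    private final Supplier<FilteringDataGuard> factory;
    private FilteringDataGuard current;

    RecreateOnResetHolder(Supplier<FilteringDataGuard> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    FilteringDataGuard get() {
        return current;
    }

    void reset() {
        current = factory.get();
    }
}
```

Recreating the whole object on reset, rather than clearing individual fields, means that any state added to the coordinator later is also dropped on failover without extra code.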