lindong28 commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r945439037
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImplTest.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent; +import org.apache.flink.runtime.operators.coordination.AcknowledgeCloseGatewayEvent; +import org.apache.flink.runtime.operators.coordination.CloseGatewayEvent; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.operators.coordination.TestOperatorEvent; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import 
org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.SerializedValue; + +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RunnableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OperatorEventDispatcherImpl}. 
*/ +public class OperatorEventDispatcherImplTest { + private static final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + @Test + public void testSendOperatorEvent() { + DispatcherEventListener eventListener = new DispatcherEventListener(classLoader); + OperatorEventDispatcherImpl dispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + OperatorID operatorID = new OperatorID(); + + OperatorEventGateway gateway = dispatcher.getOperatorEventGateway(operatorID); + gateway.sendEventToCoordinator(new TestOperatorEvent(0)); + + assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID); + assertThat(eventListener.sentEventsMap.get(operatorID)) + .containsExactly(new TestOperatorEvent(0)); + } + + @Test + public void testReceiveAndForwardOperatorEvent() throws Exception { + DispatcherEventListener eventListener = new DispatcherEventListener(classLoader); + OperatorEventDispatcherImpl dispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + OperatorID operatorID = new OperatorID(); + dispatcher.registerEventHandler(operatorID, eventListener); + + dispatcher.dispatchEventToHandlers(operatorID, serialize(new TestOperatorEvent(0))); + + assertThat(eventListener.receivedEvents).containsExactly(new TestOperatorEvent(0)); + } + + @Test + public void testCloseGatewayOnCheckpoint() throws Exception { + DispatcherEventListener eventListener = new DispatcherEventListener(classLoader); + OperatorEventDispatcherImpl dispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + OperatorID operatorID = new OperatorID(); + + OperatorEventGateway gateway = dispatcher.getOperatorEventGateway(operatorID); + dispatcher.dispatchEventToHandlers(operatorID, serialize(new CloseGatewayEvent(0L, 0))); + gateway.sendEventToCoordinator(new TestOperatorEvent(0)); + + assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID); + assertThat(eventListener.sentEventsMap.get(operatorID)) + 
.containsExactly(new AcknowledgeCloseGatewayEvent(0L, 0)); + } + + @Test + public void testReopenGatewayOnCompletedCheckpoint() throws Exception { + DispatcherEventListener eventListener = new DispatcherEventListener(classLoader); + OperatorEventDispatcherImpl dispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + OperatorID operatorID = new OperatorID(); + StreamOperator<?> operator = new MockStreamOperator<>(operatorID); + + OperatorEventGateway gateway = dispatcher.getOperatorEventGateway(operatorID); + dispatcher.dispatchEventToHandlers(operatorID, serialize(new CloseGatewayEvent(0L, 0))); + gateway.sendEventToCoordinator(new TestOperatorEvent(0)); + dispatcher.snapshotOperatorEventGatewayIfExists(operator); + dispatcher.notifyOperatorSnapshotCompletedIfExists(operator, 0L); + + assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID); + assertThat(eventListener.sentEventsMap.get(operatorID)) + .containsExactly( + new AcknowledgeCloseGatewayEvent(0L, 0), + new AcknowledgeCheckpointEvent(0L, 0), + new TestOperatorEvent(0)); + } + + @Test + public void testReloadOperatorEventFromSnapshot() throws Exception { + DispatcherEventListener eventListener = new DispatcherEventListener(classLoader); + OperatorEventDispatcherImpl dispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + OperatorID operatorID = new OperatorID(); + StreamOperator<?> operator = new MockStreamOperator<>(operatorID); + + OperatorEventGateway gateway = dispatcher.getOperatorEventGateway(operatorID); + dispatcher.dispatchEventToHandlers(operatorID, serialize(new CloseGatewayEvent(0L, 0))); + gateway.sendEventToCoordinator(new TestOperatorEvent(0)); + dispatcher.snapshotOperatorEventGatewayIfExists(operator); + dispatcher.notifyOperatorSnapshotCompletedIfExists(operator, 0L); + + eventListener.sentEventsMap.clear(); + + OperatorEventDispatcherImpl reloadedDispatcher = + new OperatorEventDispatcherImpl(classLoader, eventListener); + 
reloadedDispatcher.getOperatorEventGateway(operatorID); + reloadedDispatcher.initializeOperatorEventGatewayIfExists(operator); + + assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID); + assertThat(eventListener.sentEventsMap.get(operatorID)) + .containsExactly(new TestOperatorEvent(0)); + } + + private static class MockStreamOperator<T> extends AbstractStreamOperator<T> { + private final OperatorID operatorID; + + private final OperatorStateBackend operatorStateBackend; + + private MockStreamOperator(OperatorID operatorID) { + this.operatorID = operatorID; + this.operatorStateBackend = new MockOperatorStateBackend(); + } + + @Override + public OperatorID getOperatorID() { + return operatorID; + } + + @Override + public OperatorStateBackend getOperatorStateBackend() { + return operatorStateBackend; + } + + @Override + public StreamingRuntimeContext getRuntimeContext() { + return new MockStreamingRuntimeContext(true, 1, 0); + } + } + + private static class MockOperatorStateBackend extends MockOperatorStateStore + implements OperatorStateBackend { + + @Override + public void dispose() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot( + long checkpointId, + long timestamp, + @Nonnull CheckpointStreamFactory streamFactory, + @Nonnull CheckpointOptions checkpointOptions) + throws Exception { + throw new UnsupportedOperationException(); + } + } + + private static SerializedValue<OperatorEvent> serialize(OperatorEvent event) { + final SerializedValue<OperatorEvent> serializedEvent; + try { + serializedEvent = new SerializedValue<>(event); + } catch (IOException e) { + // this is not a recoverable situation, so we wrap this in an + // unchecked exception and let it bubble up + throw new FlinkRuntimeException("Cannot serialize operator event", e); + } + 
return serializedEvent; + } + + /** + * A class that monitors all events sent by an {@link OperatorEventDispatcherImpl}, and all + * events that the {@link OperatorEventDispatcherImpl} has received and forwarded to a specific + * operator. + */ + private static class DispatcherEventListener Review Comment: Would it be more intuitive to name it `MockTaskOperatorEventGateway`? ########## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorEventsExactlyOnceITCase.java: ########## @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.runtime.operators.coordination.CoordinationEventsExactlyOnceITCaseUtils.IntegerEvent; +import static org.apache.flink.runtime.operators.coordination.CoordinationEventsExactlyOnceITCaseUtils.TestScript; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test case that validates the exactly-once mechanism for operator events sent from an + * operator to its coordinator around checkpoint. + * + * <p>In the test cases provided in this class, a test stream operator would send operator events to + * its coordinator while aligned or unaligned checkpointing is enabled. Some of these events would + * be sent when the coordinator has completed a checkpoint, while the operator has not yet. The + * coordinator or operator may inject failures at some time during the job's execution, and this + * class verifies that the exactly-once semantics of the delivery of these events would not be + * affected in these situations. + * + * <p>See also {@link CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase} for integration + * tests about operator events sent in the reversed direction. 
+ */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class StreamOperatorEventsExactlyOnceITCase { + + @ClassRule + public static final MiniClusterResource MINI_CLUSTER = + new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + + private StreamExecutionEnvironment env; + + @Before + public void setup() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(100); + EventReceivingCoordinator.RECEIVED_INTEGERS.clear(); + TestScript.reset(); + } + + @Test + public void testCheckpointWithCoordinatorFailure() throws Exception { + executeAndVerifyResult(true, false); + } + + @Test + public void testUnalignedCheckpointWithCoordinatorFailure() throws Exception { + env.getCheckpointConfig().enableUnalignedCheckpoints(); + executeAndVerifyResult(true, false); + } + + @Test + public void testCheckpointWithSubtaskFailure() throws Exception { + executeAndVerifyResult(false, true); + } + + @Test + public void testUnalignedCheckpointWithSubtaskFailure() throws Exception { + env.getCheckpointConfig().enableUnalignedCheckpoints(); + executeAndVerifyResult(false, true); + } + + private void executeAndVerifyResult( + boolean shouldCoordinatorFailAtSecondCheckpoint, + boolean shouldOperatorFailAtSecondCheckpoint) + throws Exception { + env.addSource(new GuaranteeCheckpointSourceFunction()) + .disableChaining() + .transform( + "eventSending", + TypeInformation.of(Integer.class), + new EventSendingOperatorFactory( + shouldCoordinatorFailAtSecondCheckpoint, + shouldOperatorFailAtSecondCheckpoint)) + .addSink(new DiscardingSink<>()); + + JobExecutionResult executionResult = + MINI_CLUSTER + .getMiniCluster() + .executeJobBlocking(env.getStreamGraph().getJobGraph()); + + List<Integer> sentNumbers = + executionResult.getAccumulatorResult(EventSendingOperator.ACCUMULATOR_NAME); + 
assertThat(EventReceivingCoordinator.RECEIVED_INTEGERS) + .containsExactly(sentNumbers.toArray(new Integer[0])); + + assertThat(TestScript.getForOperator("EventReceivingCoordinator").hasAlreadyFailed()) + .isEqualTo(shouldCoordinatorFailAtSecondCheckpoint); + assertThat(TestScript.getForOperator("EventSendingOperator-subtask0").hasAlreadyFailed()) + .isEqualTo(shouldOperatorFailAtSecondCheckpoint); + } + + /** + * A source function that guarantees that there are at least two checkpoints, and that there are + * operator events sent from the test operator to its coordinator during the first checkpoint. + */ + private static class GuaranteeCheckpointSourceFunction + extends RichParallelSourceFunction<Integer> implements CheckpointedFunction { + private boolean isCanceled; + + private boolean isFirstCheckpointCompleted; + + private boolean isSecondCheckpointCompleted; + + private GuaranteeCheckpointSourceFunction() { + this.isCanceled = false; + this.isFirstCheckpointCompleted = false; + this.isSecondCheckpointCompleted = false; + } + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + while (!isCanceled && !isSecondCheckpointCompleted) { + Thread.sleep(10); + } + + Thread.sleep(100); + } + + @Override + public void cancel() { + isCanceled = true; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception {} + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (!isFirstCheckpointCompleted) { + int currentLength = EventReceivingCoordinator.RECEIVED_INTEGERS.size(); + while (EventReceivingCoordinator.RECEIVED_INTEGERS.size() == currentLength) { + Thread.sleep(10); + } + isFirstCheckpointCompleted = true; + } else if (!isSecondCheckpointCompleted) { + isSecondCheckpointCompleted = true; + } + } + } + + /** + * A wrapper operator factory for {@link EventReceivingCoordinator} and {@link + * EventSendingOperator}. 
+ */ + private static class EventSendingOperatorFactory extends AbstractStreamOperatorFactory<Integer> + implements CoordinatedOperatorFactory<Integer>, + OneInputStreamOperatorFactory<Integer, Integer> { + private final boolean shouldCoordinatorFailAtSecondCheckpoint; + + private final boolean shouldOperatorFailAtSecondCheckpoint; + + private EventSendingOperatorFactory( + boolean shouldCoordinatorFailAtSecondCheckpoint, + boolean shouldOperatorFailAtSecondCheckpoint) { + this.shouldCoordinatorFailAtSecondCheckpoint = shouldCoordinatorFailAtSecondCheckpoint; + this.shouldOperatorFailAtSecondCheckpoint = shouldOperatorFailAtSecondCheckpoint; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new OperatorCoordinator.Provider() { + + @Override + public OperatorID getOperatorId() { + return operatorID; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return new EventReceivingCoordinator( + context, operatorID, shouldCoordinatorFailAtSecondCheckpoint); + } + }; + } + + @Override + public <T extends StreamOperator<Integer>> T createStreamOperator( + StreamOperatorParameters<Integer> parameters) { + final OperatorID operatorId = parameters.getStreamConfig().getOperatorID(); + OperatorEventGateway gateway = + parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId); + EventSendingOperator operator = + new EventSendingOperator(gateway, shouldOperatorFailAtSecondCheckpoint); + operator.setup( + parameters.getContainingTask(), + parameters.getStreamConfig(), + parameters.getOutput()); + return (T) operator; + } + + @Override + public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + return EventSendingOperator.class; + } + } + + /** + * A coordinator that listens on received integer events. It would store received integers into + * a global list and its snapshots. 
It can fail during the second checkpoint process if + * required. + */ + private static class EventReceivingCoordinator implements OperatorCoordinator { + + /** A global list that records the integers that the test coordinator has received. */ + private static final List<Integer> RECEIVED_INTEGERS = new ArrayList<>(); + + private final Context context; + + private final TestScript testScript; + + private final ExecutorService mailboxExecutor; + + private final Map<Long, List<Integer>> attemptedCheckpointValueMap; + + private final boolean shouldFailAtSecondCheckpoint; + + private boolean isFirstCheckpointCompleted; + + private boolean isSecondCheckpointCompleted; + + private EventReceivingCoordinator( + Context context, OperatorID operatorID, boolean shouldFailAtSecondCheckpoint) { + this.context = context; + this.testScript = TestScript.getForOperator("EventReceivingCoordinator"); + this.shouldFailAtSecondCheckpoint = shouldFailAtSecondCheckpoint; + this.isFirstCheckpointCompleted = false; + this.isSecondCheckpointCompleted = false; + + this.attemptedCheckpointValueMap = new HashMap<>(); + + this.mailboxExecutor = + Executors.newSingleThreadExecutor( + new DispatcherThreadFactory( + Thread.currentThread().getThreadGroup(), + "Coordinator Mailbox for " + operatorID)); + } + + @Override + public void start() throws Exception {} + + @Override + public void close() throws Exception { + mailboxExecutor.shutdownNow(); + assertThat(mailboxExecutor.awaitTermination(10, TimeUnit.MINUTES)).isTrue(); + } + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) + throws Exception { + if (subtask != 0 || !(event instanceof IntegerEvent)) { + throw new Exception( + String.format("Don't recognize event '%s' from task %d.", event, subtask)); + } + + runInMailbox(() -> RECEIVED_INTEGERS.add(((IntegerEvent) event).value)); + } + + @Override + public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) 
+ throws Exception { + runInMailbox( + () -> { + if (isFirstCheckpointCompleted + && !isSecondCheckpointCompleted + && shouldFailAtSecondCheckpoint + && !testScript.hasAlreadyFailed()) { + testScript.recordHasFailed(); + context.failJob(new Exception("test failure")); + resultFuture.completeExceptionally(new Exception("test failure")); + return; + } + + attemptedCheckpointValueMap.put( + checkpointId, new ArrayList<>(RECEIVED_INTEGERS)); + + ByteBuffer byteBuffer = ByteBuffer.allocate(RECEIVED_INTEGERS.size() * 4); + for (int i : RECEIVED_INTEGERS) { + byteBuffer.putInt(i); + } + resultFuture.complete(byteBuffer.array()); + }); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + runInMailbox( + () -> { + if (!isFirstCheckpointCompleted) { + isFirstCheckpointCompleted = true; + } else if (!isSecondCheckpointCompleted) { + isSecondCheckpointCompleted = true; + } + }); + } + + @Override + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) { + runInMailbox( + () -> { + isFirstCheckpointCompleted = true; + + RECEIVED_INTEGERS.clear(); + + if (checkpointData == null) { + return; + } + + ByteBuffer byteBuffer = ByteBuffer.wrap(checkpointData); + for (int i = 0; i < checkpointData.length / 4; i++) { + RECEIVED_INTEGERS.add(byteBuffer.getInt()); + } + + attemptedCheckpointValueMap.put( Review Comment: Is this line needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org