lindong28 commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r945439037


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImplTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import 
org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
+import 
org.apache.flink.runtime.operators.coordination.AcknowledgeCloseGatewayEvent;
+import org.apache.flink.runtime.operators.coordination.CloseGatewayEvent;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RunnableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OperatorEventDispatcherImpl}. */
+public class OperatorEventDispatcherImplTest {
+    private static final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+    @Test
+    public void testSendOperatorEvent() {
+        DispatcherEventListener eventListener = new 
DispatcherEventListener(classLoader);
+        OperatorEventDispatcherImpl dispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        OperatorID operatorID = new OperatorID();
+
+        OperatorEventGateway gateway = 
dispatcher.getOperatorEventGateway(operatorID);
+        gateway.sendEventToCoordinator(new TestOperatorEvent(0));
+
+        assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID);
+        assertThat(eventListener.sentEventsMap.get(operatorID))
+                .containsExactly(new TestOperatorEvent(0));
+    }
+
+    @Test
+    public void testReceiveAndForwardOperatorEvent() throws Exception {
+        DispatcherEventListener eventListener = new 
DispatcherEventListener(classLoader);
+        OperatorEventDispatcherImpl dispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        OperatorID operatorID = new OperatorID();
+        dispatcher.registerEventHandler(operatorID, eventListener);
+
+        dispatcher.dispatchEventToHandlers(operatorID, serialize(new 
TestOperatorEvent(0)));
+
+        assertThat(eventListener.receivedEvents).containsExactly(new 
TestOperatorEvent(0));
+    }
+
+    @Test
+    public void testCloseGatewayOnCheckpoint() throws Exception {
+        DispatcherEventListener eventListener = new 
DispatcherEventListener(classLoader);
+        OperatorEventDispatcherImpl dispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        OperatorID operatorID = new OperatorID();
+
+        OperatorEventGateway gateway = 
dispatcher.getOperatorEventGateway(operatorID);
+        dispatcher.dispatchEventToHandlers(operatorID, serialize(new 
CloseGatewayEvent(0L, 0)));
+        gateway.sendEventToCoordinator(new TestOperatorEvent(0));
+
+        assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID);
+        assertThat(eventListener.sentEventsMap.get(operatorID))
+                .containsExactly(new AcknowledgeCloseGatewayEvent(0L, 0));
+    }
+
+    @Test
+    public void testReopenGatewayOnCompletedCheckpoint() throws Exception {
+        DispatcherEventListener eventListener = new 
DispatcherEventListener(classLoader);
+        OperatorEventDispatcherImpl dispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        OperatorID operatorID = new OperatorID();
+        StreamOperator<?> operator = new MockStreamOperator<>(operatorID);
+
+        OperatorEventGateway gateway = 
dispatcher.getOperatorEventGateway(operatorID);
+        dispatcher.dispatchEventToHandlers(operatorID, serialize(new 
CloseGatewayEvent(0L, 0)));
+        gateway.sendEventToCoordinator(new TestOperatorEvent(0));
+        dispatcher.snapshotOperatorEventGatewayIfExists(operator);
+        dispatcher.notifyOperatorSnapshotCompletedIfExists(operator, 0L);
+
+        assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID);
+        assertThat(eventListener.sentEventsMap.get(operatorID))
+                .containsExactly(
+                        new AcknowledgeCloseGatewayEvent(0L, 0),
+                        new AcknowledgeCheckpointEvent(0L, 0),
+                        new TestOperatorEvent(0));
+    }
+
+    @Test
+    public void testReloadOperatorEventFromSnapshot() throws Exception {
+        DispatcherEventListener eventListener = new 
DispatcherEventListener(classLoader);
+        OperatorEventDispatcherImpl dispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        OperatorID operatorID = new OperatorID();
+        StreamOperator<?> operator = new MockStreamOperator<>(operatorID);
+
+        OperatorEventGateway gateway = 
dispatcher.getOperatorEventGateway(operatorID);
+        dispatcher.dispatchEventToHandlers(operatorID, serialize(new 
CloseGatewayEvent(0L, 0)));
+        gateway.sendEventToCoordinator(new TestOperatorEvent(0));
+        dispatcher.snapshotOperatorEventGatewayIfExists(operator);
+        dispatcher.notifyOperatorSnapshotCompletedIfExists(operator, 0L);
+
+        eventListener.sentEventsMap.clear();
+
+        OperatorEventDispatcherImpl reloadedDispatcher =
+                new OperatorEventDispatcherImpl(classLoader, eventListener);
+        reloadedDispatcher.getOperatorEventGateway(operatorID);
+        reloadedDispatcher.initializeOperatorEventGatewayIfExists(operator);
+
+        assertThat(eventListener.sentEventsMap).containsOnlyKeys(operatorID);
+        assertThat(eventListener.sentEventsMap.get(operatorID))
+                .containsExactly(new TestOperatorEvent(0));
+    }
+
+    private static class MockStreamOperator<T> extends 
AbstractStreamOperator<T> {
+        private final OperatorID operatorID;
+
+        private final OperatorStateBackend operatorStateBackend;
+
+        private MockStreamOperator(OperatorID operatorID) {
+            this.operatorID = operatorID;
+            this.operatorStateBackend = new MockOperatorStateBackend();
+        }
+
+        @Override
+        public OperatorID getOperatorID() {
+            return operatorID;
+        }
+
+        @Override
+        public OperatorStateBackend getOperatorStateBackend() {
+            return operatorStateBackend;
+        }
+
+        @Override
+        public StreamingRuntimeContext getRuntimeContext() {
+            return new MockStreamingRuntimeContext(true, 1, 0);
+        }
+    }
+
+    private static class MockOperatorStateBackend extends 
MockOperatorStateStore
+            implements OperatorStateBackend {
+
+        @Override
+        public void dispose() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Nonnull
+        @Override
+        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
+                long checkpointId,
+                long timestamp,
+                @Nonnull CheckpointStreamFactory streamFactory,
+                @Nonnull CheckpointOptions checkpointOptions)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static SerializedValue<OperatorEvent> serialize(OperatorEvent 
event) {
+        final SerializedValue<OperatorEvent> serializedEvent;
+        try {
+            serializedEvent = new SerializedValue<>(event);
+        } catch (IOException e) {
+            // this is not a recoverable situation, so we wrap this in an
+            // unchecked exception and let it bubble up
+            throw new FlinkRuntimeException("Cannot serialize operator event", 
e);
+        }
+        return serializedEvent;
+    }
+
+    /**
+     * A class that monitors all events sent by an {@link 
OperatorEventDispatcherImpl}, and all
+     * events that the {@link OperatorEventDispatcherImpl} has received and 
forwarded to a specific
+     * operator.
+     */
+    private static class DispatcherEventListener

Review Comment:
   Would it be more intuitive to name it `MockTaskOperatorEventGateway`?



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorEventsExactlyOnceITCase.java:
##########
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.runtime.operators.coordination.CoordinationEventsExactlyOnceITCaseUtils.IntegerEvent;
+import static 
org.apache.flink.runtime.operators.coordination.CoordinationEventsExactlyOnceITCaseUtils.TestScript;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for 
operator events sent from an
+ * operator to its coordinator around checkpoint.
+ *
+ * <p>In the test cases provided in this class, a test stream operator would 
send operator events to
+ * its coordinator while aligned or unaligned checkpointing is enabled. Some 
of these events would
+ * be sent when the coordinator has completed a checkpoint, while the operator 
has not yet. The
+ * coordinator or operator may inject failures at some time during the job's 
execution, and this
+ * class verifies that the exactly-once semantics of the delivery of these 
events would not be
+ * affected in these situations.
+ *
+ * <p>See also {@link 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase} for integration
+ * tests about operator events sent in the reversed direction.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class StreamOperatorEventsExactlyOnceITCase {
+
+    @ClassRule
+    public static final MiniClusterResource MINI_CLUSTER =
+            new MiniClusterResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        EventReceivingCoordinator.RECEIVED_INTEGERS.clear();
+        TestScript.reset();
+    }
+
+    @Test
+    public void testCheckpointWithCoordinatorFailure() throws Exception {
+        executeAndVerifyResult(true, false);
+    }
+
+    @Test
+    public void testUnalignedCheckpointWithCoordinatorFailure() throws 
Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResult(true, false);
+    }
+
+    @Test
+    public void testCheckpointWithSubtaskFailure() throws Exception {
+        executeAndVerifyResult(false, true);
+    }
+
+    @Test
+    public void testUnalignedCheckpointWithSubtaskFailure() throws Exception {
+        env.getCheckpointConfig().enableUnalignedCheckpoints();
+        executeAndVerifyResult(false, true);
+    }
+
+    private void executeAndVerifyResult(
+            boolean shouldCoordinatorFailAtSecondCheckpoint,
+            boolean shouldOperatorFailAtSecondCheckpoint)
+            throws Exception {
+        env.addSource(new GuaranteeCheckpointSourceFunction())
+                .disableChaining()
+                .transform(
+                        "eventSending",
+                        TypeInformation.of(Integer.class),
+                        new EventSendingOperatorFactory(
+                                shouldCoordinatorFailAtSecondCheckpoint,
+                                shouldOperatorFailAtSecondCheckpoint))
+                .addSink(new DiscardingSink<>());
+
+        JobExecutionResult executionResult =
+                MINI_CLUSTER
+                        .getMiniCluster()
+                        
.executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+        List<Integer> sentNumbers =
+                
executionResult.getAccumulatorResult(EventSendingOperator.ACCUMULATOR_NAME);
+        assertThat(EventReceivingCoordinator.RECEIVED_INTEGERS)
+                .containsExactly(sentNumbers.toArray(new Integer[0]));
+
+        
assertThat(TestScript.getForOperator("EventReceivingCoordinator").hasAlreadyFailed())
+                .isEqualTo(shouldCoordinatorFailAtSecondCheckpoint);
+        
assertThat(TestScript.getForOperator("EventSendingOperator-subtask0").hasAlreadyFailed())
+                .isEqualTo(shouldOperatorFailAtSecondCheckpoint);
+    }
+
+    /**
+     * A source function that guarantees that there are at lease two 
checkpoints, and that there are
+     * operator events sent from the test operator to its coordinator during 
the first checkpoint.
+     */
+    private static class GuaranteeCheckpointSourceFunction
+            extends RichParallelSourceFunction<Integer> implements 
CheckpointedFunction {
+        private boolean isCanceled;
+
+        private boolean isFirstCheckpointCompleted;
+
+        private boolean isSecondCheckpointCompleted;
+
+        private GuaranteeCheckpointSourceFunction() {
+            this.isCanceled = false;
+            this.isFirstCheckpointCompleted = false;
+            this.isSecondCheckpointCompleted = false;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (!isCanceled && !isSecondCheckpointCompleted) {
+                Thread.sleep(10);
+            }
+
+            Thread.sleep(100);
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) 
throws Exception {}
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+            if (!isFirstCheckpointCompleted) {
+                int currentLength = 
EventReceivingCoordinator.RECEIVED_INTEGERS.size();
+                while (EventReceivingCoordinator.RECEIVED_INTEGERS.size() == 
currentLength) {
+                    Thread.sleep(10);
+                }
+                isFirstCheckpointCompleted = true;
+            } else if (!isSecondCheckpointCompleted) {
+                isSecondCheckpointCompleted = true;
+            }
+        }
+    }
+
+    /**
+     * A wrapper operator factory for {@link EventReceivingCoordinator} and 
{@link
+     * EventSendingOperator}.
+     */
+    private static class EventSendingOperatorFactory extends 
AbstractStreamOperatorFactory<Integer>
+            implements CoordinatedOperatorFactory<Integer>,
+                    OneInputStreamOperatorFactory<Integer, Integer> {
+        private final boolean shouldCoordinatorFailAtSecondCheckpoint;
+
+        private final boolean shouldOperatorFailAtSecondCheckpoint;
+
+        private EventSendingOperatorFactory(
+                boolean shouldCoordinatorFailAtSecondCheckpoint,
+                boolean shouldOperatorFailAtSecondCheckpoint) {
+            this.shouldCoordinatorFailAtSecondCheckpoint = 
shouldCoordinatorFailAtSecondCheckpoint;
+            this.shouldOperatorFailAtSecondCheckpoint = 
shouldOperatorFailAtSecondCheckpoint;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new OperatorCoordinator.Provider() {
+
+                @Override
+                public OperatorID getOperatorId() {
+                    return operatorID;
+                }
+
+                @Override
+                public OperatorCoordinator create(OperatorCoordinator.Context 
context) {
+                    return new EventReceivingCoordinator(
+                            context, operatorID, 
shouldCoordinatorFailAtSecondCheckpoint);
+                }
+            };
+        }
+
+        @Override
+        public <T extends StreamOperator<Integer>> T createStreamOperator(
+                StreamOperatorParameters<Integer> parameters) {
+            final OperatorID operatorId = 
parameters.getStreamConfig().getOperatorID();
+            OperatorEventGateway gateway =
+                    
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+            EventSendingOperator operator =
+                    new EventSendingOperator(gateway, 
shouldOperatorFailAtSecondCheckpoint);
+            operator.setup(
+                    parameters.getContainingTask(),
+                    parameters.getStreamConfig(),
+                    parameters.getOutput());
+            return (T) operator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return EventSendingOperator.class;
+        }
+    }
+
+    /**
+     * A coordinator that listens on received integer events. It would store 
received integers into
+     * a global list and its snapshots. It can fail during the second 
checkpoint process if
+     * required.
+     */
+    private static class EventReceivingCoordinator implements 
OperatorCoordinator {
+
+        /** A global list that records the integers that the test coordinator 
has received. */
+        private static final List<Integer> RECEIVED_INTEGERS = new 
ArrayList<>();
+
+        private final Context context;
+
+        private final TestScript testScript;
+
+        private final ExecutorService mailboxExecutor;
+
+        private final Map<Long, List<Integer>> attemptedCheckpointValueMap;
+
+        private final boolean shouldFailAtSecondCheckpoint;
+
+        private boolean isFirstCheckpointCompleted;
+
+        private boolean isSecondCheckpointCompleted;
+
+        private EventReceivingCoordinator(
+                Context context, OperatorID operatorID, boolean 
shouldFailAtSecondCheckpoint) {
+            this.context = context;
+            this.testScript = 
TestScript.getForOperator("EventReceivingCoordinator");
+            this.shouldFailAtSecondCheckpoint = shouldFailAtSecondCheckpoint;
+            this.isFirstCheckpointCompleted = false;
+            this.isSecondCheckpointCompleted = false;
+
+            this.attemptedCheckpointValueMap = new HashMap<>();
+
+            this.mailboxExecutor =
+                    Executors.newSingleThreadExecutor(
+                            new DispatcherThreadFactory(
+                                    Thread.currentThread().getThreadGroup(),
+                                    "Coordinator Mailbox for " + operatorID));
+        }
+
+        @Override
+        public void start() throws Exception {}
+
+        @Override
+        public void close() throws Exception {
+            mailboxExecutor.shutdownNow();
+            assertThat(mailboxExecutor.awaitTermination(10, 
TimeUnit.MINUTES)).isTrue();
+        }
+
+        @Override
+        public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event)
+                throws Exception {
+            if (subtask != 0 || !(event instanceof IntegerEvent)) {
+                throw new Exception(
+                        String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
+            }
+
+            runInMailbox(() -> RECEIVED_INTEGERS.add(((IntegerEvent) 
event).value));
+        }
+
+        @Override
+        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture)
+                throws Exception {
+            runInMailbox(
+                    () -> {
+                        if (isFirstCheckpointCompleted
+                                && !isSecondCheckpointCompleted
+                                && shouldFailAtSecondCheckpoint
+                                && !testScript.hasAlreadyFailed()) {
+                            testScript.recordHasFailed();
+                            context.failJob(new Exception("test failure"));
+                            resultFuture.completeExceptionally(new 
Exception("test failure"));
+                            return;
+                        }
+
+                        attemptedCheckpointValueMap.put(
+                                checkpointId, new 
ArrayList<>(RECEIVED_INTEGERS));
+
+                        ByteBuffer byteBuffer = 
ByteBuffer.allocate(RECEIVED_INTEGERS.size() * 4);
+                        for (int i : RECEIVED_INTEGERS) {
+                            byteBuffer.putInt(i);
+                        }
+                        resultFuture.complete(byteBuffer.array());
+                    });
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            runInMailbox(
+                    () -> {
+                        if (!isFirstCheckpointCompleted) {
+                            isFirstCheckpointCompleted = true;
+                        } else if (!isSecondCheckpointCompleted) {
+                            isSecondCheckpointCompleted = true;
+                        }
+                    });
+        }
+
+        @Override
+        public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) {
+            runInMailbox(
+                    () -> {
+                        isFirstCheckpointCompleted = true;
+
+                        RECEIVED_INTEGERS.clear();
+
+                        if (checkpointData == null) {
+                            return;
+                        }
+
+                        ByteBuffer byteBuffer = 
ByteBuffer.wrap(checkpointData);
+                        for (int i = 0; i < checkpointData.length / 4; i++) {
+                            RECEIVED_INTEGERS.add(byteBuffer.getInt());
+                        }
+
+                        attemptedCheckpointValueMap.put(

Review Comment:
   Is this line needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to