guoweiM commented on a change in pull request #13617:

File path: 
@@ -0,0 +1,345 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.sink;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import static org.junit.Assert.assertEquals;
+ * Test the writer operator.
+ */
+public class WriterOperatorTest {
+       @Test
+       public void testStatelessWriter() throws Exception {
+               final long initialTime = 0;
+               final Supplier<StreamOperatorFactory<Tuple3<Integer, Long, 
Long>>> factorySupplier =
+                       () -> new StatelessWriterOperatorFactory<>(new 
+               final ConcurrentLinkedQueue<Object> expectedPreCommitOutput =
+                       new ConcurrentLinkedQueue<>(
+                               Arrays.asList(
+                                       new Watermark(initialTime),
+                                       new StreamRecord<>(Tuple3.of(1, 
initialTime + 1, initialTime)),
+                                       new StreamRecord<>(Tuple3.of(2, 
initialTime + 2, initialTime))
+                               )
+                       );
+               final ConcurrentLinkedQueue<Object> expectedEndOutput = new 
+               expectedEndOutput.add(new 
+               final Consumer<OneInputStreamTaskTestHarness<Integer, 
Tuple3<Integer, Long, Long>>> process = task -> {
+                       task.processElement(new Watermark(initialTime));
+                       task.processElement(new StreamRecord<>(1, initialTime + 
+                       task.processElement(new StreamRecord<>(2, initialTime + 
+               };
+               final TaskStateSnapshot subtaskStates = processElements(
+                       null,
+                       factorySupplier,
+                       process,
+                       output -> 
Arrays.equals(expectedPreCommitOutput.toArray(), output),
+                       output -> Arrays.equals(expectedEndOutput.toArray(), 
+               // test after restoring
+               processElements(
+                       subtaskStates,
+                       factorySupplier,
+                       process,
+                       output -> 
Arrays.equals(expectedPreCommitOutput.toArray(), output),
+                       output -> Arrays.equals(expectedEndOutput.toArray(), 
+       }
+       @Test
+       public void testStatefulWriter() throws Exception {
+               final long initialTime = 0;
+               final Supplier<StreamOperatorFactory<Tuple3<Integer, Long, 
Long>>> factorySupplier = () -> new StatefulWriterOperatorFactory<>(new 
+               final ConcurrentLinkedQueue<Object> expectedEndOutput1 =
+                       new ConcurrentLinkedQueue<>(
+                               Arrays.asList(
+                                       new StreamRecord<>(Tuple3.of(1, 
initialTime + 1, Long.MIN_VALUE)),
+                                       new StreamRecord<>(Tuple3.of(2, 
initialTime + 2, Long.MIN_VALUE)),
+                                       new 
+                               )
+                       );
+               final Consumer<OneInputStreamTaskTestHarness<Integer, 
Tuple3<Integer, Long, Long>>> process1 = task -> {
+                       task.processElement(new StreamRecord<>(1, initialTime + 
+                       task.processElement(new StreamRecord<>(2, initialTime + 
+               };
+               final TaskStateSnapshot subtaskStates = processElements(null,
+                       factorySupplier,
+                       process1,
+                       output -> Arrays.equals(new StreamRecord[0], output),
+                       output -> Arrays.equals(expectedEndOutput1.toArray(), 
+               final Consumer<OneInputStreamTaskTestHarness<Integer, 
Tuple3<Integer, Long, Long>>> process2 =
+                       task -> task.processElement(new StreamRecord<>(3, 
initialTime + 3));
+               final ConcurrentLinkedQueue<Object> expectedPreCommitOutput2 = 
new ConcurrentLinkedQueue<>();
+               expectedPreCommitOutput2.add(new StreamRecord<>(Tuple3.of(1, 
initialTime + 1, Long.MIN_VALUE)));
+               expectedPreCommitOutput2.add(new StreamRecord<>(Tuple3.of(2, 
initialTime + 2, Long.MIN_VALUE)));
+               expectedPreCommitOutput2.add(new StreamRecord<>(Tuple3.of(3, 
initialTime + 3, Long.MIN_VALUE)));
+               final ConcurrentLinkedQueue<Object> expectedEndOutput2 = new 
+               expectedEndOutput2.add(new 
+               processElements(subtaskStates,
+                       factorySupplier,
+                       process2,
+                       output -> 
Arrays.equals(expectedPreCommitOutput2.toArray(), output),
+                       output -> Arrays.equals(expectedEndOutput2.toArray(), 
+       }
+       private TaskStateSnapshot processElements(
+               @Nullable TaskStateSnapshot taskStateSnapshot,
+               Supplier<StreamOperatorFactory<Tuple3<Integer, Long, Long>>> 
+               Consumer<OneInputStreamTaskTestHarness<Integer, Tuple3<Integer, 
Long, Long>>> process,
+               Predicate<Object[]> verifyPreCommitOutput,
+               Predicate<Object[]> verifyEndOutput) throws Exception {
+               final long checkpointId = 1L;
+               final long checkpointTimestamp = 1L;
+               final OneInputStreamTaskTestHarness<Integer, Tuple3<Integer, 
Long, Long>> testHarness = new OneInputStreamTaskTestHarness<>(
+                       OneInputStreamTask::new,
+                       1, 1,
+                       BasicTypeInfo.INT_TYPE_INFO, 
TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Long.class, Long.class));
+               if (taskStateSnapshot != null) {
+                       testHarness.setTaskStateSnapshot(checkpointId, 
+               }
+               testHarness.setupOutputForSingletonOperatorChain();
+               final StreamConfig streamConfig = testHarness.getStreamConfig();
+               final OperatorID operatorID = new OperatorID(38L, 3801L);
+               streamConfig.setOperatorID(operatorID);
+               streamConfig.setStreamOperatorFactory(factorySupplier.get());
+               final TestTaskStateManager taskStateManagerMock = 
+               taskStateManagerMock.setWaitForReportLatch(new OneShotLatch());
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+               final OneInputStreamTask<Integer, Tuple3<Integer, Long, Long>> 
task = testHarness.getTask();
+               process.accept(testHarness);
+               testHarness.waitForInputProcessing();
+               final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointTimestamp);
+               // pre-commit
+               task.triggerCheckpointAsync(checkpointMetaData, 
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+               taskStateManagerMock.getWaitForReportLatch().await();
+               assertEquals(checkpointId, 
+               // remove CheckpointBarrier which is not expected
+               testHarness.getOutput()
+                       .removeIf(record -> record instanceof 
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+               AbstractWriterOperator<Integer, Tuple3<Integer, Long, Long>> s 
=  testHarness.getHeadOperator();
+               DummyWriter writer = (DummyWriter) s.getWriter();
+               Assert.assertTrue(writer.isClosed());
+               // set the operator state from previous attempt into the 
restored one
+               return 
+       }
+       static final class StatelessWriterSink implements TestSink<Integer, 
Tuple3<Integer, Long, Long>, Tuple3<Integer, Long, Long>, Void> {
+               @Override
+               public Writer<Integer, Tuple3<Integer, Long, Long>, 
Tuple3<Integer, Long, Long>> createWriter(
+                       InitContext context, List<Tuple3<Integer, Long, Long>> 
states) {
+                       return new DummyWriter();
+               }
+       }
+       static final class StatefulWriterSink implements TestSink<Integer, 
Tuple3<Integer, Long, Long>, Tuple3<Integer, Long, Long>, Void> {
+               @Override
+               public Writer<Integer, Tuple3<Integer, Long, Long>, 
Tuple3<Integer, Long, Long>> createWriter(InitContext context, 
List<Tuple3<Integer, Long, Long>> states) {
+                       return new DummyWriter(3, states);
+               }
+               @Override
+               public Optional<SimpleVersionedSerializer<Tuple3<Integer, Long, 
Long>>> getWriterStateSerializer() {
+                       return Optional.of(new WriterStateSerializer());
+               }
+       }
+       static final class DummyWriter
+               implements Writer<Integer, Tuple3<Integer, Long, Long>, 
Tuple3<Integer, Long, Long>> {
+               static final Tuple3<Integer, Long, Long> LAST_ELEMENT = 
Tuple3.of(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE);
+               private boolean isClosed;
+               private final int maxCacheElementNum;
+               // element, timestamp, watermark
+               private List<Tuple3<Integer, Long, Long>> elements;
+               DummyWriter(int maxCacheElementNum, List<Tuple3<Integer, Long, 
Long>> restoreElements) {
+                       this.isClosed = false;
+                       this.elements = new ArrayList<>(restoreElements);
+                       this.maxCacheElementNum = maxCacheElementNum;
+               }
+               DummyWriter() {
+                       this(0, Collections.emptyList());
+               }
+               @Override
+               public void write(Integer element, Context context) {
+                       elements.add(Tuple3.of(element, context.timestamp(), 
+               }
+               @Override
+               public List<Tuple3<Integer, Long, Long>> prepareCommit(boolean 
flush) {
+                       final List<Tuple3<Integer, Long, Long>> r = elements;
+                       if (flush) {
+                               elements.add(LAST_ELEMENT);
+                               return elements;
+                       } else if (elements.size() >= maxCacheElementNum) {
+                               elements = new ArrayList<>();
+                               return r;
+                       } else {
+                               return Collections.emptyList();
+                       }
+               }
+               @Override
+               public List<Tuple3<Integer, Long, Long>> snapshotState() {
+                       return elements;

Review comment:
       This is by design. The `List` only be flush when the number is larger 
than `maxCacheElementNum`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:

Reply via email to