AHeise commented on a change in pull request #17050:
URL: https://github.com/apache/flink/pull/17050#discussion_r699178562



##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java
##########
@@ -17,130 +17,109 @@
 
 package org.apache.flink.test.streaming.api;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that watermarks are emitted while a file is being read, particularly the last split.
  *
  * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a>
  */
-public class FileReadingWatermarkITCase {
-    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
-    private static final String RUNTIME_ACC_NAME = "runtime";
-    private static final int FILE_SIZE_LINES = 5_000_000;
-    private static final int WATERMARK_INTERVAL_MILLIS = 10;
-    private static final int MIN_EXPECTED_WATERMARKS = 5;
+public class FileReadingWatermarkITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class);
+
+    private static final int WATERMARK_INTERVAL_MILLIS = 1_000;
+    private static final int EXPECTED_WATERMARKS = 5;
 
     @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private SharedReference<CountDownLatch> latch;
+
+    @Before
+    public void setUp() {
+        latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
+    }

Review comment:
       Here is a bigger rewrite of the test that removes a few extra classes/fields
       and works around the deprecations. I can't attach a suggestion to the whole
       file, though. Ping me offline if you want more detailed feedback.
   
   ```suggestion
       @Rule public final SharedObjects sharedObjects = SharedObjects.create();
   
       /**
        * Adds an infinite split that causes the input of {@link
        * org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator} to instantly go
        * idle while data is still being processed.
        *
        * <p>Before FLINK-19109, watermarks would not be emitted at this point.
        */
       @Test
       public void testWatermarkEmissionWithChaining() throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
           env.getConfig().setAutoWatermarkInterval(WATERMARK_INTERVAL_MILLIS);
   
           SharedReference<CountDownLatch> latch =
                   sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
           checkState(env.isChainingEnabled());
           env.createInput(new InfiniteIntegerInputFormat(true))
                   .assignTimestampsAndWatermarks(
                           WatermarkStrategy.<Integer>forMonotonousTimestamps()
                                    .withTimestampAssigner(context -> getExtractorAssigner()))
                   .addSink(getWatermarkCounter(latch));
   
           env.executeAsync();
           latch.get().await();
       }
   
       private static TimestampAssigner<Integer> getExtractorAssigner() {
           return new TimestampAssigner<Integer>() {
   
               private long counter = 1;
   
               @Override
                public long extractTimestamp(Integer element, long recordTimestamp) {
                   return counter++;
               }
           };
       }
   
       private static SinkFunction<Integer> getWatermarkCounter(
               final SharedReference<CountDownLatch> latch) {
           return new RichSinkFunction<Integer>() {
   
               @Override
               public void invoke(Integer value, SinkFunction.Context context) {
                   try {
                       Thread.sleep(1000);
                       LOG.info("Sink received record");
                   } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                   }
               }
   
               @Override
               public void writeWatermark(Watermark watermark) {
                   LOG.info("Sink received watermark {}", watermark);
                   latch.get().countDown();
               }
           };
       }
   ```
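
       As a small follow-up sketch (my addition, not part of the suggestion above):
       the final `latch.get().await()` could be bounded with a timeout, using the
       `assertTrue` and `TimeUnit` imports this diff already adds, so the test fails
       instead of blocking forever if the watermarks never show up. The two-minute
       timeout is an arbitrary placeholder.
   
   ```java
       // Sketch only: replaces the unbounded await at the end of the suggested test.
       env.executeAsync();
       assertTrue(
               "expected " + EXPECTED_WATERMARKS + " watermarks before the timeout",
               latch.get().await(2, TimeUnit.MINUTES));
   ```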

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java
##########
@@ -17,130 +17,109 @@
 
 package org.apache.flink.test.streaming.api;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests that watermarks are emitted while a file is being read, particularly the last split.
  *
  * @see <a href="https://issues.apache.org/jira/browse/FLINK-19109">FLINK-19109</a>
  */
-public class FileReadingWatermarkITCase {
-    private static final String NUM_WATERMARKS_ACC_NAME = "numWatermarks";
-    private static final String RUNTIME_ACC_NAME = "runtime";
-    private static final int FILE_SIZE_LINES = 5_000_000;
-    private static final int WATERMARK_INTERVAL_MILLIS = 10;
-    private static final int MIN_EXPECTED_WATERMARKS = 5;
+public class FileReadingWatermarkITCase extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(FileReadingWatermarkITCase.class);
+
+    private static final int WATERMARK_INTERVAL_MILLIS = 1_000;
+    private static final int EXPECTED_WATERMARKS = 5;
 
     @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private SharedReference<CountDownLatch> latch;
+
+    @Before
+    public void setUp() {
+        latch = sharedObjects.add(new CountDownLatch(EXPECTED_WATERMARKS));
+    }

Review comment:
       When I checked the execution of the test, I didn't notice any difference.
       According to the docs, it is also meant as a replacement for the deprecated
       `AssignerWith*Watermarks`.
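
       For reference, a hedged sketch of what that migration typically looks like
       (my example, not code from this PR): the deprecated
       `BoundedOutOfOrdernessTimestampExtractor` (an `AssignerWithPeriodicWatermarks`)
       maps to `WatermarkStrategy.forBoundedOutOfOrderness` plus a timestamp assigner.
       Here `stream` is a placeholder `DataStream<Integer>`, and using the element
       value as the timestamp is purely illustrative.
   
   ```java
       // Needs org.apache.flink.api.common.eventtime.WatermarkStrategy and java.time.Duration.
       // Old style (deprecated): new BoundedOutOfOrdernessTimestampExtractor<Integer>(Time.seconds(1)) { ... }
       stream.assignTimestampsAndWatermarks(
               WatermarkStrategy.<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                       .withTimestampAssigner((element, previousTimestamp) -> element));
   ```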



