Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75304305
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +109,140 @@ public static void destroyHDFS() {
        //                                              TESTS
     
        @Test
    +   public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +           Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +           Map<Integer, String> expectedFileContents = new HashMap<>();
    +           for (int i = 0; i < NO_OF_FILES; i++) {
    +                   Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +                   filesCreated.add(file.f0);
    +                   expectedFileContents.put(i, file.f1);
    +           }
    +
    +           TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +           TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +           ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +           StreamConfig streamConfig = new StreamConfig(new Configuration());
    +           streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +
    +           ExecutionConfig executionConfig = new ExecutionConfig();
    +           executionConfig.setAutoWatermarkInterval(100);
    +
    +           TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +           OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +                   new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
    +
    +           reader.setOutputType(typeInfo, new ExecutionConfig());
    +           tester.open();
    +
    +           timeServiceProvider.setCurrentTime(0);
    +
    +           long elementTimestamp = 201;
    +           timeServiceProvider.setCurrentTime(elementTimestamp);
    +
    +           // test that a watermark is actually emitted
    +           Assert.assertTrue(tester.getOutput().size() == 1 &&
    +                   tester.getOutput().peek() instanceof Watermark &&
    +                   ((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
    +
    +           // create the necessary splits for the test
    +           FileInputSplit[] splits = format.createInputSplits(
    +                   reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +           // and feed them to the operator
    +           for (FileInputSplit split : splits) {
    +                   tester.processElement(new StreamRecord<>(split));
    +           }
    +
    +           /*
    +            * Given that the reader is multithreaded, the test can finish before the reader
    +            * thread finishes reading. This would result in files being deleted by the test
    +            * before they are read, thus throwing an exception. In addition, even if file
    +            * deletion happens at the end, the results are not yet ready for checking.
    +            * To address this, we wait until all the output is collected or until the
    +            * waiting time exceeds 1000 ms.
    +            */
    +
    +           long start = System.currentTimeMillis();
    +           Queue<Object> output;
    +           do {
    +                   output = tester.getOutput();
    +                   Thread.sleep(50);
    +           } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) &&
    +                   (System.currentTimeMillis() - start) < 1000);
    --- End diff --
    
    I wonder if this can lead to unstable tests (for example on Travis).
    What if the output needs more than one second to show up?
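    
    One possible alternative, just as a sketch (the 30 s deadline is an arbitrary generous bound I'm assuming here; tester, NO_OF_FILES and LINES_PER_FILE are the names from the diff above), would be to keep polling but fail loudly on timeout instead of silently proceeding with partial output:
    
        // Sketch only: re-poll the harness output until all expected records
        // arrive; fail with an explicit message if the (assumed) 30 s deadline
        // passes, so a slow CI machine yields a clear assertion error.
        long deadline = System.currentTimeMillis() + 30000;
        Queue<Object> output = tester.getOutput();
        while (output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) {
            if (System.currentTimeMillis() > deadline) {
                Assert.fail("Timed out waiting for " + (NO_OF_FILES * LINES_PER_FILE)
                        + " records; saw " + (output == null ? 0 : output.size()));
            }
            Thread.sleep(50);
            output = tester.getOutput();
        }
    
    That way a slow run fails with a clear message rather than an unrelated comparison error further down.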

