Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2350#discussion_r75304305

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
 	// TESTS

 	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+
+		StreamConfig streamConfig = new StreamConfig(new Configuration());
+		streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(100);
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
+
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+		tester.open();
+
+		timeServiceProvider.setCurrentTime(0);
+
+		long elementTimestamp = 201;
+		timeServiceProvider.setCurrentTime(elementTimestamp);
+
+		// test that a watermark is actually emitted
+		Assert.assertTrue(tester.getOutput().size() == 1 &&
+			tester.getOutput().peek() instanceof Watermark &&
+			((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for (FileInputSplit split : splits) {
+			tester.processElement(new StreamRecord<>(split));
+		}
+
+		/*
+		 * Because the reader runs in its own thread, the test can finish before that thread is
+		 * done reading. The test would then delete the input files while they are still being
+		 * read, which throws an exception; and even if deletion happened at the end, the results
+		 * would not yet be available for checking. To handle this, we wait until all the output
+		 * has been collected or until the waiting time exceeds 1000 ms (1 s).
+		 */
+
+		long start = System.currentTimeMillis();
+		Queue<Object> output;
+		do {
+			output = tester.getOutput();
+			Thread.sleep(50);
+		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
--- End diff --

I wonder if this can lead to unstable tests (for example on Travis). What if the output needs more than one second to show up?
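One way to avoid the flakiness would be to poll against a much more generous deadline and fail the test explicitly on timeout, so a slow CI machine only costs wall-clock time instead of producing a spurious failure. A minimal sketch, reusing the names from the diff above; the 30-second deadline and the failure message are illustrative choices, not part of the PR:

```java
// Sketch: poll until the expected number of records has arrived, exiting as
// soon as the output is complete. The 30 s deadline is an arbitrary, generous
// upper bound; on timeout we fail with a message instead of falling through
// to assertions on incomplete output.
long deadline = System.currentTimeMillis() + 30_000;
Queue<Object> output = tester.getOutput();
while (output.size() != NO_OF_FILES * LINES_PER_FILE) {
	if (System.currentTimeMillis() > deadline) {
		Assert.fail("Expected " + (NO_OF_FILES * LINES_PER_FILE)
			+ " records but saw only " + output.size() + " before the deadline.");
	}
	Thread.sleep(50);
	output = tester.getOutput();
}
```

Since the loop exits as soon as the count matches, the common case stays as fast as the current version; only a genuinely slow run pays for the larger deadline.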