[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533662#comment-15533662 ]
ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204990

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
     				}
     				content.add(element.getValue() + "\n");
     			} else if (line instanceof Watermark) {
    -				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
    +				watermarkTimestamps.add(((Watermark) line).getTimestamp());
     			} else {
     				Assert.fail("Unknown element in the list.");
     			}
     		}

    +		// check if all the input was read
    +		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
    +
    +		// check if the last element is the LongMax watermark
    +		Assert.assertTrue(lastElement instanceof Watermark);
    +		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
    +
    +		System.out.println(watermarkTimestamps.size());
    --- End diff --

    Leftover sysout printing.


> Fix Streaming File Source Timestamps/Watermarks Handling
> --------------------------------------------------------
>
>                 Key: FLINK-4329
>                 URL: https://issues.apache.org/jira/browse/FLINK-4329
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks,
> i.e. they are just passed through. This means that when the
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}}
> that watermark can "overtake" the records that are to be emitted in the
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness"
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion
> timestamps since it is not technically a source but looks like one to the
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)