[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533663#comment-15533663 ]
ASF GitHub Bot commented on FLINK-4329: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r81204828 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java --- @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception { private int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(tkns.length, 6); + Assert.assertEquals(6, tkns.length); return Integer.parseInt(tkns[tkns.length - 1]); } + private class TimeUpdatingThread extends Thread { + + private volatile boolean isRunning; + + private final TestTimeServiceProvider timeServiceProvider; + private final OneInputStreamOperatorTestHarness testHarness; + private final long wmInterval; + private final int elementUntilUpdating; + + TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider, + final OneInputStreamOperatorTestHarness testHarness, + final long wmInterval, + final int elementUntilUpdating) { + + this.timeServiceProvider = timeServiceProvider; + this.testHarness = testHarness; + this.wmInterval = wmInterval; + this.elementUntilUpdating = elementUntilUpdating; + this.isRunning = true; + } + + @Override + public void run() { + try { + while (isRunning) { + if (testHarness.getOutput().size() % elementUntilUpdating == 0) { + long now = timeServiceProvider.getCurrentProcessingTime(); + timeServiceProvider.setCurrentTime(now + wmInterval); + } + } + } catch (Exception e) { + e.printStackTrace(); --- End diff -- This will not result in any meaningful feedback to the test. > Fix Streaming File Source Timestamps/Watermarks Handling > -------------------------------------------------------- > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Affects Versions: 1.1.0 > Reporter: Aljoscha Krettek > Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)