[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683866#comment-15683866 ]
ASF GitHub Bot commented on FLINK-5096: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2845#discussion_r88910876 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java --- @@ -638,6 +644,231 @@ public void flatMap(Tuple2<Integer, String> value, Assert.assertEquals(8, numFiles); } + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes --- End diff -- the comment should include the conclusion the reader is supposed to arrive on: that every record goes in a separate file. > Make the RollingSink rescalable. > -------------------------------- > > Key: FLINK-5096 > URL: https://issues.apache.org/jira/browse/FLINK-5096 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Integrate the RollingSink with the new state abstractions so that its > parallelism can change after restoring from a savepoint. -- This message was sent by Atlassian JIRA (v6.3.4#6332)