[ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683866#comment-15683866 ]

ASF GitHub Bot commented on FLINK-5096:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2845#discussion_r88910876
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
    @@ -638,6 +644,231 @@ public void flatMap(Tuple2<Integer, String> value,
                Assert.assertEquals(8, numFiles);
        }
     
    +   private static final String PART_PREFIX = "part";
    +   private static final String PENDING_SUFFIX = ".pending";
    +   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +   private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +   @Test
    +   public void testBucketStateTransitions() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0);
    +           testHarness.setup();
    +           testHarness.open();
    +
    +           testHarness.setProcessingTime(0L);
    +
    +           testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
    --- End diff --
    
    The comment should state the conclusion the reader is supposed to arrive 
at: that every record goes into a separate file.
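    To make that conclusion concrete, here is a minimal sketch of why a 5-byte
limit would put each record in its own file. It is not Flink's RollingSink: the
class RollSketch and the method countFiles are hypothetical, and it assumes the
sink rolls before a write once the open file is strictly larger than the limit,
and that each record is written as UTF-8 followed by a single newline byte.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a size-based rolling sink; not Flink's RollingSink.
class RollSketch {

    // Counts how many part files a sequence of records would produce.
    static int countFiles(String[] records, long batchSize) {
        int files = 0;
        long bytesInCurrentFile = 0;
        for (String record : records) {
            // Assumption: roll before writing once the open file exceeds batchSize.
            if (files == 0 || bytesInCurrentFile > batchSize) {
                files++;
                bytesInCurrentFile = 0;
            }
            // Assumption: each record is written as UTF-8 plus one '\n' byte.
            bytesInCurrentFile += record.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return files;
    }

    public static void main(String[] args) {
        String[] records = {"test1", "test2", "test3"};
        System.out.println(countFiles(records, 5L));
    }
}
```

    Under these assumptions "test1" plus its newline occupies 6 bytes, which
already exceeds the 5-byte limit, so the next record opens a new file. If the
newline or the strict comparison does not hold in the real sink, the arithmetic
changes, which is one more reason to spell the conclusion out in the comment.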


> Make the RollingSink rescalable.
> --------------------------------
>
>                 Key: FLINK-5096
>                 URL: https://issues.apache.org/jira/browse/FLINK-5096
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its 
> parallelism can change after restoring from a savepoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
