[ 
https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16185574#comment-16185574
 ] 

ASF GitHub Bot commented on FLINK-7728:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4747

    [FLINK-7728] [DataStream] Flush StatusWatermarkValve once all inputs become 
idle 

    ## What is the purpose of the change
    
    This PR is based on #4738. Only the last three commits are relevant.
    
    Prior to this PR, once all inputs of the `StatusWatermarkValve` become 
idle, we only emit the `StreamStatus.IDLE` marker, and check nothing else. This 
makes the watermark advancement behaviour inconsistent in the case that all 
inputs become idle, depending on the order in which they become idle.
    
    This PR fixes this by "flushing" the max watermark across all channels once 
all inputs become idle. At a high-level, what this means for downstream 
operators is that all inputs have become idle and will temporarily cease to 
advance their watermarks, so they can safely advance their event time to 
whatever the largest watermark is.
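
As a rough illustration of the flush described above, the step taken when the last active input turns idle could look like the following. This is a minimal sketch, not the code in this PR: `IdleFlushSketch` and `flushMaxWatermark` are hypothetical names, and the real valve tracks more per-channel state than a plain array.

```java
// Hypothetical helper for illustration only; not Flink's actual implementation.
final class IdleFlushSketch {

    /**
     * Returns the watermark the valve should have emitted once every channel
     * is idle: the largest watermark seen on any channel, but never less than
     * the watermark that was already emitted.
     */
    static long flushMaxWatermark(long[] channelWatermarks, long lastOutputWatermark) {
        long max = lastOutputWatermark;
        for (long w : channelWatermarks) {
            max = Math.max(max, w);
        }
        return max;
    }

    public static void main(String[] args) {
        // Channels #1..#3 hold watermarks 10, 5, 5; the valve last emitted 5.
        long flushed = flushMaxWatermark(new long[] {10L, 5L, 5L}, 5L);
        System.out.println(flushed); // prints 10
    }
}
```

In this sketch the caller would emit the returned value only if it is larger than the last emitted watermark, and then emit the idle status marker.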
    
    This PR also includes changes to `StatusWatermarkValveTest` to cover the 
above mentioned case, as well as reworking the unit tests to be more 
fine-grained with small enough test case scope, so that we have a better 
overview of what cases are covered.
    
    ## Brief change log
    
    - d11d98d preliminary cleanup of 
`StatusWatermarkValveTest#BufferedValveOutputHandler`. The previous implementation 
was overly complex and could not provide the test behaviors that we required.
    - 0e386da main change for this PR. Includes a new test in 
`StatusWatermarkValveTest` to cover the missing case. That test does not pass 
without this change.
    - b977c18 refactor unit tests to be more fine-grained in scope.
    
    
    ## Verifying this change
    
    `StatusWatermarkValveTest`, `OneInputStreamTaskTest`, 
`TwoInputStreamTaskTest` should be able to verify this change.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): **YES**
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? n/a
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-7728

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4747
    
----
commit a9ce90aaee99374513941270b804de2f5564c47e
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-09-27T18:05:21Z

    [FLINK-7721] [DataStream] Only emit new min watermark iff it was aggregated 
from watermark-aligned inputs
    
    Prior to this commit, in the calculation of the new min watermark in
    StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels(),
    there was no verification that the calculated new min watermark
    really was aggregated from some aligned channel.
    
    In the corner case where all input channels are currently not aligned
    but actually some are active, we would then incorrectly determine that
    the final aggregation is Long.MAX_VALUE and emit that.
    
    This commit fixes this by only emitting the aggregated watermark iff it was
    really calculated from some aligned input channel (as well as the
    already existing constraint that it needs to be larger than the last
    emitted watermark). This change should also safely cover the case that a
    Long.MAX_VALUE was genuinely aggregated from the input channels.
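
The guard described in this commit message can be sketched as follows. This is a simplified illustration under assumed names (`AlignedMinWatermarkSketch`, `newMinWatermark`, and the parallel-array representation of channel state are all hypothetical), not the actual body of the Flink method.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the "only emit if aggregated from an aligned channel" guard.
final class AlignedMinWatermarkSketch {

    /**
     * Computes the min watermark over aligned channels. Returns a value only if
     * at least one aligned channel contributed AND the result is larger than the
     * last emitted watermark; otherwise returns empty (nothing to emit).
     */
    static OptionalLong newMinWatermark(
            long[] watermarks, boolean[] aligned, long lastOutputWatermark) {
        long min = Long.MAX_VALUE;
        boolean sawAlignedChannel = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (aligned[i]) {
                sawAlignedChannel = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // Without the sawAlignedChannel check, "no aligned channels" would be
        // indistinguishable from a genuine Long.MAX_VALUE watermark.
        if (sawAlignedChannel && min > lastOutputWatermark) {
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

With no aligned channels the sketch returns empty rather than `Long.MAX_VALUE`, while a `Long.MAX_VALUE` that really was received on an aligned channel is still returned.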

commit d11d98dda4723760e7d9fb3b9680a1e9daca3705
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-09-28T12:11:22Z

    [FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in 
StatusWatermarkValveTest
    
    The previous implementation was overly complicated. Having separate
    buffers for the StreamStatus and Watermarks is not required for our
    tests. Also, that design doesn't allow checking the order StreamStatus /
    Watermarks are emitted from a single input to the valve.
    
    This commit reworks it by buffering both StreamStatus and Watermarks in
    a shared queue.

commit 0e386dab2fc19df78276ce203ed8f38792145759
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-09-28T14:49:22Z

    [FLINK-7728] [DataStream] Flush max watermark across all inputs once all 
become idle
    
    Prior to this commit, once all inputs of the StatusWatermarkValve
    become idle, we only emit the StreamStatus.IDLE marker, and check
    nothing else. This makes the watermark advancement behaviour
    inconsistent in the case that all inputs become idle depending on the
    order that they become idle.
    
    This commit fixes this by "flushing" the max watermark across all
    channels once all inputs become idle. At a high-level, what this means
    for downstream operators is that all inputs have become idle and will
    temporarily cease to advance their watermarks, so they can safely
    advance their event time to whatever the largest watermark is.

commit b977c18c802fcb0060b47d7079bf214da7e764bb
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-09-29T09:16:22Z

    [FLINK-7728] [DataStream] Make StatusWatermarkValve unit tests more 
fine-grained
    
    Previously, the unit tests in StatusWatermarkValveTest were too
    cluttered and tested too many behaviours in a single test. This made
    it hard to have a good overview of what test cases are covered.
    
    This commit is a rework of the previous tests, making them more
    fine-grained so that the scope of each test is small enough. All
    previously tested behaviours are still covered.

----


> StatusWatermarkValve has different min watermark advancement behavior 
> depending on the ordering inputs become idle
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7728
>                 URL: https://issues.apache.org/jira/browse/FLINK-7728
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.1, 1.4.0, 1.3.2
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.2, 1.4.0, 1.3.3
>
>
> Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we 
> only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This 
> makes the watermark advancement behavior inconsistent in the case that all 
> inputs become idle, depending on the order that they become idle.
> Consider the following setup:
> {code}
> Channel #1: Watermark 10, ACTIVE
> Channel #2: Watermark 5, ACTIVE
> Channel #3: Watermark 5, ACTIVE
> {code}
> If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will 
> be emitted as the new min watermark once both #2 and #3 become idle.
> On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be 
> emitted.
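
The two orderings above can be traced with a small model of the behaviour before the fix. This is a deliberately simplified sketch under stated assumptions: `PreFixValveModel` is a hypothetical class, channels are indexed from 0 (so #1 is index 0), every active channel is treated as watermark-aligned, and the last emitted watermark starts at the minimum across channels (5).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the pre-fix behaviour; not Flink's actual code.
final class PreFixValveModel {
    private final long[] watermarks;
    private final boolean[] active;
    private long lastOutput = Long.MAX_VALUE;
    final List<String> emitted = new ArrayList<>();

    PreFixValveModel(long[] initialWatermarks) {
        watermarks = initialWatermarks.clone();
        active = new boolean[watermarks.length];
        Arrays.fill(active, true);
        for (long w : watermarks) {
            lastOutput = Math.min(lastOutput, w); // min across all channels
        }
    }

    void markIdle(int channel) {
        active[channel] = false;
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (!anyActive) {
            emitted.add("IDLE"); // pre-fix: only the idle marker, nothing else
        } else if (min > lastOutput) {
            lastOutput = min;
            emitted.add("WM " + min);
        }
    }

    /** Runs the setup from the issue (watermarks 10, 5, 5) with the given idle order. */
    static List<String> run(int... order) {
        PreFixValveModel valve = new PreFixValveModel(new long[] {10L, 5L, 5L});
        for (int channel : order) {
            valve.markIdle(channel);
        }
        return valve.emitted;
    }
}
```

In this model, `PreFixValveModel.run(1, 2, 0)` (order #2 -> #3 -> #1) yields `[WM 10, IDLE]`, whereas `PreFixValveModel.run(0, 1, 2)` (order #1 -> #2 -> #3) yields only `[IDLE]`, which is the inconsistency reported here.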



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
