[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663715#comment-15663715
 ] 

ASF GitHub Bot commented on FLINK-5017:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2801

    [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

    This PR is the first part of making temporarily idle sources in Flink 
possible, by adding a new `StreamStatus` element that flows with the records. 
The second part, allowing source operators to emit `StreamStatus` elements, 
will be submitted as a separate PR based on this one.
    
    `StreamStatus` elements are generated at the sources, and affect how 
operators advance their watermarks in the presence of idle sources.
    
    Prior to this PR, a downstream operator advanced its watermark by taking 
the minimum watermark across all of its input channels and checking whether 
that minimum had advanced. An idle source therefore stalled the watermark of 
every downstream operator. With this change, operators can mark input channels 
as idle and ignore them when advancing their watermark.
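
    As a rough illustration of the change described above, here is a minimal sketch of a minimum computed only over channels that are not marked idle. The class and method names are hypothetical and are not taken from this PR.

```java
import java.util.OptionalLong;

/** Illustrative only; class and method names are hypothetical. */
final class MinWatermarkSketch {

    /**
     * Returns the minimum watermark over all channels not marked idle,
     * or an empty result when every channel is idle.
     */
    static OptionalLong minOverActiveChannels(long[] channelWatermarks, boolean[] channelIdle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

    With channel watermarks `{10, Long.MIN_VALUE}`, the minimum stays at `Long.MIN_VALUE` while both channels are counted, and becomes `10` once the second channel is marked idle.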
    
    # Design Choices
    
    ### `StreamStatus` elements are only generated at sources
    
    Sources guarantee that no records or watermarks will be emitted between an 
`IDLE` and the next `ACTIVE` (this will be implemented in the second-part PR). 
All downstream operators need to process the statuses they receive and 
forward status changes appropriately. `StreamStatus` cannot be generated 
mid-topology.
    
    ### Using two status markers - `IDLE` and `ACTIVE`
    
    We need 2 markers, instead of only an `IDLE`, because operators at the 
`AbstractStreamOperator` level need to clearly know the start and end of 
idleness. I had considered using only an `IDLE` marker and treating any 
watermarks / records received afterwards as a sign that the source is active 
again, but that would prevent us from correctly determining whether watermarks 
generated at timestamp extractors in the middle of topologies should be 
ignored (watermarks generated in the middle of topologies need to be ignored 
when the operator is actually idle, because they can be generated even if 
sources are idle and records aren't flowing through).
    
    Despite the 2 markers, I plan to have only a single new 
`markAsTemporarilyIdle()` method on `SourceContext`s that serves as the only 
means for user source functions to express that the source is idle (included in 
the second-part PR). `SourceContext` implementations are responsible for 
controlling how the actual `StreamStatus` elements are sent downstream.
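
    To make the idea of one user-facing method backed by two markers concrete, here is a minimal sketch. Only the method name `markAsTemporarilyIdle()` comes from the description above. The class, the enum, and the rule that the next `collect()` call implicitly sends `ACTIVE` are assumptions made for illustration; the actual behaviour belongs to the second-part PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two status markers. */
enum SourceStatus { IDLE, ACTIVE }

/** Hypothetical source context; not the real SourceContext interface. */
class SketchSourceContext<T> {
    /** Everything sent downstream, in order: records or status markers. */
    final List<Object> downstream = new ArrayList<>();
    private SourceStatus current = SourceStatus.ACTIVE;

    /** The only call a user function makes to express idleness. */
    void markAsTemporarilyIdle() {
        if (current != SourceStatus.IDLE) {
            current = SourceStatus.IDLE;
            downstream.add(SourceStatus.IDLE);
        }
    }

    /** Assumption: emitting a record ends idleness, so ACTIVE is sent first. */
    void collect(T record) {
        if (current == SourceStatus.IDLE) {
            current = SourceStatus.ACTIVE;
            downstream.add(SourceStatus.ACTIVE);
        }
        downstream.add(record);
    }
}
```

    In this sketch the context, not the user function, decides when markers are sent, so no record can appear between an `IDLE` and the following `ACTIVE`.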
    
    ### Consolidate watermark / status forwarding as a `StatusWatermarkValve`
    
    Since the forwarding logic is rather complex now with this change, all 
forwarding logic is bundled into a "valve", that `OneInputStreamTask`, 
`TwoInputStreamTask`, and `AbstractStreamOperator` use to control watermark and 
status forwarding.
    
    `StatusWatermarkValve` takes an implementation of `ValveOutputHandler`. 
Implementations decide what to do when a watermark or status is emitted from 
the valve. For example, `OneInputStreamTask` and `TwoInputStreamTask` simply 
forward it to the head operator; `AbstractStreamOperator` needs to advance 
timers when the valve outputs a new watermark.
    
    I didn't want to use the `Output` interface, because record elements and 
latency markers have nothing to do with the valve's control logic; they are 
always simply forwarded.
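
    The valve-plus-handler arrangement can be sketched roughly as follows. This is a simplified illustration and not the `StatusWatermarkValve` from this PR: all names are hypothetical, watermarks are plain `long` values, and the rule for a channel that resumes with an old watermark is left out.

```java
import java.util.Arrays;

/** Hypothetical two-state marker standing in for StreamStatus. */
enum ChannelStatus { IDLE, ACTIVE }

/** Hypothetical callback, modelled on the role of ValveOutputHandler. */
interface SketchOutputHandler {
    void handleWatermark(long timestamp);
    void handleStatus(ChannelStatus status);
}

/** Simplified valve: tracks per-channel state and decides what to output. */
class SketchValve {
    private final long[] watermarks;
    private final boolean[] idle;
    private final SketchOutputHandler handler;
    private long lastOutput = Long.MIN_VALUE;
    private ChannelStatus lastStatus = ChannelStatus.ACTIVE;

    SketchValve(int numChannels, SketchOutputHandler handler) {
        this.watermarks = new long[numChannels];
        Arrays.fill(this.watermarks, Long.MIN_VALUE);
        this.idle = new boolean[numChannels];
        this.handler = handler;
    }

    void inputWatermark(int channel, long timestamp) {
        // Ignore watermarks from idle channels and watermarks that do not advance.
        if (idle[channel] || timestamp <= watermarks[channel]) {
            return;
        }
        watermarks[channel] = timestamp;
        advance();
    }

    void inputStatus(int channel, ChannelStatus status) {
        idle[channel] = (status == ChannelStatus.IDLE);
        ChannelStatus combined = allIdle() ? ChannelStatus.IDLE : ChannelStatus.ACTIVE;
        if (combined != lastStatus) {
            lastStatus = combined;
            handler.handleStatus(combined);
        }
        if (combined == ChannelStatus.ACTIVE) {
            advance(); // an idle channel no longer holds back the minimum
        }
    }

    private boolean allIdle() {
        for (boolean channelIdle : idle) {
            if (!channelIdle) {
                return false;
            }
        }
        return true;
    }

    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > lastOutput) {
            lastOutput = min;
            handler.handleWatermark(min);
        }
    }
}
```

    A task-level handler would forward both callbacks to the head operator, while an operator-level handler could advance timers in `handleWatermark`. Records and latency markers never pass through the valve in this sketch, which matches the reasoning for not reusing `Output`.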
    
    ### Adding a `setup()` life cycle method to `StreamInputProcessor` and 
`StreamTwoInputProcessor`
    
    This is mainly to facilitate creating `StatusWatermarkValve`s in the input 
processors. They need a reference to the head operator when being created. I 
considered passing it to the input processors as a constructor argument in 
`init()`, but the `StreamIterationTail` operator forbids doing so, because for 
that operator, the `headOperator` is created only after `init()`.
    
    We could also consider moving creation of input processors into the 
beginning of `run()`, which would avoid the need for a `setup()` method on the 
input processors, but I wasn't sure if that would break anything.
    
    ### Block watermarks generated mid-topology at the `AbstractStreamOperator` 
level when idle
    
    Two catches for this:
    
    - `processStreamStatus` must NOT be overridden by concrete implementations. 
We rely on it to correctly block watermarks generated by timestamp 
extractors, which emit watermarks that completely bypass the valve's forwarding 
logic.
    
    - The current implementation only works for one-input stream operators that 
generate watermarks. Since we don't seem to have any two-input stream operators 
that generate watermarks, this should be fine for now.
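
    The blocking behaviour described in this section can be pictured with a small guard around watermark emission. This is a sketch under assumptions: the class name and the `boolean` status flag are hypothetical, and the real logic lives in `AbstractStreamOperator`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator-level guard; not the real AbstractStreamOperator. */
class IdleGuardedOutput {
    /** Watermarks that actually reached downstream. */
    final List<Long> emitted = new ArrayList<>();
    private boolean idle;

    /** Called when the operator's status changes; must not be bypassed. */
    void processStreamStatus(boolean nowIdle) {
        idle = nowIdle;
    }

    /** Called by a timestamp extractor that emits watermarks on its own. */
    void emitWatermark(long timestamp) {
        if (idle) {
            return; // drop watermarks generated while the operator is idle
        }
        emitted.add(timestamp);
    }
}
```

    If a concrete operator replaced the status handling, the `idle` flag would never be updated and the guard would stop working, which is why the first catch above matters.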
    
    # Testing
    
    - Add new test in `AbstractStreamOperatorTest` to test that for concrete 
one-input operators that bypass the valve and directly emit watermarks, the 
watermarks are blocked if the operator is idle.
    - Unit tests for `StatusWatermarkValve` for complex forwarding cases.
    - Extended `testWatermarkForwarding` tests in both `OneInputStreamTaskTest` 
and `TwoInputStreamTaskTest` to also test stream status forwarding. They are 
relatively simple compared to the unit tests for `StatusWatermarkValve`, and 
serve only to implicitly ensure that tasks are using valves correctly.
    
    I plan to add IT tests in the second-part PR, when source operators can 
start emitting `StreamStatus`.
    
    # Other Remarks
    
    While working on this task, I came to think that we should consider 
differentiating between the process element methods that can be overridden 
by concrete operator implementations and those that can't.
    
    For example, it would be best if the 
`AbstractStreamOperator#processStreamStatus()` method could be `final`, to keep 
Stream Status processing logic away from concrete implementations and forbid 
any possibility of overriding it.
    
    Right now this isn't possible, as we need to tie processing methods to the 
`OneInputStreamOperator` / `TwoInputStreamOperator` interfaces for a mixin 
pattern with the `AbstractStreamOperator`.
    
    What we could probably do is let input processors access the head 
operator as an `AbstractStreamOperator`, so that the processing methods only 
visible at the abstract level can be called, and keep only the processing 
methods we allow to be overridden in `OneInputStreamOperator` / 
`TwoInputStreamOperator`.
    
    Processing methods that I think should only be visible at the abstract 
level are: `processElement()`, `processStreamStatus()`, and 
`processLatencyMarker()`.
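
    The separation suggested in these remarks could look roughly like the sketch below. Every name here is hypothetical, the status is reduced to a `boolean`, and which methods end up on which side is exactly the open question raised above.

```java
/** Hypothetical user-facing interface: holds only what concrete operators may override. */
interface OverridableOps<T> {
    void handle(T value);
}

/** Hypothetical abstract base; status handling lives here and is final. */
abstract class SketchBaseOperator<T> implements OverridableOps<T> {
    private boolean idle;

    /** final, so concrete operators cannot change how status is processed. */
    public final void processStreamStatus(boolean nowIdle) {
        idle = nowIdle;
    }

    public final boolean isIdle() {
        return idle;
    }
}

/** Hypothetical input processor that sees the head operator at the abstract level. */
class SketchInputProcessor<T> {
    private final SketchBaseOperator<T> head;

    SketchInputProcessor(SketchBaseOperator<T> head) {
        this.head = head;
    }

    void onStatus(boolean nowIdle) {
        head.processStreamStatus(nowIdle);
    }

    void onValue(T value) {
        head.handle(value);
    }
}
```

    Because the processor holds the abstract type, it can call the `final` method directly, and the interface no longer needs to declare it.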

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2801
    
----
commit c70d4dca464220ae63f596a474bdd4d957934838
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2016-11-14T02:53:18Z

    [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources
    
    This commit is the first part of making idle streaming sources in Flink 
possible. It introduces a new
    element, StreamStatus, that flows with other records in streams. 
StreamStatus elements are generated
    at the sources, and affect how operators advance their watermarks with the 
presence of idle sources.
    
    Prior to this commit, when advancing watermarks at downstream operators, 
the new min watermark is found
    by simply determining if the min watermark across all input channels has 
advanced. This resulted in
    watermark-stalling downstream operators when there are idle sources.  With 
this change, operators can
    now mark input channels to be idle, and ignore them when advancing their 
watermark.
    
    This commit also includes refactoring of previous watermark forwarding 
logic into a single class,
    StatusWatermarkValve. OneInputStreamTasks, TwoInputStreamTasks, and 
AbstractStreamOperator use valves
    to help them determine how watermarks and stream statuses are forwarded.

----


> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5017
>                 URL: https://issues.apache.org/jira/browse/FLINK-5017
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>         Attachments: operator_chain_with_multiple_network_outputs.png
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit either of the status elements 
> once it changes between "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> resumes being "watermark-active" (i.e. when at least one input channel 
> receives the watermark active status element), and should also forward its 
> active status to inform downstream operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
