[ https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663715#comment-15663715 ]
ASF GitHub Bot commented on FLINK-5017:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2801

    [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

This PR is the first part of making temporarily idle sources in Flink possible, by adding a new `StreamStatus` element that flows with the records. The second part, allowing source operators to emit `StreamStatus` elements, will be submitted as a separate PR based on this one.

`StreamStatus` elements are generated at the sources, and affect how operators advance their watermarks in the presence of idle sources. Prior to this PR, a downstream operator advanced its watermark simply by checking whether the minimum watermark across all of its input channels had advanced. As a result, watermarks stalled at downstream operators whenever some sources were idle. With this change, operators can mark input channels as idle and ignore them when advancing their watermark.

# Design Choices

### `StreamStatus` elements are only generated at sources

Sources guarantee that no records or watermarks will be emitted between an `IDLE` and an `ACTIVE` (this will be implemented in the second part PR). All downstream operators need to process the statuses they receive and forward status changes appropriately. `StreamStatus` cannot be generated mid-topology.

### Using two status markers - `IDLE` and `ACTIVE`

We need 2 markers, instead of only an `IDLE`, because operators at the `AbstractStreamOperator` level need to clearly know the end and start of idleness.
I had considered using only an `IDLE` marker and treating any watermarks / records received afterwards as a sign of becoming active again, but that would prevent us from correctly determining whether watermarks generated at timestamp extractors in the middle of topologies should actually be ignored (watermarks generated in the middle of topologies need to be ignored when the operator is actually idle, because they can be generated even if sources are idle and records aren't flowing through).

Despite the 2 markers, I plan to only have a single new `markAsTemporarilyIdle()` method on `SourceContext`s that serves as the only means for user source functions to express that the source is idle (included in the second part PR). `SourceContext` implementations are responsible for controlling how actual `StreamStatus` elements are sent downstream.

### Consolidate watermark / status forwarding as a `StatusWatermarkValve`

Since the forwarding logic is rather complex now with this change, all of it is bundled into a "valve" that `OneInputStreamTask`, `TwoInputStreamTask`, and `AbstractStreamOperator` use to control watermark and status forwarding. `StatusWatermarkValve` takes an implementation of `ValveOutputHandler`. Implementations decide what to do when a watermark or status is emitted from the valve. For example, `OneInputStreamTask` and `TwoInputStreamTask` simply forward it to the head operator; `AbstractStreamOperator` needs to advance timers when the valve outputs a new watermark. I didn't want to use the `Output` interface, because record elements and latency markers have nothing to do with the valve's control logic; they are always simply forwarded.

### Adding a `setup()` life cycle method to `StreamInputProcessor` and `StreamTwoInputProcessor`

This is mainly to facilitate creating `StatusWatermarkValve`s in the input processors. They need a reference to the head operator when being created.
I considered passing it in to the input processors as a constructor argument in `init()`, but the `StreamIterationTail` operator forbids doing so, because for that operator the `headOperator` is created only after `init()`. We could also consider moving creation of the input processors to the beginning of `run()`, which would avoid the need for a `setup()` method on the input processors, but I wasn't sure if that would break anything.

### Block watermarks generated mid-topology at the `AbstractStreamOperator` level when idle

Two catches for this:

- `processStreamStatus` must NOT be overridden by concrete implementations. We need to rely on that to correctly block watermarks generated by timestamp extractors that emit watermarks completely bypassing the valve's forwarding logic.
- The current implementation only works for one-input stream operators that generate watermarks. Since we don't seem to have two-input stream operators that also generate watermarks, this should be fine for now.

# Testing

- Add a new test in `AbstractStreamOperatorTest` to check that, for concrete one-input operators that bypass the valve and directly emit watermarks, the watermarks are blocked if the operator is idle.
- Unit tests for `StatusWatermarkValve` for complex forwarding cases.
- Extended the `testWatermarkForwarding` tests in both `OneInputStreamTaskTest` and `TwoInputStreamTaskTest` to also test stream status forwarding. They are relatively simple compared to the unit tests for `StatusWatermarkValve`, and only implicitly ensure that tasks are using valves correctly.

I plan to add IT tests in the second part PR, when source operators can start emitting `StreamStatus`.

# Other Remarks

While working on this task, I had the feeling that we could start differentiating between which element-processing methods can be overridden by concrete operator implementations, and which can't.
For example, it would be best if the `AbstractStreamOperator#processStreamStatus()` method could be `final`, to keep stream status processing logic away from concrete implementations and rule out any possibility of overriding it. Right now this isn't possible, as we need to tie processing methods to the `OneInputStreamOperator` / `TwoInputStreamOperator` interfaces for a mixin pattern with the `AbstractStreamOperator`. What we could probably do is let input processors access the head operator as an `AbstractStreamOperator`, so that the processing methods only visible at the abstract level can be called, and keep only the processing methods we allow to be overridden in `OneInputStreamOperator` / `TwoInputStreamOperator`. Processing methods that I think should only be visible at the abstract level are: `processElement()`, `processStreamStatus()`, and `processLatencyMarker()`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2801.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2801

----
commit c70d4dca464220ae63f596a474bdd4d957934838
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date: 2016-11-14T02:53:18Z

[FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

This commit is the first part of making idle streaming sources in Flink possible. It introduces a new element, StreamStatus, that flows with other records in streams. StreamStatus elements are generated at the sources, and affect how operators advance their watermarks with the presence of idle sources. Prior to this commit, when advancing watermarks at downstream operators, the new min watermark is found by simply determining if the min watermark across all input channels has advanced.
This resulted in watermark-stalling downstream operators when there are idle sources. With this change, operators can now mark input channels to be idle, and ignore them when advancing their watermark.

This commit also includes refactoring of previous watermark forwarding logic into a single class, StatusWatermarkValve. OneInputStreamTasks, TwoInputStreamTasks, and AbstractStreamOperator use valves to help them determine how watermarks and stream statuses are forwarded.

----

> Introduce WatermarkStatus stream element to allow for temporarily idle
> streaming sources
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-5017
> URL: https://issues.apache.org/jira/browse/FLINK-5017
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.2.0
>
> Attachments: operator_chain_with_multiple_network_outputs.png
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they
> should continue to expect watermarks from the sending operator. There are 2
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements
> are generated at the sources, and may be propagated through the operators of
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit either of the status elements
> once it changes between "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an
> indefinite amount of time. This is the case, for example, for Flink's Kafka
> Consumer, where sources might initially have no assigned partitions to read
> from, or no records can be read from the assigned partitions. Once the source
> detects that it will resume emitting data, it is considered
> "watermark-active".
> Downstream operators with multiple inputs (ex. head operators of a
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for
> watermarks from an upstream operator that is "watermark-idle" when deciding
> whether or not to advance the operator's current watermark. When a downstream
> operator determines that all upstream operators are "watermark-idle" (i.e.
> when all input channels have received the watermark idle status element),
> then the operator is considered to also be "watermark-idle", as it will
> temporarily be unable to advance its own watermark. This is always the case
> for operators that only read from a single upstream operator. Once an
> operator is considered "watermark-idle", it should itself forward its idle
> status to inform downstream operators. The operator is considered to be back
> to "watermark-active" as soon as at least one of its upstream operators
> resume to be "watermark-active" (i.e. when at least one input channel
> receives the watermark active status element), and should also forward its
> active status to inform downstream operators.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
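
For readers who want the idle/active rules from the issue description in concrete form, here is a minimal Java sketch. It is not the code from the pull request: `IdleAwareValveSketch` and its `OutputHandler` interface are hypothetical names made up for illustration, and the logic is deliberately simpler than what a real `StatusWatermarkValve` would need. It only assumes what the text states: idle channels are ignored when computing the minimum watermark, the operator becomes idle once all input channels are idle, and it becomes active again as soon as one channel is active.

```java
import java.util.Arrays;

/** Hypothetical, simplified illustration; NOT Flink's StatusWatermarkValve. */
final class IdleAwareValveSketch {

    /** Hypothetical callback, in the spirit of the ValveOutputHandler named in the PR. */
    interface OutputHandler {
        void onWatermark(long timestamp);
        void onStatus(boolean active); // true = ACTIVE, false = IDLE
    }

    private final long[] channelWatermarks;
    private final boolean[] channelActive;
    private final OutputHandler handler;
    private long lastOutputWatermark = Long.MIN_VALUE;
    private boolean lastOutputActive = true;

    public IdleAwareValveSketch(int numChannels, OutputHandler handler) {
        channelWatermarks = new long[numChannels];
        channelActive = new boolean[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(channelActive, true); // assumption: every channel starts out active
        this.handler = handler;
    }

    public void inputWatermark(long timestamp, int channel) {
        // Watermarks arriving on an idle channel are ignored.
        if (channelActive[channel] && timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
            advanceIfPossible();
        }
    }

    public void inputStatus(boolean active, int channel) {
        if (channelActive[channel] == active) {
            return; // no status change on this channel
        }
        channelActive[channel] = active;
        boolean anyActive = false;
        for (boolean a : channelActive) {
            anyActive |= a;
        }
        // Forward a status only when the operator-level status flips.
        if (anyActive != lastOutputActive) {
            lastOutputActive = anyActive;
            handler.onStatus(anyActive);
        }
        if (anyActive) {
            advanceIfPossible(); // a newly idle channel may have been holding the minimum back
        }
    }

    private void advanceIfPossible() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Emit only if the minimum over the active channels moved forward.
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            handler.onWatermark(min);
        }
    }

    public static void main(String[] args) {
        IdleAwareValveSketch valve = new IdleAwareValveSketch(2, new OutputHandler() {
            @Override public void onWatermark(long ts) { System.out.println("watermark " + ts); }
            @Override public void onStatus(boolean active) { System.out.println(active ? "ACTIVE" : "IDLE"); }
        });
        valve.inputWatermark(10, 0); // held back: channel 1 has not sent a watermark yet
        valve.inputStatus(false, 1); // channel 1 idle, prints "watermark 10"
        valve.inputStatus(false, 0); // all channels idle, prints "IDLE"
        valve.inputStatus(true, 1);  // prints "ACTIVE"
        valve.inputWatermark(15, 1); // prints "watermark 15"
    }
}
```

The sketch never lets the emitted watermark go backwards when a channel with an older watermark becomes active again; how the real valve treats such a lagging channel is not described in the text above, so that behavior is an assumption of this example.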