[ 
https://issues.apache.org/jira/browse/FLINK-11286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776540#comment-16776540
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-11286 at 2/25/19 5:57 AM:
----------------------------------------------------------------------

Please allow me to restate what I agree with and what I don't agree with. 
I feel like the discussion is going in circles and we keep repeating the same 
arguments.

What I agree with:
# In some cases, like filtering within the pipeline, input channels become IDLE 
only after the filtering, and not at the source.
# However, these non-source operators are not capable of marking themselves as 
IDLE; only sources can do that right now.

What I don't agree with:
# The JIRA proposes that ALL operators should be able to mark themselves as 
IDLE. I DON'T agree with this.
# Only watermark-generating operators (i.e. source operators, 
`TimestampsAndPeriodicWatermarksOperator`, and 
`TimestampsAndPunctuatedWatermarksOperator`) are required to mark themselves as 
IDLE if some input channel is IDLE.
# Their idleness status should NOT be triggered by user code, as is possible 
via `SourceContext#markAsTemporarilyIdle()`. Sources can do that because in 
certain cases they can deterministically know that they are indeed idle (e.g. 
a subtask is not assigned any Kafka partitions). This is not the case for 
watermark operators; the only way for them to detect idleness is a configured 
timeout on how much time has passed since one of their input channels last 
provided input. This should be done automatically by the system, similar to 
how the idleness timeout works in `StreamSource` (although right now the 
timeout is always set to -1, meaning that automatic idleness detection 
triggered by timeouts is always disabled).
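
To make the timeout idea concrete, here is a minimal sketch in plain Java. It is not Flink code: `IdleTimeoutDetector`, `onInput` and `onTimer` are hypothetical names, and it assumes a single "last input" timestamp for the whole operator rather than one per input channel. A negative timeout disables detection, mirroring the -1 default mentioned above.

```java
/**
 * Hypothetical helper, not part of Flink: remembers when input last arrived
 * and reports idleness once a configured timeout has elapsed.
 */
final class IdleTimeoutDetector {
    /** Sentinel meaning "timeout-based detection is disabled". */
    static final long DISABLED = -1L;

    private final long idleTimeoutMillis;
    private long lastInputMillis;
    private boolean idle;

    IdleTimeoutDetector(long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastInputMillis = nowMillis;
    }

    /** Call for every incoming record; any input makes the operator ACTIVE again. */
    void onInput(long nowMillis) {
        lastInputMillis = nowMillis;
        idle = false;
    }

    /** Call from a periodic timer; returns true once the operator counts as IDLE. */
    boolean onTimer(long nowMillis) {
        // Any negative timeout is treated as "disabled" in this sketch.
        if (idleTimeoutMillis >= 0 && nowMillis - lastInputMillis >= idleTimeoutMillis) {
            idle = true;
        }
        return idle;
    }

    boolean isIdle() {
        return idle;
    }

    public static void main(String[] args) {
        IdleTimeoutDetector d = new IdleTimeoutDetector(1000L, 0L);
        System.out.println(d.onTimer(500L));   // false: only 500 ms without input
        System.out.println(d.onTimer(1000L));  // true: timeout reached
        d.onInput(1200L);
        System.out.println(d.isIdle());        // false: input arrived again
    }
}
```

Timestamps are passed in explicitly so the logic does not depend on the wall clock; a real operator would presumably drive `onTimer` from its processing-time timer service.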

So, it really boils down to the following:
- Only sources can allow user code to mark the operator as temporarily idle. 
That is because only sources can identify specific cases that immediately 
define idleness.
- All other operators need to rely on some timeout configuration to detect 
idleness. Even the sources themselves need it, because a source may never 
explicitly say it is IDLE and yet not produce any records (e.g., some source 
subtask has a Kafka partition assigned, but the partition is empty).
- That being said, we do not need all operators to perform this timeout-based 
automatic idleness detection, because time-based operations such as windows 
always happen after some watermark-generating operator.
- Therefore, only watermark-generating operators should need to do the idleness 
detection. We already have that in the source operator; it is still missing in 
the `TimestampsAndPeriodicWatermarksOperator` and 
`TimestampsAndPunctuatedWatermarksOperator` operators.
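
The two ways for a source to become idle can be contrasted in a small model. This is only a sketch under assumptions: `SourceSubtaskStatus`, `onEmit` and `onTimer` are hypothetical and do not correspond to Flink classes; only the name `markAsTemporarilyIdle` is borrowed from the discussion above.

```java
/**
 * Hypothetical model of one source subtask's status; illustrative only,
 * not Flink's actual implementation.
 */
final class SourceSubtaskStatus {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeoutMillis; // negative disables the timeout path
    private long lastEmitMillis;
    private Status status = Status.ACTIVE;

    SourceSubtaskStatus(long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEmitMillis = nowMillis;
    }

    /** Explicit path: user code knows for certain there is nothing to read. */
    void markAsTemporarilyIdle() {
        status = Status.IDLE;
    }

    /** Emitting a record always brings the subtask back to ACTIVE. */
    void onEmit(long nowMillis) {
        lastEmitMillis = nowMillis;
        status = Status.ACTIVE;
    }

    /** Timeout path: the system infers idleness from prolonged silence. */
    Status onTimer(long nowMillis) {
        if (idleTimeoutMillis >= 0 && nowMillis - lastEmitMillis >= idleTimeoutMillis) {
            status = Status.IDLE;
        }
        return status;
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        // No partitions assigned: the subtask can declare itself idle right away.
        SourceSubtaskStatus unassigned = new SourceSubtaskStatus(5000L, 0L);
        unassigned.markAsTemporarilyIdle();
        System.out.println(unassigned.status()); // IDLE

        // Partition assigned but empty: only the timeout can reveal idleness.
        SourceSubtaskStatus emptyPartition = new SourceSubtaskStatus(5000L, 0L);
        System.out.println(emptyPartition.onTimer(1000L)); // ACTIVE
        System.out.println(emptyPartition.onTimer(5000L)); // IDLE
    }
}
```

A watermark-generating operator downstream of a filter would, under this proposal, expose only the timeout path and no user-callable method.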


> Support to send StreamStatus.IDLE for non-source operators 
> -----------------------------------------------------------
>
>                 Key: FLINK-11286
>                 URL: https://issues.apache.org/jira/browse/FLINK-11286
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> Currently, only stream source tasks can be marked as temporarily idle. But 
> in many cases this approach has limitations.
> Consider a scenario with the following DAG: 
> {{source->map->filter->flatmap->keyBy->window}}, with a parallelism of 10. 
> The watermark is not generated by the source operator but by a downstream 
> operator, such as flatmap. No source subtask is ever idle. However, after 
> the filter, some pipelines become idle. For example, 3 pipelines no longer 
> send any data downstream. At this point, we can't call the 
> {{markAsTemporarilyIdle}} method to mark the current pipeline as idle. This 
> affects the downstream window.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
