[ https://issues.apache.org/jira/browse/FLINK-11286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776540#comment-16776540 ]
Tzu-Li (Gordon) Tai edited comment on FLINK-11286 at 2/25/19 5:57 AM:
----------------------------------------------------------------------

Please allow me to restate what I agree with and what I don't agree with. I feel like the discussion is going in circles and we keep repeating the same arguments.

What I agree with:
# In some cases, like filtering within the pipeline, input channels become IDLE only after the filter, and not at the source.
# However, these non-source operators are not capable of marking themselves as IDLE; right now only sources can do that.

What I don't agree with:
# The JIRA proposes that ALL operators should be able to mark themselves as IDLE. I DON'T agree with this.
# Only watermark-generating operators (i.e. source operators, `TimestampsAndPeriodicWatermarksOperator`, and `TimestampsAndPunctuatedWatermarksOperator`) need to mark themselves as IDLE if some input channel is IDLE.
# Their idleness status should NOT be triggered by user code, the way `SourceContext#markAsTemporarilyIdle()` allows for sources. Sources can do that because in certain cases they can deterministically know that they are indeed idle (e.g. a subtask is not assigned any Kafka partitions). This is not the case for watermark operators: the only way for them to detect idleness is a configured timeout on how much time has passed since one of their input channels last provided input. This should be done automatically by the system, just as the idleness timeout works in `StreamSource` (although right now that timeout is always set to -1, meaning that automatic, timeout-triggered idleness detection is always disabled).

So, it really boils down to the following:
- Only sources can allow user code to mark the operator as temporarily idle, because only sources can identify specific cases that immediately define idleness.
- All other operators need to rely on a timeout configuration to detect idleness. This applies even to the sources themselves, because a source may not have explicitly declared itself IDLE and yet not produce any records (e.g. a source subtask has a Kafka partition assigned, but the partition is empty).
- That being said, we do not need all operators to perform this timeout-based automatic idleness detection, because time-based operations such as windows always happen after some watermark-generating operator.
- Therefore, only watermark-generating operators need to do idleness detection. We already have that in the source operator; it is still missing in the `TimestampsAndPeriodicWatermarksOperator` and `TimestampsAndPunctuatedWatermarksOperator` operators.

> Support to send StreamStatus.IDLE for non-source operators
> -----------------------------------------------------------
>
> Key: FLINK-11286
> URL: https://issues.apache.org/jira/browse/FLINK-11286
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
>
> Currently, only stream source tasks can be marked as temporarily idle. However,
> this approach is often too limited.
> Consider the following scenario: a DAG
> {{source->map->filter->flatmap->keyBy->window}} with a parallelism of 10.
> The watermarks are not emitted by the source operator but further downstream,
> e.g. by the flatmap. No source subtask is ever idle. However, after the
> filter some pipelines do become idle; for example, 3 pipelines no longer send
> any data downstream. At that point we cannot call {{markAsTemporarilyIdle}} to
> mark the current pipeline as idle, and this affects the downstream window.
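To make the split of responsibilities argued for in the comment more concrete, here is a minimal Java sketch under explicit assumptions. It is not Flink code: `TimeoutIdleWatermarkGenerator`, `MinOverActiveChannels` and `IdlenessSketchDemo` are hypothetical names, the watermark is simply the highest event timestamp seen (no out-of-orderness bound), and time is passed in as a parameter instead of coming from timers. The generator marks itself IDLE purely on a timeout, with no user-code hook, and the downstream side takes the minimum watermark over channels that are still ACTIVE. This is why, in the issue's scenario, the 3 silent pipelines would stall the window if they were never declared idle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo replaying the issue's scenario: 10 pipelines, 3 silent after the filter. */
final class IdlenessSketchDemo {
    public static void main(String[] args) {
        List<TimeoutIdleWatermarkGenerator> channels = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            channels.add(new TimeoutIdleWatermarkGenerator(1_000L, 0L)); // 1,000 ms idle timeout
            channels.get(i).onRecord(100L, 0L);                          // every channel sees ts=100 at t=0
        }
        for (long now = 500L; now <= 2_000L; now += 500L) {
            for (int i = 0; i < 7; i++) {
                channels.get(i).onRecord(now, now); // only 7 channels keep receiving data
            }
            for (TimeoutIdleWatermarkGenerator ch : channels) {
                ch.checkIdleness(now);              // periodic, timeout-based idleness check
            }
            System.out.println("t=" + now + " combined watermark=" + MinOverActiveChannels.combine(channels));
        }
    }
}

/**
 * Hypothetical sketch (not Flink's API): a watermark-generating operator that switches itself
 * to IDLE when no record has arrived within a configured timeout, instead of relying on user code.
 */
final class TimeoutIdleWatermarkGenerator {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeoutMs;
    private long lastRecordTimeMs;                  // processing time of the most recent record
    private long currentWatermark = Long.MIN_VALUE; // assumption: watermark = max event timestamp
    private Status status = Status.ACTIVE;

    TimeoutIdleWatermarkGenerator(long idleTimeoutMs, long nowMs) {
        if (idleTimeoutMs <= 0) {
            throw new IllegalArgumentException("idle timeout must be positive");
        }
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastRecordTimeMs = nowMs;              // creation counts as activity
    }

    /** Any record makes the operator ACTIVE again and may advance its watermark. */
    void onRecord(long eventTimestamp, long nowMs) {
        lastRecordTimeMs = nowMs;
        currentWatermark = Math.max(currentWatermark, eventTimestamp);
        status = Status.ACTIVE;
    }

    /** Called periodically; flips to IDLE once no input has arrived for the timeout. */
    Status checkIdleness(long nowMs) {
        if (nowMs - lastRecordTimeMs >= idleTimeoutMs) {
            status = Status.IDLE;
        }
        return status;
    }

    long watermark() { return currentWatermark; }

    Status status() { return status; }
}

/** Downstream side: the combined watermark is the minimum over channels that are still ACTIVE. */
final class MinOverActiveChannels {
    static long combine(List<TimeoutIdleWatermarkGenerator> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (TimeoutIdleWatermarkGenerator ch : channels) {
            if (ch.status() == TimeoutIdleWatermarkGenerator.Status.ACTIVE) {
                anyActive = true;
                min = Math.min(min, ch.watermark());
            }
        }
        // If every channel is idle there is nothing to advance on; report "no watermark".
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Run on its own, the demo prints a combined watermark of 100 at t=500, then 1000, 1500 and 2000 once the 3 silent channels exceed the 1,000 ms timeout; without the timeout the combined watermark would stay at 100 and the window would never fire. The sketch deliberately ignores a channel that turns ACTIVE again with a watermark behind the combined one, which a real implementation has to handle.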