[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635719#comment-15635719 ]
Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 11/4/16 9:08 AM:
---------------------------------------------------------------------

Yes, that's true. In general, once all input channels of an operator have received the "watermark idle" message, the operator should itself propagate a "watermark idle" message downstream. An operator in the middle of a task's operator chain should simply pass the "watermark idle" message on downstream (for example, a timestamp / watermark extractor chained with the source). The head operator of a chain with multiple input channels passes on a "watermark idle" message only once all of its input channels have received it; as long as any one of its input channels is not idle, the message is not passed further downstream.

About that, I could use some help understanding how elements flow through the operator chain of a single task. Specifically, if a previous operator in the chain emits a watermark, that watermark is then used to directly call the {{AbstractStreamOperator#processWatermark()}} method of the operator that follows it, correct? I'm struggling a bit to understand how the {{ChainedOutput}} in {{OperatorChain}} works.

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a
> low watermark service in the JobManager to support transparent resharding /
> partition discovery for our Kafka and Kinesis consumers (and any future
> streaming connectors in general for which the external system may elastically
> scale up and down independently of the parallelism of sources in Flink). The
> main idea is to let source subtasks that don't emit their own watermarks
> (because they currently don't have data partitions to consume) emit the low
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, from a high-level: a {{LowWatermarkCoordinator}}
> will be added to execution graphs, periodically triggering only the source
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
> JobManager through the actor gateway (or a new interface after FLINK-4456
> gets merged) with a {{ReplyLowWatermark}} message.
> When the coordinator collects all low watermarks for a particular source
> vertex and determines the aggregated low watermark for this round (accounting
> only values that are larger than the aggregated low watermark of the last
> round), it sends a {{NotifyNewLowWatermark}} message to the source vertex's
> tasks.
> The messages will only be relevant to tasks that implement an internal
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}}
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface
> if they wish to get notified of the aggregated low watermarks across
> subtasks. Connectors like the Kinesis consumer can choose to emit this
> watermark if the subtask currently does not have any shards, so that
> downstream operators may still properly advance time windows (implementation
> for this is tracked as a separate issue).
> Overall, the service will include -
> New messages between JobManager <-> TaskManager:
> {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
> {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
> {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, timestamp)}}
> New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> New public interface {{LowWatermarkListener}} in flink-streaming-java
> Might also need to extend {{SourceFunction.SourceContext}} to support
> retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
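A minimal Java sketch of one possible reading of the coordinator's aggregation step described in the issue: once all {{ReplyLowWatermark}} values for a source vertex are in, replies at or below the previous round's aggregate are ignored, and the new aggregate is the minimum of the remaining ones. {{LowWatermarkAggregator}} and {{completeRound}} are hypothetical names, not Flink classes, and this interpretation of "accounting only values that are larger than the aggregated low watermark of the last round" is an assumption.

```java
import java.util.Collection;
import java.util.OptionalLong;

/**
 * Hypothetical sketch (not a Flink class): the per-source-vertex aggregation a
 * LowWatermarkCoordinator might perform once all replies for a round are collected.
 */
final class LowWatermarkAggregator {

    // Aggregated low watermark announced in the previous round.
    private long lastAggregated;

    LowWatermarkAggregator(long initialLowWatermark) {
        this.lastAggregated = initialLowWatermark;
    }

    /**
     * Completes one round. Only replies strictly larger than the previous aggregate
     * are taken into account; the new aggregate is their minimum. Returns the value
     * to send in a NotifyNewLowWatermark message, or empty if no reply advanced.
     */
    OptionalLong completeRound(Collection<Long> replies) {
        final long previous = lastAggregated;
        OptionalLong next = replies.stream()
                .mapToLong(Long::longValue)
                .filter(w -> w > previous) // assumed: ignore values not past last round
                .min();
        if (next.isPresent()) {
            lastAggregated = next.getAsLong();
        }
        return next;
    }

    long lastAggregated() {
        return lastAggregated;
    }
}
```

For example, starting from {{Long.MIN_VALUE}}, replies 100, 250, 180 give an aggregate of 100; a following round with 100, 300, 220 ignores the 100 (e.g. a subtask without shards echoing the last low watermark) and yields 220. One consequence of this reading: a subtask whose watermark has not moved since the last round is treated like an idle one, so the aggregate can move past it.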
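A minimal Java sketch of the idle-propagation rule from the comment above: a head operator forwards "watermark idle" downstream only once every input channel has reported it, while an operator in the middle of a chain, modelled here as having exactly one input, passes it on immediately. {{InputIdlenessTracker}} and its methods are hypothetical, not Flink classes; the sketch also assumes, beyond what the comment says, that a record or watermark arriving on an idle channel makes that channel active again.

```java
import java.util.BitSet;

/** Hypothetical sketch (not a Flink class): per-channel "watermark idle" bookkeeping. */
final class InputIdlenessTracker {

    private final int numChannels;
    private final BitSet idleChannels;
    // True once "watermark idle" has been forwarded for the current idle period.
    private boolean idleForwarded = false;

    InputIdlenessTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        this.numChannels = numChannels;
        this.idleChannels = new BitSet(numChannels);
    }

    /** Records an idle message; returns true if "watermark idle" should now go downstream. */
    boolean onIdleMessage(int channel) {
        checkChannel(channel);
        idleChannels.set(channel);
        // Forward only when every input channel is idle, and only once per idle period.
        if (!idleForwarded && allIdle()) {
            idleForwarded = true;
            return true;
        }
        return false;
    }

    /**
     * Assumed behaviour: a record or watermark on a channel marks it active again.
     * Returns true if downstream had previously been told "idle".
     */
    boolean onActivity(int channel) {
        checkChannel(channel);
        idleChannels.clear(channel);
        boolean wasForwarded = idleForwarded;
        idleForwarded = false;
        return wasForwarded;
    }

    boolean allIdle() {
        return idleChannels.cardinality() == numChannels;
    }

    private void checkChannel(int channel) {
        if (channel < 0 || channel >= numChannels) {
            throw new IndexOutOfBoundsException("channel " + channel);
        }
    }
}
```

With three input channels, the first two idle messages return false and the third returns true; a repeated idle message on an already idle channel returns false again, so downstream sees one "watermark idle" per idle period rather than one per upstream message.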