[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632615#comment-15632615 ]
Stephan Ewen commented on FLINK-4576:
-------------------------------------

+1 for the "watermark idle" messages approach. That strikes me as a good idea.

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a
> low watermark service in the JobManager to support transparent resharding /
> partition discovery for our Kafka and Kinesis consumers (and any future
> streaming connectors in general for which the external system may elastically
> scale up and down independently of the parallelism of sources in Flink). The
> main idea is to let source subtasks that don't emit their own watermarks
> (because they currently don't have data partitions to consume) emit the low
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, from a high-level: a {{LowWatermarkCoordinator}}
> will be added to execution graphs, periodically triggering only the source
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
> JobManager through the actor gateway (or a new interface after FLINK-4456
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator
> collects all low watermarks for a particular source vertex and determines the
> aggregated low watermark for this round (accounting only values that are
> larger than the aggregated low watermark of the last round), it sends a
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}}
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface
> if they wish to get notified of the aggregated low watermarks across
> subtasks. Connectors like the Kinesis consumer can choose to emit this
> watermark if the subtask currently does not have any shards, so that
> downstream operators may still properly advance time windows (implementation
> for this is tracked as a separate issue).
> Overall, the service will include -
> New messages between JobManager <-> TaskManager:
> {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
> {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
> {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark,
> timestamp)}}
> New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> New public interface {{LowWatermarkListener}} in flink-streaming-java
> Might also need to extend {{SourceFunction.SourceContext}} to support
> retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
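
For illustration only, the per-round aggregation described in the quoted proposal could look roughly like the sketch below. This is one reading of "accounting only values that are larger than the aggregated low watermark of the last round": take the minimum over the replies that exceed the previous aggregate, and skip the notification when none do. The class name {{LowWatermarkRound}}, the method {{aggregate}}, and the choice of {{Long.MIN_VALUE}} as the reply from a subtask without partitions are all assumptions made for this example; none of them come from the proposal or from Flink.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch of one coordinator round; not actual Flink code. */
final class LowWatermarkRound {

    private LowWatermarkRound() {}

    /**
     * Aggregates the ReplyLowWatermark values of one source vertex.
     *
     * @param replies       subtask index -> reported low watermark (assumed shape)
     * @param lastAggregate aggregated low watermark of the previous round
     * @return the minimum of all replies strictly larger than lastAggregate,
     *         or empty if no reply qualifies (no NotifyNewLowWatermark is sent)
     */
    static OptionalLong aggregate(Map<Integer, Long> replies, long lastAggregate) {
        return replies.values().stream()
                .mapToLong(Long::longValue)
                // Ignore subtasks that have not advanced past the last round,
                // e.g. idle subtasks that currently own no partitions.
                .filter(w -> w > lastAggregate)
                .min();
    }

    public static void main(String[] args) {
        // Subtask 2 is idle and reports Long.MIN_VALUE (an assumption of this sketch).
        Map<Integer, Long> replies = Map.of(0, 120L, 1, 95L, 2, Long.MIN_VALUE);
        System.out.println(aggregate(replies, 90L));
    }
}
```

With this reading, an idle subtask never holds the aggregate back, and it can then emit the notified value itself so that downstream windows keep advancing.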