[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15632701#comment-15632701 ]
Aljoscha Krettek commented on FLINK-4576:
-----------------------------------------

It's a good solution for the problem of idle sources (where we know that they are idle because they don't have partitions assigned). I don't think it solves the problem of having partitions assigned that don't produce new elements for a while. I also don't think it solves the problem of having a timestamp extractor/watermark assigner in the middle of a topology, even right after the sources. Does it?

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a
> low watermark service in the JobManager to support transparent resharding /
> partition discovery for our Kafka and Kinesis consumers (and any future
> streaming connectors in general for which the external system may elastically
> scale up and down independently of the parallelism of sources in Flink). The
> main idea is to let source subtasks that don't emit their own watermarks
> (because they currently don't have data partitions to consume) emit the low
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE
> watermark and forbidding them to be assigned partitions in the future.
>
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}}
> will be added to execution graphs, periodically triggering only the source
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
> JobManager through the actor gateway (or a new interface after FLINK-4456
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator
> collects all low watermarks for a particular source vertex and determines the
> aggregated low watermark for this round (counting only values that are
> larger than the aggregated low watermark of the last round), it sends a
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
>
> The messages will only be relevant to tasks that implement an internal
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}}
> should implement {{LowWatermarkCooperatingTask}}.
>
> Source functions should implement a public {{LowWatermarkListener}} interface
> if they wish to get notified of the aggregated low watermarks across
> subtasks. Connectors like the Kinesis consumer can choose to emit this
> watermark if the subtask currently does not have any shards, so that
> downstream operators may still properly advance time windows (implementation
> for this is tracked as a separate issue).
>
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>     {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>     {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>     {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark,
>     timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
>
> Might also need to extend {{SourceFunction.SourceContext}} to support
> retrieving the current low watermark of sources.
>
> Any feedback for this is appreciated!

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
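
For readers following the proposal, the per-round aggregation step it describes (collect one reply per source subtask, then take the minimum over only those values larger than the previous round's aggregate) might be sketched roughly as below. This is an illustration under assumptions, not code from Flink: the class `LowWatermarkAggregator`, its method names, and the use of `Long.MIN_VALUE` as "no watermark yet" are all hypothetical, and the actor messaging between JobManager and TaskManager is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of Flink): aggregates the low watermark
 * replies of one source vertex, one round at a time.
 */
class LowWatermarkAggregator {
    private final int parallelism;
    // Replies received so far in the current round, keyed by subtask index.
    private final Map<Integer, Long> replies = new HashMap<>();
    // Aggregate of the last completed round; MIN_VALUE means "none yet" (assumption).
    private long lastAggregated = Long.MIN_VALUE;

    LowWatermarkAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Records one subtask's reply. Returns the new aggregated low watermark
     * once every subtask has replied and the aggregate advanced; otherwise empty.
     */
    OptionalLong onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        if (replies.size() < parallelism) {
            return OptionalLong.empty(); // round still incomplete
        }
        long min = Long.MAX_VALUE;
        boolean advanced = false;
        for (long wm : replies.values()) {
            // Count only values larger than the previous round's aggregate,
            // so subtasks that lag or have no watermark do not hold it back.
            if (wm > lastAggregated) {
                min = Math.min(min, wm);
                advanced = true;
            }
        }
        replies.clear(); // start the next round
        if (!advanced) {
            return OptionalLong.empty();
        }
        lastAggregated = min;
        return OptionalLong.of(min);
    }

    long lastAggregated() {
        return lastAggregated;
    }
}
```

In this sketch, a non-empty result corresponds to the point at which the coordinator would send {{NotifyNewLowWatermark}} to the vertex's tasks. A subtask without partitions would report no watermark and is skipped by the filter, which is the idle-source case the comment above says the proposal handles. A subtask that has partitions but no new elements would report the same value every round and would also be skipped once the aggregate passes it, so whether that is acceptable is exactly the open question raised in the comment.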