[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-4576:
---------------------------------------
    Description: 
Following the discussion by [~aljoscha] and [~StephanEwen] in FLINK-4341, we need a
low watermark service in the JobManager to support transparent resharding /
partition discovery for our Kafka and Kinesis consumers (and, more generally, for
any future streaming connector whose external system may scale up and down
elastically, independently of the parallelism of the Flink sources).

The proposed implementation, at a high level: a {{LowWatermarkCoordinator}}
will be added to execution graphs and will periodically trigger only the source
vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
JobManager through the actor gateway (or through the new interface once FLINK-4456
is merged) with a {{ReplyLowWatermark}} message. Once the coordinator has
collected all low watermarks for a particular source vertex, it determines the
aggregated low watermark for this round (considering only values that are larger
than the aggregated low watermark of the previous round) and sends a
{{NotifyNewLowWatermark}} message to the source vertex's tasks.
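Here is a minimal sketch of the per-round aggregation for one source vertex. It assumes low watermarks are {{long}} epoch milliseconds and that replies are keyed by subtask index. {{LowWatermarkAggregator}}, {{onReply}} and {{tryCompleteRound}} are hypothetical names, not existing Flink classes. The sketch reads the filtering rule literally: only replies larger than the previous round's aggregate are considered, the new aggregate is their minimum, and nothing is sent if no reply qualifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical per-source-vertex aggregation state for the LowWatermarkCoordinator.
 * Low watermarks are assumed to be epoch-millisecond longs.
 */
class LowWatermarkAggregator {
    private final int parallelism;
    // Latest ReplyLowWatermark value per subtask index for the current round.
    private final Map<Integer, Long> repliesThisRound = new HashMap<>();
    // Aggregated low watermark of the previous round; no round has completed yet.
    private long lastAggregated = Long.MIN_VALUE;

    LowWatermarkAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Records one subtask's reply; a second reply in the same round overwrites the first. */
    void onReply(int subtaskIndex, long currentLowWatermark) {
        repliesThisRound.put(subtaskIndex, currentLowWatermark);
    }

    /**
     * Returns the value to send in NotifyNewLowWatermark, or empty if some subtask
     * has not replied yet or no reply is larger than the previous aggregate.
     */
    OptionalLong tryCompleteRound() {
        if (repliesThisRound.size() < parallelism) {
            return OptionalLong.empty(); // keep waiting; the round stays open
        }
        long min = Long.MAX_VALUE;
        boolean advanced = false;
        for (long wm : repliesThisRound.values()) {
            // Values not larger than last round's aggregate are ignored, so a
            // subtask whose watermark has not moved does not hold the others back.
            if (wm > lastAggregated) {
                min = Math.min(min, wm);
                advanced = true;
            }
        }
        repliesThisRound.clear(); // the next RetrieveLowWatermark starts a new round
        if (!advanced) {
            return OptionalLong.empty();
        }
        lastAggregated = min;
        return OptionalLong.of(min);
    }
}
```

Take replies of 100, 200 and 150. The first round yields 100. If the next round's replies are 100, 300 and 250, the subtask still at 100 is skipped and the aggregate becomes 250. It is worth confirming whether an unchanged subtask should really be skipped, rather than holding the aggregate back.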

The messages will only be relevant to tasks that implement an internal 
{{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
should implement {{LowWatermarkCooperatingTask}}.
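The issue names {{LowWatermarkCooperatingTask}} but does not specify its methods. This sketch assumes one method per incoming message. It also shows how the TaskManager side could route the messages so that tasks which do not cooperate simply ignore them. The method names and the {{LowWatermarkMessageRouter}} helper are hypothetical.

```java
/**
 * Hypothetical method set for the internal LowWatermarkCooperatingTask interface;
 * the issue names the interface but does not specify its methods.
 */
interface LowWatermarkCooperatingTask {
    /** Called when a RetrieveLowWatermark message arrives; the result goes into ReplyLowWatermark. */
    long getCurrentLowWatermark();

    /** Called when a NotifyNewLowWatermark message arrives. */
    void notifyNewLowWatermark(long newLowWatermark);
}

/** Hypothetical TaskManager-side routing: tasks that do not cooperate ignore the messages. */
final class LowWatermarkMessageRouter {
    private LowWatermarkMessageRouter() {}

    /** Returns the task's current low watermark, or null if the task does not cooperate. */
    static Long retrieve(Object invokable) {
        if (invokable instanceof LowWatermarkCooperatingTask) {
            return ((LowWatermarkCooperatingTask) invokable).getCurrentLowWatermark();
        }
        return null;
    }

    /** Delivers the aggregate; returns false if the task does not cooperate. */
    static boolean deliver(Object invokable, long newLowWatermark) {
        if (invokable instanceof LowWatermarkCooperatingTask) {
            ((LowWatermarkCooperatingTask) invokable).notifyNewLowWatermark(newLowWatermark);
            return true;
        }
        return false;
    }
}
```

Under this assumption, {{SourceStreamTask}} would be the only implementor for now. Every other task type would fall through the {{instanceof}} checks.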

Source functions should implement a public {{LowWatermarkListener}} interface
if they wish to be notified of the aggregated low watermark across subtasks.
Connectors like the Kinesis consumer can choose to emit this watermark while a
subtask has no shards assigned, so that downstream operators can still
advance their time windows (the implementation for this is tracked as a
separate issue).
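Here is a sketch of how a Kinesis-like source could use {{LowWatermarkListener}}. The callback name {{notifyOfNewLowWatermark}}, the {{ShardAwareSource}} class and the {{WatermarkEmitter}} interface are all hypothetical. {{WatermarkEmitter}} stands in for {{SourceFunction.SourceContext#emitWatermark}} so that the snippet compiles without Flink on the classpath. The sketch also assumes that a subtask which owns shards derives watermarks from its own records, and therefore ignores the aggregate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shape of the public listener interface proposed in this issue. */
interface LowWatermarkListener {
    void notifyOfNewLowWatermark(long aggregatedLowWatermark);
}

/** Stand-in for SourceFunction.SourceContext#emitWatermark so the sketch compiles without Flink. */
interface WatermarkEmitter {
    void emitWatermark(long timestamp);
}

/** Hypothetical Kinesis-like subtask that forwards the aggregate only while it owns no shards. */
class ShardAwareSource implements LowWatermarkListener {
    private final WatermarkEmitter out;
    private final List<String> assignedShards = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;

    ShardAwareSource(WatermarkEmitter out) {
        this.out = out;
    }

    void assignShard(String shardId) {
        assignedShards.add(shardId);
    }

    @Override
    public void notifyOfNewLowWatermark(long aggregatedLowWatermark) {
        // An idle subtask forwards the global low watermark so downstream windows can
        // still fire; it never re-emits or moves its own output watermark backwards.
        if (assignedShards.isEmpty() && aggregatedLowWatermark > lastEmitted) {
            lastEmitted = aggregatedLowWatermark;
            out.emitWatermark(aggregatedLowWatermark);
        }
    }
}
```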

Overall, the service will include:
* New messages between JobManager <-> TaskManager:
** {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
** {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
** {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark)}}
* New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
* New public interface {{LowWatermarkListener}} in flink-streaming-java
* Might also need to extend {{SourceFunction.SourceContext}} to support retrieving the current low watermark of sources.
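The three messages could be plain serializable value classes. In this sketch the IDs are {{String}}s for brevity; the real messages would presumably use Flink's {{JobID}}, {{JobVertexID}} and an execution attempt ID. The {{timestamp}} in {{RetrieveLowWatermark}} is assumed to mark when the round was triggered. The {{reply}} helper is a hypothetical convenience.

```java
import java.io.Serializable;

/**
 * Fields shared by the three proposed messages. IDs are plain Strings here; the
 * real messages would presumably carry Flink's own ID types.
 */
abstract class LowWatermarkMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    final String jobId;
    final String jobVertexId;
    final String taskId;

    LowWatermarkMessage(String jobId, String jobVertexId, String taskId) {
        this.jobId = jobId;
        this.jobVertexId = jobVertexId;
        this.taskId = taskId;
    }
}

/** JobManager -> TaskManager; timestamp is assumed to mark when the round was triggered. */
final class RetrieveLowWatermark extends LowWatermarkMessage {
    private static final long serialVersionUID = 1L;
    final long timestamp;

    RetrieveLowWatermark(String jobId, String jobVertexId, String taskId, long timestamp) {
        super(jobId, jobVertexId, taskId);
        this.timestamp = timestamp;
    }

    /** Hypothetical helper: builds the reply addressed to the same job, vertex and task. */
    ReplyLowWatermark reply(long currentLowWatermark) {
        return new ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark);
    }
}

/** TaskManager -> JobManager: the task's current low watermark. */
final class ReplyLowWatermark extends LowWatermarkMessage {
    private static final long serialVersionUID = 1L;
    final long currentLowWatermark;

    ReplyLowWatermark(String jobId, String jobVertexId, String taskId, long currentLowWatermark) {
        super(jobId, jobVertexId, taskId);
        this.currentLowWatermark = currentLowWatermark;
    }
}

/** JobManager -> TaskManager: the newly aggregated low watermark of the source vertex. */
final class NotifyNewLowWatermark extends LowWatermarkMessage {
    private static final long serialVersionUID = 1L;
    final long newLowWatermark;

    NotifyNewLowWatermark(String jobId, String jobVertexId, String taskId, long newLowWatermark) {
        super(jobId, jobVertexId, taskId);
        this.newLowWatermark = newLowWatermark;
    }
}
```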

Any feedback for this is appreciated!

  was:
Following the discussion by [~aljoscha] and [~StephanEwen] in FLINK-4341, we need a
low watermark service in the JobManager to support transparent resharding /
partition discovery for our Kafka and Kinesis consumers (and, more generally, for
any future streaming connector whose external system may scale up and down
elastically, independently of the parallelism of the Flink sources).

The proposed implementation, at a high level: a {{LowWatermarkCoordinator}}
will be added to execution graphs and will periodically trigger only the source
vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
JobManager through the actor gateway (or through the new interface once FLINK-4456
is merged) with a {{ReplyLowWatermark}} message. Once the coordinator has
collected all low watermarks for a particular source vertex, it determines the
aggregated low watermark for this round (considering only values that are larger
than the aggregated low watermark of the previous round) and sends a
{{NotifyNewLowWatermark}} message to the source vertex's tasks.

The messages will only be relevant to tasks that implement an internal 
{{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
should implement {{LowWatermarkCooperatingTask}}.

Source functions should implement a public {{LowWatermarkListener}} interface
if they wish to be notified of the aggregated low watermark across subtasks.
Connectors like the Kinesis consumer can choose to emit this watermark while a
subtask has no shards assigned, so that downstream operators can still
advance their time windows (the implementation for this is tracked as a
separate issue).

Overall, the service will include:
* New messages between JobManager <-> TaskManager:
** {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
** {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
** {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark)}}
* New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
* New public interface {{LowWatermarkListener}} in flink-streaming-java


> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
