[ 
https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635719#comment-15635719
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 11/4/16 9:08 AM:
---------------------------------------------------------------------

Yes, that's true. In general, once all input channels of an operator have 
received the "watermark idle" message, the operator itself should propagate a 
"watermark idle" message downstream. An operator in the middle of a task's 
operator chain should just pass the "watermark idle" message on downstream (for 
example, a timestamp / watermark extractor chained with a source). A head 
operator of a chain that has multiple input channels only passes on a 
"watermark idle" message once all of its input channels have received the 
message; as long as at least one input channel is not idle, the "watermark 
idle" message will not be passed further downstream.
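
To make the rule above concrete, here is a minimal sketch of the bookkeeping a 
head operator could do per input channel. The class and method names 
({{InputIdleTracker}}, {{onChannelIdle}}, {{onChannelActive}}) are hypothetical 
and not existing Flink classes, and the handling of a channel becoming active 
again is an assumption that the discussion so far does not spell out.

```java
import java.util.BitSet;

/**
 * Hypothetical helper (not part of Flink): tracks which input channels of an
 * operator have reported "watermark idle".
 */
final class InputIdleTracker {
    private final int numChannels;
    private final BitSet idleChannels;

    InputIdleTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        this.numChannels = numChannels;
        this.idleChannels = new BitSet(numChannels);
    }

    /**
     * Records that a channel reported "watermark idle".
     *
     * @return true only if this call made ALL channels idle, i.e. the operator
     *         should now forward "watermark idle" downstream
     */
    boolean onChannelIdle(int channel) {
        checkChannel(channel);
        boolean wasAllIdle = allIdle();
        idleChannels.set(channel);
        return !wasAllIdle && allIdle();
    }

    /**
     * Assumption: a channel counts as active again once it delivers data.
     *
     * @return true if the operator was fully idle before this call
     */
    boolean onChannelActive(int channel) {
        checkChannel(channel);
        boolean wasAllIdle = allIdle();
        idleChannels.clear(channel);
        return wasAllIdle;
    }

    /** True when every input channel is currently marked idle. */
    boolean allIdle() {
        return idleChannels.cardinality() == numChannels;
    }

    private void checkChannel(int channel) {
        if (channel < 0 || channel >= numChannels) {
            throw new IndexOutOfBoundsException("channel " + channel);
        }
    }
}
```

With a single input channel (the mid-chain case), the first idle message is 
forwarded right away; with several channels, nothing is forwarded until the 
last active channel goes idle.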

About that, I could use some help understanding how the code handles elements 
flowing through the operator chain of a single task. Specifically, if a 
previous operator in the chain emits a watermark, that watermark is passed 
directly to the {{AbstractStreamOperator#processWatermark()}} method of the 
next operator in the chain, correct? I'm struggling a bit to understand how 
{{ChainedOutput}} works in {{OperatorChain}}.


was (Author: tzulitai):
Yes, that's true. In general, once all input channels of an operator have 
received the "watermark idle" message, the operator itself should propagate a 
"watermark idle" message downstream. An operator in the middle of a task's 
operator chain should just pass the "watermark idle" message on downstream. A 
head operator of a chain that has multiple input channels only passes on a 
"watermark idle" message once all of its input channels have received the 
message; as long as at least one input channel is not idle, the "watermark 
idle" message will not be passed further downstream.

About that, I could use some help understanding how the code handles elements 
flowing through the operator chain of a single task. Specifically, if a 
previous operator in the chain emits a watermark, that watermark is passed 
directly to the {{AbstractStreamOperator#processWatermark()}} method of the 
next operator in the chain, correct? I'm struggling a bit to understand how 
{{ChainedOutput}} works in {{OperatorChain}}.

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a 
> low watermark service in the JobManager to support transparent resharding / 
> partition discovery for our Kafka and Kinesis consumers (and any future 
> streaming connectors in general for which the external system may elastically 
> scale up and down independently of the parallelism of sources in Flink). The 
> main idea is to let source subtasks that don't emit their own watermarks 
> (because they currently don't have data partitions to consume) emit the low 
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE 
> watermark and forbidding them from being assigned partitions in the future.
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}} 
> will be added to execution graphs, periodically triggering only the source 
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the 
> JobManager through the actor gateway (or a new interface after FLINK-4456 
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator 
> has collected all low watermarks for a particular source vertex and 
> determined the aggregated low watermark for this round (counting only values 
> that are larger than the aggregated low watermark of the last round), it 
> sends a {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal 
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface 
> if they wish to get notified of the aggregated low watermarks across 
> subtasks. Connectors like the Kinesis consumer can choose to emit this 
> watermark if the subtask currently does not have any shards, so that 
> downstream operators may still properly advance time windows (implementation 
> for this is tracked as a separate issue).
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, 
>   timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> - Might also need to extend {{SourceFunction.SourceContext}} to support 
>   retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
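
For illustration, the per-round aggregation described in the issue text above 
could be sketched roughly as follows. This is only one reading of "accounting 
only values that are larger than the aggregated low watermark of the last 
round": replies at or below the previous aggregate are ignored, and the minimum 
of the rest becomes the new aggregate. The class {{LowWatermarkAggregator}} is 
hypothetical and is not the proposed {{LowWatermarkCoordinator}} itself.

```java
import java.util.Collection;

/**
 * Hypothetical sketch (not Flink code): aggregates the low watermarks that the
 * subtasks of one source vertex replied with in a single round.
 */
final class LowWatermarkAggregator {
    // Aggregated low watermark of the previous round; nothing seen yet.
    private long lastAggregated = Long.MIN_VALUE;

    /**
     * Assumed rule: only replies strictly larger than the previous round's
     * aggregate are considered, and the smallest of those wins. If no reply
     * qualifies, the previous aggregate is kept.
     */
    long aggregate(Collection<Long> replies) {
        long min = Long.MAX_VALUE;
        boolean found = false;
        for (long watermark : replies) {
            if (watermark > lastAggregated) {
                found = true;
                if (watermark < min) {
                    min = watermark;
                }
            }
        }
        if (found) {
            lastAggregated = min;
        }
        return lastAggregated;
    }
}
```

Under this reading the aggregate never moves backwards, so a subtask that 
currently has no partitions and keeps replying with a stale value does not 
hold back the low watermark of the vertex.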



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
