Hi,
yes, your description is spot on!

Cheers,
Aljoscha
> On 26 Feb 2016, at 00:19, Zach Cox <zcox...@gmail.com> wrote:
> 
> I think I found the information I was looking for:
> 
> RecordWriter broadcasts each emitted watermark to all outgoing channels [1].
> 
> StreamInputProcessor tracks the max watermark received on each incoming 
> channel separately, and computes the task's watermark as the min across 
> those per-channel maxima [2].
> 
> Is this an accurate summary of Flink's watermark propagation?
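> 
> To double-check my understanding, here is a minimal sketch of that bookkeeping 
> (the class and method names are made up for illustration; this is not Flink's 
> actual code):
> 
> // Keeps one watermark per incoming channel; the task-level watermark is the
> // minimum over the per-channel maxima, so it only advances once *every*
> // channel has advanced.
> class WatermarkTrackerSketch(numChannels: Int) {
>   private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
>   private var lastEmitted = Long.MinValue
> 
>   // Returns Some(newTaskWatermark) if this incoming watermark advanced the
>   // task-level watermark, None otherwise.
>   def onWatermark(channel: Int, watermark: Long): Option[Long] = {
>     channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
>     val taskWatermark = channelWatermarks.min
>     if (taskWatermark > lastEmitted) {
>       lastEmitted = taskWatermark
>       Some(taskWatermark)
>     } else {
>       None
>     }
>   }
> }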
> 
> So in my previous example, each window count task builds up a count for each 
> window based on the incoming events' timestamps, and emits the count once the 
> watermarks on all incoming channels have progressed past the end of the window. 
> If one partition's watermark lags behind the other, the window output is simply 
> delayed until that lagging watermark also passes the end of the window.
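> 
> A concrete (made-up) example of the effect, with hourly windows and timestamps 
> in event-time milliseconds:
> 
> // Hypothetical per-channel watermarks; not real Flink API, just the arithmetic.
> val watermarkFromChannel0 = 65L * 60 * 1000      // partition 0 has reached 01:05
> val watermarkFromChannel1 = 40L * 60 * 1000      // partition 1 lags at 00:40
> val taskWatermark = math.min(watermarkFromChannel0, watermarkFromChannel1)
> val hourWindowEnd = 60L * 60 * 1000              // window [00:00, 01:00) ends at 01:00
> val windowFires = taskWatermark >= hourWindowEnd // false until partition 1 passes 01:00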
> 
> -Zach
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> [2] 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> 
> 
> On Thu, Feb 25, 2016 at 3:31 PM Zach Cox <zcox...@gmail.com> wrote:
> Hi - how are watermarks propagated between parallel tasks when there is a 
> repartition? For example, say I have a simple streaming job computing hourly 
> counts per key, something like this:
> 
> import org.apache.flink.streaming.api.TimeCharacteristic.EventTime
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.time.Time
> 
> val environment = StreamExecutionEnvironment.getExecutionEnvironment
> environment.setParallelism(2)
> environment.setStreamTimeCharacteristic(EventTime)
> environment.getConfig.enableTimestamps()
> environment
>   .addSource(...)                           // e.g. a Kafka source with 2 partitions
>   .assignAscendingTimestamps(_.timestamp)   // event-time timestamp from each element
>   .keyBy("someField")
>   .timeWindow(Time.hours(1))
>   .fold(0) { (count, element) => count + 1 }
>   .addSink(...)
> environment.execute("example")
> 
> Say the source has 2 parallel partitions (e.g. a Kafka topic with 2 partitions) 
> and the events from the source carry timestamps, but over time the 2 source 
> tasks diverge in event time (maybe one Kafka partition has many more events 
> than the other).
> 
> The job graph looks like this: http://imgur.com/hxEpF6b
> 
> From what I can tell, the execution graph, with parallelism=2, would look 
> like this: http://imgur.com/pSX8ov5. The keyBy causes hash partitioning to be 
> used, so that events with the same key end up at the same window subtask, 
> regardless of which source partition they came from.
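> 
> My mental model of that routing is roughly the following (purely illustrative, 
> not Flink's actual partitioner):
> 
> // Every upstream subtask applies the same hash function to the key, so a given
> // key always lands on the same downstream window subtask, no matter which
> // source partition the event came from.
> def targetSubtask(key: AnyRef, numWindowSubtasks: Int): Int =
>   Math.floorMod(key.hashCode, numWindowSubtasks)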
> 
> Since the watermarks are skewed between the parallel pipelines, what happens 
> when differing watermarks are sent to the window count operators? Is 
> something tracking the min incoming watermark there? Could anyone point me to 
> Flink code that implements this? I'd really like to learn more about how this 
> works.
> 
> Thanks,
> Zach
> 
> 
