Thanks Alexander for your detailed response.

My requirement is that each asset may report a different event time because of 
connectivity issues. If I have 50 assets, each reporting a different event 
time, I should not lose data because of lateness.

To handle this, I tried using the keyBy operator to route the data by asset 
and to maintain a watermark per asset (key) in a KeyedProcessFunction by 
registering an event-time timer for each asset (key). With this approach, I 
observed that the event-time timer was not triggered in the 
KeyedProcessFunction, and hence data did not flow downstream.
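
For illustration, here is a minimal plain-Java model (not Flink code; the class and method names are hypothetical) of one possible explanation for the behaviour above. It assumes that event-time timers fire only once the operator's watermark reaches the timer's timestamp, so a timer registered per key stays pending for as long as the subtask's watermark does not advance that far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model, NOT Flink API: the event-time timers of one operator subtask.
final class EventTimeTimerModel {
    private long currentWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    // Register a timer for the given event-time timestamp.
    public void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advance the watermark and return the timers that fire as a result.
    public List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // the watermark never moves backwards
        }
        currentWatermark = watermark;
        // A timer fires once the watermark has reached its timestamp.
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

In this model, a timer registered for timestamp 1000 stays pending after a watermark of 999 and fires only when a watermark of 1000 or later arrives. If no watermark ever reaches the subtask, nothing fires.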

I am curious to know whether this requirement is feasible to achieve in Flink 
using event time.


From: Alexander Fedulov <>
Date: Thursday, 14 May 2020 at 9:25 PM
To: Gnanasoundari Soundarajan <>
Cc: "" <>
Subject: Re: Watermarks and parallelism

Hi Gnana,

1. No, watermarks are generated independently per subtask. I think this section 
of the docs might make things more clear - 

2. The same watermark from the input of the keyBy will be dispatched to all of 
the instances of the downstream keyed operator. That said, there is no global 
coordination between the subtasks. The same watermark can arrive at the 
downstream subtask at a different time, depending on how much time they'd spend 
on the input channels. Notice also that watermarks are managed on the subtask 
level, not at the level of the individual keys.

3. I am not quite sure I get what you mean by this one and what exactly you try 
to achieve. I assume you want to basically have parallel windows that are 
scoped to all of the items coming from a corresponding subtask of the previous 
non-keyed operator. As Flink windows can be executed in parallel only on keyed 
streams, you could do a little trick: use `reinterpretAsKeyedStream`.
This will make it possible to basically have a "passthrough" partitioning, 
without an actual data shuffle. Another alternative would be to implement your 
Map function as a RichMapFunction, which gives you the access to the runtime 
context. From there:
1) use `getRuntimeContext().getIndexOfThisSubtask();` to retrieve the ID of the 
current subtask
2) enrich your events with a new field, containing the subtask ID
3) use this ID as the key in your keyBy operator
The problem is that both of those approaches will be non-deterministic in terms 
of state recovery when, for instance, you would like to scale out your job to a 
higher degree of parallelism. You'd need to decide if this is relevant for your 
use case.
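
To make point 2 more concrete, here is a small plain-Java sketch (not Flink code; the class and method names are hypothetical) of how a single downstream subtask could track its watermark. It assumes the subtask's event-time clock is the minimum of the latest watermarks seen on its input channels, which is why the watermark belongs to the subtask and not to an individual key.

```java
import java.util.Arrays;

// Plain-Java model, NOT Flink API: watermark tracking for one downstream subtask.
final class SubtaskWatermarkModel {
    private final long[] channelWatermarks; // last watermark seen per input channel
    private long currentWatermark = Long.MIN_VALUE;

    // Assumes numInputChannels >= 1.
    public SubtaskWatermarkModel(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel and return the subtask's resulting watermark.
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min); // never moves backwards
        return currentWatermark;
    }
}
```

With two input channels, a watermark of 5000 on channel 0 alone does not advance the subtask. Only after channel 1 also reports a watermark does the subtask move, and then only to the smaller of the two values. A slow or silent channel therefore holds back every key processed by that subtask.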




Alexander Fedulov | Solutions Architect


On Thu, May 14, 2020 at 6:14 AM Gnanasoundari Soundarajan wrote:
Hi all,

I have the queries below about Flink. Could anyone help me understand?


1. Is the watermark maintained globally at the operator level?

2. When we have a keyBy operator with parallelism > 1, is a single watermark 
maintained across all the parallel subtasks, or one for each parallel subtask?

3. Assuming I have a keyBy operator with parallelism > 1, is it possible to 
feed data to this operator from only one stream of the previous operator (say, 
map (1) always goes to window (1))?

