Oops, mistyped your name, Dan.

From: Schwalbe Matthias
Sent: Friday, 18 March 2022 09:02
To: 'Dan Hill' <quietgol...@gmail.com>; Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: RE: Weird Flink Kafka source watermark behavior

Hi San, Dongwon,

I share the opinion that, when per-partition watermarking is enabled, you should 
observe correct behavior … it would be interesting to see why it does not work for

I’d like to clear up one tiny misconception here, where you write:

>> - The same issue happens even if I use an idle watermark.

You would expect to see glitches with watermarking when you enable idleness.
Idleness sort of trades watermark correctness for reduced latency when 
processing timers (much simplified).
With idleness enabled you have no guarantees whatsoever as to the quality of 
watermarks (which might be ok in some cases).
BTW, we predominantly use a mix of fast and slow sources (some only update once 
a day) with hand-crafted watermarking and late-event processing, and enabling 
idleness would break everything.
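To make that trade-off concrete, here is a simplified, hypothetical model in plain Java (not Flink's implementation; the class and method names are made up): the source emits the minimum watermark over its non-idle partitions and never lets the emitted watermark go backwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Flink code: the emitted watermark is the minimum over
// non-idle partitions, and it never decreases once emitted.
class IdlenessSketch {
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private final Map<Integer, Boolean> idle = new HashMap<>();
    private long emitted = Long.MIN_VALUE;

    // A partition that produces data becomes active again; its own watermark only moves forward.
    void update(int partition, long watermark) {
        long previous = partitionWatermarks.getOrDefault(partition, Long.MIN_VALUE);
        partitionWatermarks.put(partition, Math.max(previous, watermark));
        idle.put(partition, false);
    }

    // Stands in for the (processing-time) idleness timeout firing for a partition.
    void markIdle(int partition) {
        idle.put(partition, true);
    }

    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : partitionWatermarks.entrySet()) {
            if (!idle.get(e.getKey())) {
                min = Math.min(min, e.getValue());
            }
        }
        // If every partition is idle there is nothing to combine; keep the last emitted value.
        if (min != Long.MAX_VALUE && min > emitted) {
            emitted = min;
        }
        return emitted;
    }
}
```

With a fast partition at 100000 and a lagging one at 10000, the combined watermark is 10000. Once the lagging partition is marked idle it jumps to 100000, and when that partition resumes at 15000 the watermark stays at 100000, so its records now arrive late.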

That aside, things should work the way you implemented them.

One thing I could imagine being a cause:

  *   that over time the Kafka partitions get reassigned to different consumer 
subtasks, which would probably stress the correct recalculation of watermarks. 
Hence #partitions == #subtasks might reduce the problem
  *   can you enable logging of the partition-consumer assignment, to see if that 
is the cause of the problem?
  *   also, involuntary restarts of the job can cause havoc as this resets

I’ll be off next week, unable to take part in the active discussion …

Sincere greetings


From: Dan Hill <quietgol...@gmail.com>
Sent: Friday, 18 March 2022 08:23
To: Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Weird Flink Kafka source watermark behavior


I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
I believe your job with per-partition watermarking should be working okay even 
in a backfill scenario.

BTW, is the problem still observed even with # source tasks = # partitions?

For committers:
Is there a way to confirm in the TM log that per-partition watermarking is used?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
I hit this using event-time processing and no idleness detection. The same 
issue happens if I enable idleness.

My code matches the code example for per-partition 

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

I'm quite confused as you already use per-partition watermarking.

What I meant in the reply is
- If you don't use per-partition watermarking, # tasks < # partitions can cause 
the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is going 
to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't cause 
any problems unless you turn on the idleness detection.
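A rough plain-Java simulation of the first and third cases (not Flink code; `Rec`, `PerPartitionSketch` and the data are hypothetical). It assumes a backfill in which one source subtask reads all of partition 0's batch before partition 1's, applies the bounded-out-of-orderness formula (watermark = max timestamp seen - bound - 1), and counts records whose timestamp is at or below the current watermark.

```java
import java.util.Arrays;

// Hypothetical record for this sketch: source partition and event timestamp (ms).
final class Rec {
    final int partition;
    final long ts;

    Rec(int partition, long ts) {
        this.partition = partition;
        this.ts = ts;
    }
}

// Plain-Java model comparing one watermark generator per source subtask
// with one generator per Kafka partition.
class PerPartitionSketch {
    // Bounded-out-of-orderness formula: watermark = max timestamp seen - bound - 1.
    static long watermark(long maxTs, long boundMs) {
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - boundMs - 1;
    }

    // No per-partition watermarking: a single generator sees the interleaved stream.
    static int lateWithSharedGenerator(Rec[] input, long boundMs) {
        long maxTs = Long.MIN_VALUE;
        int late = 0;
        for (Rec r : input) {
            if (r.ts <= watermark(maxTs, boundMs)) late++;
            maxTs = Math.max(maxTs, r.ts);
        }
        return late;
    }

    // Per-partition watermarking: the subtask emits the minimum over its partitions.
    static int lateWithPerPartitionGenerators(Rec[] input, int numPartitions, long boundMs) {
        long[] maxTs = new long[numPartitions];
        Arrays.fill(maxTs, Long.MIN_VALUE);
        int late = 0;
        for (Rec r : input) {
            long current = Long.MAX_VALUE;
            for (long m : maxTs) current = Math.min(current, watermark(m, boundMs));
            if (r.ts <= current) late++;
            maxTs[r.partition] = Math.max(maxTs[r.partition], r.ts);
        }
        return late;
    }

    // Backfill read order: partition 0's whole batch, then partition 1's (records hours apart).
    static Rec[] demoInput() {
        long h = 3_600_000L;
        return new Rec[] {
            new Rec(0, 0), new Rec(0, 2 * h), new Rec(0, 4 * h),
            new Rec(1, h), new Rec(1, 3 * h), new Rec(1, 5 * h)
        };
    }
}
```

With `demoInput()` and a 5 s bound, the shared generator flags two of partition 1's records as late, while the per-partition variant flags none, because the slower partition holds the combined watermark back.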

Regarding idleness detection, which is based on processing time: what is your 
setting? If you set the value to 10 seconds, for example, you'll face the same 
problem unless the watermark of your backfill job catches up to real time 
within 10 seconds. If you increase the value to 1 minute, your backfill job 
has to catch up to real time within 1 minute.
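Because idleness is judged on processing time, a back-pressured backfill can trip it even though the partition still has data. A simplified, hypothetical model (names made up): a partition counts as idle whenever the wall-clock gap between two consecutive record arrivals reaches the timeout. In Flink the timeout itself is what `WatermarkStrategy#withIdleness(Duration)` sets; the real check is evaluated periodically, which this sketch ignores.

```java
// Hypothetical model of processing-time idleness, not Flink's implementation:
// a partition counts as idle whenever no record arrives for at least timeoutMs.
class IdleTimeoutSketch {
    // arrivalsMs: processing-time instants at which one partition's records arrive, ascending.
    static int idlePeriods(long[] arrivalsMs, long timeoutMs) {
        int periods = 0;
        for (int i = 1; i < arrivalsMs.length; i++) {
            if (arrivalsMs[i] - arrivalsMs[i - 1] >= timeoutMs) {
                periods++;
            }
        }
        return periods;
    }

    // Backfill pattern: batches separated by ~30 s stalls while downstream operators catch up.
    static long[] demoArrivals() {
        return new long[] {0, 100, 200, 30_200, 30_300, 60_300};
    }
}
```

With those arrivals, a 10 s timeout produces two idle periods and a 60 s timeout produces none, which matches the advice to raise the timeout above the longest stall the backfill is expected to have.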



On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
Thanks Dongwon!

Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < 
# kafka partitions.  This should be called out in the docs or the bug should be 

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs, specifically 
when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple 
partitions. This can destroy the per-partition patterns, as explained in

Hope this helps.

P.S. If you plan to use per-partition watermarking, be aware that idleness 
detection [3] can cause another problem when you run a backfill job. Kafka 
source tasks in a backfill job seem to read a batch of records from Kafka and 
then wait for downstream tasks to catch up, which can be counted as idleness.




On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
I'm following the example from this section:

On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
Other points
- I'm using the kafka timestamp as event time.
- The same issue happens even if I use an idle watermark.

On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
There are 12 Kafka partitions (to keep the structure similar to other low 
traffic environments).
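With 12 partitions, fewer source subtasks means each subtask reads several partitions. The sketch below shows the round-robin spread, written from memory of `KafkaTopicPartitionAssigner` in Flink's Kafka connector; treat the exact formula as an assumption. `PartitionAssignmentSketch` and the topic name used with it are hypothetical.

```java
// Assumption: mirrors the round-robin scheme of the Kafka connector's
// KafkaTopicPartitionAssigner (per-topic start index, then partition id modulo parallelism).
class PartitionAssignmentSketch {
    static int assign(String topic, int partition, int numSubtasks) {
        // Start index derived from the topic name so different topics start on different subtasks.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + partition) % numSubtasks;
    }

    // How many partitions of one topic each source subtask ends up reading.
    static int[] partitionsPerSubtask(String topic, int numPartitions, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (int p = 0; p < numPartitions; p++) {
            counts[assign(topic, p, numSubtasks)]++;
        }
        return counts;
    }
}
```

For 12 partitions, 4 subtasks get 3 partitions each, 12 subtasks get exactly 1 each, and 5 subtasks split them 3, 3, 2, 2, 2 regardless of the topic name.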

On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:

I'm running a backfill from a kafka topic with very few records spread across a 
few days. I'm seeing a case where the records coming from a kafka source have 
a watermark that's more recent (by hours) than the event time. I haven't seen 
this in earlier runs of this job. This violates what I'd assume the kafka 
source would do.

Example problem:
1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are 
separated by a longer time period.
2.  My first operator after the FlinkKafkaConsumer sees:
   context.timestamp() = 1000
   context.timerService().currentWatermark() = 500000

Details about how I'm running this:
- I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
- I'm using FlinkKafkaConsumer
- I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness 
- I'm running similar code in all environments. The main difference is low 
traffic. I have not been able to reproduce this outside of this environment.

I put the following process function right after my kafka source.
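
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Logs each record's timestamp and the current watermark, then forwards it unchanged.
public static class TextLog extends ProcessFunction<Record, Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class); // assumed SLF4J logger
    private final String label;
    public TextLog(String label) { this.label = label; }
    @Override
    public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
        LOGGER.info("{}: timestamp={}, watermark={}, record={}", // assumed message format
                label, context.timestamp(), context.timerService().currentWatermark(), record);
        collector.collect(record); // forward the record unchanged
    }
}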