Oops, mistyped your name, Dan.

From: Schwalbe Matthias
Sent: Friday, 18 March 2022 09:02
To: 'Dan Hill' <quietgol...@gmail.com>; Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: RE: Weird Flink Kafka source watermark behavior
Hi Dan, Dongwon,

I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … it would be interesting to see why it does not work for you.

I'd like to clear up one tiny misconception here, when you write:
>> - The same issue happens even if I use an idle watermark.

You would expect to see glitches with watermarking when you enable idleness. Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified). With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be OK in some cases). BTW, we predominantly use a mix of fast and slow sources (the slow ones only update once a day) with hand-crafted watermarking and late-event processing, and enabling idleness would break everything.

Oversights put aside, things should work the way you implemented it. Causes I could imagine:
* Over time, the Kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence # partitions == # subtasks might reduce the problem.
* Can you enable logging of the partition-consumer assignment, to see if that is the cause of the problem?
* Involuntary restarts of the job can also cause havoc, as they reset watermarking.

I'll be off next week, unable to take part in the active discussion …

Sincere greetings

Thias

From: Dan Hill <quietgol...@gmail.com>
Sent: Friday, 18 March 2022 08:23
To: Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Weird Flink Kafka source watermark behavior

I'll try forcing # source tasks = # partitions tomorrow. Thank you, Dongwon, for all of your help!
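Thias's point that idleness trades watermark correctness for latency can be sketched outside of Flink. The snippet below is a simplified model of my own (plain Java, no Flink dependency; the class and method names are hypothetical, and this is not Flink's actual combining code): a subtask's watermark is the minimum over its active partitions, and a partition marked idle is simply dropped from that minimum, letting the watermark jump past the idle partition's pending events.

```java
public class IdlenessDemo {
    // Simplified model: the subtask watermark is the minimum over the
    // watermarks of its non-idle partitions; idle partitions are ignored.
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        // No active partition at all: no watermark progress can be claimed.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] wms = {1_000, 500_000}; // slow partition vs. fast partition
        // Both active: the slow partition correctly holds the watermark back.
        System.out.println(combinedWatermark(wms, new boolean[]{false, false})); // 1000
        // Slow partition declared idle (e.g. a backfill source pausing on it):
        // the watermark jumps ahead and that partition's events arrive "late".
        System.out.println(combinedWatermark(wms, new boolean[]{true, false})); // 500000
    }
}
```

This is why a backfill job that pauses on a partition long enough to trip the idleness timeout can suddenly see watermarks far ahead of event time.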
On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

I believe your job with per-partition watermarking should be working okay even in a backfill scenario. BTW, is the problem still observed even with # source tasks = # partitions?

For committers: is there a way to confirm from the TM log that per-partition watermarking is used?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:

I hit this using event-time processing and no idleness detection. The same issue happens if I enable idleness. My code matches the code example for per-partition watermarking: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Dan,

I'm quite confused, as you already use per-partition watermarking. What I meant in the reply is:
- If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on idleness detection.

Regarding idleness detection, which is based on processing time: what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds. If you increase the value to 1 minute, your backfill job needs to catch up to real time within 1 minute.

Best,

Dongwon

On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:

Thanks Dongwon! Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source tasks < # kafka partitions.
This should be called out in the docs, or the bug should be fixed.

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Dan,

Do you use the per-partition watermarking explained in [1]?

I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This can destroy the per-partition patterns, as explained in [2].

Hope this helps.

p.s. If you plan to use per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best,

Dongwon

On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:

I'm following the example from this section: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:

Other points:
- I'm using the Kafka timestamp as event time.
- The same issue happens even if I use an idle watermark.
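Dongwon's distinction between per-partition and shared watermarking can be illustrated without Flink. This is a minimal plain-Java sketch under my own naming (not Flink's implementation): per-partition watermarking tracks one max timestamp per partition and takes the minimum across partitions, while a single shared bounded-out-of-orderness generator tracks only one global max over everything the subtask has read, regardless of which partition it came from.

```java
public class PartitionWatermarkDemo {
    static final long OUT_OF_ORDERNESS_MS = 5_000; // e.g. forBoundedOutOfOrderness(5s)

    // Per-partition watermarking: the lagging partition holds the watermark back.
    static long perPartitionWatermark(long[] maxTsPerPartition) {
        long min = Long.MAX_VALUE;
        for (long ts : maxTsPerPartition) min = Math.min(min, ts);
        return min - OUT_OF_ORDERNESS_MS - 1;
    }

    // Shared generator: one global max, blind to which partition produced it.
    static long sharedWatermark(long globalMaxTs) {
        return globalMaxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        // One subtask owns two partitions; during a backfill it has read up to
        // ts=500_000 on partition 0 but only up to ts=1_000 on partition 1.
        long[] maxTs = {500_000, 1_000};
        System.out.println(perPartitionWatermark(maxTs)); // -4001: held back correctly
        System.out.println(sharedWatermark(500_000));     // 494999: ahead of partition 1
    }
}
```

With the shared generator, partition 1's ts=1_000 records are emitted under a watermark of 494_999, so every downstream event-time operator treats them as hours late, which matches the symptom in this thread.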
On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:

There are 12 Kafka partitions (to keep the structure similar to other low-traffic environments).

On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:

Hi. I'm running a backfill from a Kafka topic with very few records spread across a few days. I'm seeing a case where the records coming from a Kafka source have a watermark that's more recent (by hours) than the event time. I haven't seen this before when running this job. It violates what I'd assume the Kafka source would do.

Example problem:
1. I have Kafka records at ts=1000, 2000, ..., 500000. The actual times are separated by a longer time period.
2. My first operator after the FlinkKafkaConsumer sees:
   context.timestamp() = 1000
   context.timerService().currentWatermark() = 500000

Details about how I'm running this:
- I'm on Flink 1.12.3, running on EKS and using MSK as the source.
- I'm using FlinkKafkaConsumer.
- I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness settings.
- I'm running similar code in all the environments. The main difference is low traffic. I have not been able to reproduce this outside of this environment.

I put the following process function right after my Kafka source.

--------
AfterSource
ts=1647274892728
watermark=1647575140007
record=...

public static class TextLog extends ProcessFunction<Record, Record> {
    private final String label;
    public TextLog(String label) {
        this.label = label;
    }
    @Override
    public void processElement(Record record, Context context,
            Collector<Record> collector) throws Exception {
        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}", label,
            context.timestamp(), context.timerService().currentWatermark(), record);
        collector.collect(record);
    }
}
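The symptom above (an operator seeing ts=1000 while currentWatermark is hours ahead) is what a shared bounded-out-of-orderness generator produces when one consumer drains its partitions a batch at a time. A small simulation, assuming the 5-second delay from the reported setup (plain Java, my own hypothetical naming; the watermark arithmetic mirrors the bounded-out-of-orderness rule, max timestamp minus delay minus 1 ms):

```java
public class BackfillSymptomDemo {
    static final long DELAY_MS = 5_000; // forBoundedOutOfOrderness(5s)

    // Shared-generator watermark after processing records in arrival order:
    // the watermark trails the max timestamp seen so far, not the current record.
    static long watermarkAfter(long[] arrivalOrder) {
        long maxTsSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            maxTsSeen = Math.max(maxTsSeen, ts);
            watermark = maxTsSeen - DELAY_MS - 1;
        }
        return watermark;
    }

    public static void main(String[] args) {
        // The consumer first drains a batch of recent records from one partition,
        // then emits another partition's old record: arrival order, not event order.
        long[] arrivalOrder = {496_000, 498_000, 500_000, /* other partition: */ 1_000};
        // When the ts=1000 record reaches the first operator, the watermark is
        // already 494999, far ahead of the record's own event time.
        System.out.println(watermarkAfter(arrivalOrder)); // 494999
    }
}
```

Under this model the ts=1000 record is behind the watermark the moment it is emitted, which is consistent with the AfterSource log line where watermark > ts.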