I'll try forcing # source tasks = # partitions tomorrow. Thank you, Dongwon, for all of your help!
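For reference, a minimal pure-Java sketch (no Flink dependency; the partition contents, counts, and the `WatermarkSim` class are made up for illustration) of why a single watermark generator over multiple partitions can push the watermark hours ahead of a slow partition's records during a backfill, while per-partition generators do not. It uses the bounded-out-of-orderness convention (watermark = max seen timestamp - bound - 1) with the thread's 5s bound:

```java
// Pure-Java simulation -- no Flink dependency. One partition is hours
// ahead of the other, as in a backfill reading old and new data.
public class WatermarkSim {
    static final long BOUND_MS = 5_000; // forBoundedOutOfOrderness(5s)

    // Bounded-out-of-orderness watermark: max seen timestamp - bound - 1.
    static long watermark(long maxTs) {
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - BOUND_MS - 1;
    }

    // Interleaves a fast and a slow partition and counts slow-partition
    // records that arrive at or behind the current watermark (i.e. late).
    static int simulate(boolean perPartition) {
        long[] ahead  = {3_600_000, 3_601_000, 3_602_000}; // fast partition
        long[] behind = {1_000, 2_000, 3_000};             // slow partition

        long maxAll = Long.MIN_VALUE;                      // single generator
        long maxAhead = Long.MIN_VALUE, maxBehind = Long.MIN_VALUE; // per-partition
        int late = 0;
        for (int i = 0; i < ahead.length; i++) {
            maxAll = Math.max(maxAll, ahead[i]);
            maxAhead = Math.max(maxAhead, ahead[i]);

            // Per-partition watermarking takes the min over partitions, so
            // the slow partition holds the watermark back; a single generator
            // only tracks the global max and races ahead.
            long wm = perPartition
                ? Math.min(watermark(maxAhead), watermark(maxBehind))
                : watermark(maxAll);
            if (behind[i] <= wm) late++;

            maxAll = Math.max(maxAll, behind[i]);
            maxBehind = Math.max(maxBehind, behind[i]);
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println("single generator, late records: " + simulate(false));
        System.out.println("per-partition,    late records: " + simulate(true));
    }
}
```

With a single generator all three slow-partition records land behind the watermark; with per-partition tracking none do, which matches the symptom in the thread (watermark more recent than the event time by hours).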
On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> I believe your job with per-partition watermarking should be working okay
> even in a backfill scenario.
>
> BTW, is the problem still observed even with # source tasks = # partitions?
>
> For committers:
> Is there a way to confirm in the TM log that per-partition watermarking is
> being used?
>
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I hit this using event-time processing and no idleness detection. The
>> same issue happens if I enable idleness.
>>
>> My code matches the code example for per-partition watermarking
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>.
>>
>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I'm quite confused, as you already use per-partition watermarking.
>>>
>>> What I meant in my reply is:
>>> - If you don't use per-partition watermarking, # tasks < # partitions
>>> can cause the problem for backfill jobs.
>>> - If you don't use per-partition watermarking, # tasks = # partitions
>>> is going to be okay even for backfill jobs.
>>> - If you use per-partition watermarking, # tasks < # partitions
>>> shouldn't cause any problems unless you turn on idleness detection.
>>>
>>> Regarding idleness detection, which is based on processing time: what
>>> is your setting? If you set the value to 10 seconds, for example, you'll
>>> face the same problem unless the watermark of your backfill job catches
>>> up to real time within 10 seconds. If you increase the value to 1 minute,
>>> your backfill job should catch up to real time within 1 minute.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Thanks Dongwon!
>>>>
>>>> Wow. Yes, I'm using per-partition watermarking [1].
>>>> Yes, my # source tasks < # kafka partitions. This should be called
>>>> out in the docs or the bug should be fixed.
>>>>
>>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> Do you use the per-partition watermarking explained in [1]?
>>>>> I've also experienced a similar problem when running backfill jobs,
>>>>> specifically when # source tasks < # kafka partitions.
>>>>> - When # source tasks = # kafka partitions, the backfill job works
>>>>> as expected.
>>>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>>>> multiple partitions. This case can destroy the per-partition patterns,
>>>>> as explained in [2].
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> p.s. If you plan to use per-partition watermarking, be aware that
>>>>> idleness detection [3] can cause another problem when you run a
>>>>> backfill job. Kafka source tasks in a backfill job seem to read a batch
>>>>> of records from Kafka and then wait for downstream tasks to catch up,
>>>>> which can be counted as idleness.
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>>>> [2]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>> [3]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm following the example from this section:
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>>>
>>>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Other points:
>>>>>>> - I'm using the kafka timestamp as event time.
>>>>>>> - The same issue happens even if I use an idle watermark.
>>>>>>>
>>>>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There are 12 Kafka partitions (to keep the structure similar to
>>>>>>>> other low-traffic environments).
>>>>>>>>
>>>>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi.
>>>>>>>>>
>>>>>>>>> I'm running a backfill from a kafka topic with very few records
>>>>>>>>> spread across a few days. I'm seeing a case where the records
>>>>>>>>> coming from a kafka source have a watermark that's more recent
>>>>>>>>> (by hours) than the event time. I haven't seen this before when
>>>>>>>>> running this job. It violates what I'd expect the kafka source
>>>>>>>>> to do.
>>>>>>>>>
>>>>>>>>> Example problem:
>>>>>>>>> 1. I have kafka records at ts=1000, 2000, ..., 500000. The actual
>>>>>>>>> times are separated by a longer time period.
>>>>>>>>> 2.
>>>>>>>>> My first operator after the FlinkKafkaConsumer sees:
>>>>>>>>> context.timestamp() = 1000
>>>>>>>>> context.timerService().currentWatermark() = 500000
>>>>>>>>>
>>>>>>>>> Details about how I'm running this:
>>>>>>>>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>>>>>>>> - I'm using FlinkKafkaConsumer.
>>>>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No
>>>>>>>>> idleness settings.
>>>>>>>>> - I'm running similar code in all the environments. The main
>>>>>>>>> difference is low traffic. I have not been able to reproduce this
>>>>>>>>> outside of this environment.
>>>>>>>>>
>>>>>>>>> I put the following process function right after my kafka source.
>>>>>>>>>
>>>>>>>>> --------
>>>>>>>>>
>>>>>>>>> AfterSource
>>>>>>>>> ts=1647274892728
>>>>>>>>> watermark=1647575140007
>>>>>>>>> record=...
>>>>>>>>>
>>>>>>>>> public static class TextLog extends ProcessFunction<Record,
>>>>>>>>> Record> {
>>>>>>>>>     private final String label;
>>>>>>>>>
>>>>>>>>>     public TextLog(String label) {
>>>>>>>>>         this.label = label;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void processElement(Record record, Context context,
>>>>>>>>>             Collector<Record> collector) throws Exception {
>>>>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>>>>             label, context.timestamp(),
>>>>>>>>>             context.timerService().currentWatermark(), record);
>>>>>>>>>         collector.collect(record);
>>>>>>>>>     }
>>>>>>>>> }
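For completeness, a minimal wiring sketch of the per-partition setup the thread refers to, assuming Flink 1.12's DataStream API. This is connector configuration rather than a runnable program (it needs a cluster and a Kafka topic); topic name, bootstrap servers, and group id are placeholders:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "backfill-test");           // placeholder

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Attaching the strategy to the consumer (rather than calling
        // assignTimestampsAndWatermarks on the stream afterwards) makes the
        // connector generate watermarks per Kafka partition, so one source
        // task reading several partitions still tracks each partition's skew.
        // The Kafka record timestamp is used as event time by default.
        consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Idleness is measured in processing time, so for a backfill
                // either leave it off or make it comfortably larger than the
                // time the job needs to catch up, per Dongwon's point above.
                .withIdleness(Duration.ofMinutes(1)));

        env.addSource(consumer).print();
        env.execute("per-partition-watermark-sketch");
    }
}
```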