I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> I believe your job with per-partition watermarking should be working okay
> even in a backfill scenario.
>
> BTW, is the problem still observed even with # sour tasks = # partitions?
>
> For committers:
> Is there a way to confirm that per-partition watermarking is used in TM
> log?
>
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I hit this using event processing and no idleness detection.  The same
>> issue happens if I enable idleness.
>>
>> My code matches the code example for per-partition watermarking
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>
>> .
>>
>>
>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I'm quite confused as you already use per-partition watermarking.
>>>
>>> What I meant in the reply is
>>> - If you don't use per-partition watermarking, # tasks < # partitions
>>> can cause the problem for backfill jobs.
>>> - If you don't use per-partition watermarking, # tasks = # partitions is
>>> going to be okay even for backfill jobs.
>>> - If you use per-partition watermarking, # tasks < # partitions
>>> shouldn't cause any problems unless you turn on the idleness detection.
>>>
>>> Regarding the idleness detection which is based on processing time, what
>>> is your setting? If you set the value to 10 seconds for example, you'll
>>> face the same problem unless the watermark of your backfill job catches
>>> up real-time within 10 seconds. If you increase the value to 1 minute, your
>>> backfill job should catch up real-time within 1 minute.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>>
>>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Thanks Dongwon!
>>>>
>>>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>>>> tasks < # kafka partitions.  This should be called out in the docs or the
>>>> bug should be fixed.
>>>>
>>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> Do you use the per-partition watermarking explained in [1]?
>>>>> I've also experienced a similar problem when running backfill jobs
>>>>> specifically when # source tasks < # kafka partitions.
>>>>> - When # source tasks = # kafka partitions, the backfill job works as
>>>>> expected.
>>>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>>>> multiple partitions. This case can destroying the per-partition patterns 
>>>>> as
>>>>> explained in [2].
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> p.s. If you plan to use the per-partition watermarking, be aware that
>>>>> idleness detection [3] can cause another problem when you run a backfill
>>>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>>>> from Kafka and then wait for downstream tasks to catch up the progress,
>>>>> which can be counted as idleness.
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>>>> [2]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>> [3]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm following the example from this section:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>>>
>>>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Other points
>>>>>>> - I'm using the kafka timestamp as event time.
>>>>>>> - The same issue happens even if I use an idle watermark.
>>>>>>>
>>>>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There are 12 Kafka partitions (to keep the structure similar to
>>>>>>>> other low traffic environments).
>>>>>>>>
>>>>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi.
>>>>>>>>>
>>>>>>>>> I'm running a backfill from a kafka topic with very few records
>>>>>>>>> spread across a few days.  I'm seeing a case where the records coming 
>>>>>>>>> from
>>>>>>>>> a kafka source have a watermark that's more recent (by hours) than the
>>>>>>>>> event time.  I haven't seen this before when running this.  This 
>>>>>>>>> violates
>>>>>>>>> what I'd assume the kafka source would do.
>>>>>>>>>
>>>>>>>>> Example problem:
>>>>>>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
>>>>>>>>> times are separated by a longer time period.
>>>>>>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>>>>>>    context.timestamp() = 1000
>>>>>>>>>    context.timerService().currentWatermark() = 500000
>>>>>>>>>
>>>>>>>>> Details about how I'm running this:
>>>>>>>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>>>>>>>>> source.
>>>>>>>>> - I'm using FlinkKafkaConsumer
>>>>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>>>>>>>> idleness settings.
>>>>>>>>> - I'm running similar code in all the environments.  The main
>>>>>>>>> difference is low traffic.  I have not been able to reproduce this 
>>>>>>>>> out of
>>>>>>>>> the environment.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I put the following process function right after my kafka source.
>>>>>>>>>
>>>>>>>>> --------
>>>>>>>>>
>>>>>>>>> AfterSource
>>>>>>>>> ts=1647274892728
>>>>>>>>> watermark=1647575140007
>>>>>>>>> record=...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public static class TextLog extends ProcessFunction<Record,
>>>>>>>>> Record> {
>>>>>>>>>     private final String label;
>>>>>>>>>     public TextLogDeliveryLog(String label) {
>>>>>>>>>         this.label = label;
>>>>>>>>>     }
>>>>>>>>>     @Override
>>>>>>>>>     public void processElement(Record record, Context context,
>>>>>>>>> Collector<Record> collector) throws Exception {
>>>>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>>>>                 label, context.timestamp(),
>>>>>>>>> context.timerService().currentWatermark(), record);
>>>>>>>>>         collector.collect(deliveryLog);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>

Reply via email to