There are 12 Kafka partitions (to keep the structure similar to other low-traffic environments).
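
One way this can happen with low traffic and many partitions: a bounded-out-of-orderness watermark is derived from the maximum timestamp seen so far, so a single newer record from one partition pushes the watermark far past older records still being read from another partition. Here's a minimal sketch of that mechanism (plain Java, no Flink dependency; the class and method names are illustrative, not Flink API):

```java
// Simulates how a bounded-out-of-orderness strategy advances the
// watermark: it tracks the maximum event timestamp seen so far and
// emits (maxTimestamp - allowedDelay - 1) as the current watermark.
class BoundedOutOfOrdernessSim {
    private long maxTs = Long.MIN_VALUE;
    private final long delayMs;

    BoundedOutOfOrdernessSim(long delayMs) {
        this.delayMs = delayMs;
    }

    // Returns the watermark after observing an event at timestamp ts.
    long onEvent(long ts) {
        maxTs = Math.max(maxTs, ts);
        return maxTs - delayMs - 1;
    }
}
```

For example, with a 5s delay, seeing a record at ts=500000 from one partition moves the watermark to 494999; an older record at ts=1000 read afterwards from another partition then arrives with a watermark hundreds of seconds ahead of its own timestamp, i.e. it is late. Per-partition watermarking inside the consumer would take the minimum across partitions and avoid this, which is why where the strategy is applied (and whether all partitions keep producing) matters.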

On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:

> Hi.
>
> I'm running a backfill from a Kafka topic with very few records spread
> across a few days.  I'm seeing a case where records coming from the Kafka
> source have a watermark that's more recent (by hours) than their event time.
> I haven't seen this before when running this job.  It violates what I'd
> assume the Kafka source would do.
>
> Example problem:
> 1. I have Kafka records at ts=1000, 2000, ... 500000.  The actual times
> are separated by a longer period.
> 2.  My first operator after the FlinkKafkaConsumer sees:
>    context.timestamp() = 1000
>    context.timerService().currentWatermark() = 500000
>
> Details about how I'm running this:
> - I'm on Flink 1.12.3, running on EKS with MSK as the source.
> - I'm using FlinkKafkaConsumer.
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
> settings.
> - I'm running similar code in all my environments.  The main difference
> here is low traffic.  I have not been able to reproduce this outside this
> environment.
>
>
> I put the following process function right after my kafka source.
>
> --------
>
> AfterSource
> ts=1647274892728
> watermark=1647575140007
> record=...
>
>
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private final String label;
>     public TextLog(String label) {
>         this.label = label;
>     }
>     @Override
>     public void processElement(Record record, Context context,
> Collector<Record> collector) throws Exception {
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>                 label, context.timestamp(),
> context.timerService().currentWatermark(), record);
>         collector.collect(record);
>     }
> }
>
