There are 12 Kafka partitions (to keep the structure similar to other low traffic environments).
On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
> Hi.
>
> I'm running a backfill from a kafka topic with very few records spread
> across a few days. I'm seeing a case where the records coming from a kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this. This violates what I'd
> assume the kafka source would do.
>
> Example problem:
> 1. I have kafka records at ts=1000, 2000, ... 500000. The actual times
> are separated by a longer time period.
> 2. My first operator after the FlinkKafkaConsumer sees:
>    context.timestamp() = 1000
>    context.timerService().currentWatermark() = 500000
>
> Details about how I'm running this:
> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> - I'm using FlinkKafkaConsumer.
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness
> settings.
> - I'm running similar code in all the environments. The main difference
> is low traffic. I have not been able to reproduce this outside of that
> environment.
>
> I put the following process function right after my kafka source.
>
> --------
>
> AfterSource
> ts=1647274892728
> watermark=1647575140007
> record=...
>
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private final String label;
>
>     public TextLog(String label) {
>         this.label = label;
>     }
>
>     @Override
>     public void processElement(Record record, Context context,
>             Collector<Record> collector) throws Exception {
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>             label, context.timestamp(),
>             context.timerService().currentWatermark(), record);
>         collector.collect(record);
>     }
> }
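One likely explanation, sketched below under an assumption the thread doesn't confirm: if the WatermarkStrategy is applied on the DataStream after the source (rather than on the FlinkKafkaConsumer itself), a single source subtask reading several of the 12 partitions advances one shared watermark from the maximum timestamp it has seen across all of them. A sparse, older record from a lagging partition then arrives with currentWatermark already far ahead of its own timestamp. This toy class simulates the bounded-out-of-orderness rule (watermark = max timestamp seen − delay − 1); it is not Flink code.

```java
// Simulation (not Flink's classes) of the forBoundedOutOfOrderness rule:
// the watermark trails the maximum timestamp seen so far by the configured
// delay, regardless of which partition a record came from.
public class WatermarkSketch {
    static final long OUT_OF_ORDERNESS_MS = 5_000; // matches forBoundedOutOfOrderness(5s)
    private long maxTs = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;

    // Returns the watermark in effect after observing this event timestamp.
    long onEvent(long eventTs) {
        maxTs = Math.max(maxTs, eventTs);
        return maxTs - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch();
        // Partition A is far ahead in event time; partition B lags behind.
        System.out.println(w.onEvent(500_000)); // record from partition A -> 494999
        System.out.println(w.onEvent(1_000));   // older record from B: watermark
                                                // stays at 494999, ahead of ts=1000
    }
}
```

If that is what's happening, moving the strategy onto the consumer (FlinkKafkaConsumer#assignTimestampsAndWatermarks(WatermarkStrategy), available since 1.11) gives per-partition watermarking, so the emitted watermark is the minimum across partitions rather than a single merged maximum.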