How did you configure the Kafka source as at-least-once? AFAIK the source is always exactly-once (as long as there aren't any restarts).
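For reference, the guarantee on the source side just follows the checkpoint mode; there is no separate at-least-once switch on the FlinkKafkaConsumer itself. A minimal sketch of what I'd expect the setup to look like, assuming Flink 1.10 and the universal Kafka connector (the broker address, topic name, and checkpoint intervals are placeholders, not your actual values):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaImportJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source's guarantee is driven by the checkpoint mode: with
        // EXACTLY_ONCE checkpoints, offsets live in Flink state and records
        // are only replayed after a failure/restart.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder address
        props.setProperty("group.id", "xxxxxx-import");

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("import-topic", new SimpleStringSchema(), props);
        // Offsets are committed back to Kafka only on completed checkpoints;
        // they are used for monitoring consumer lag, not for restoring state.
        source.setCommitOffsetsOnCheckpoints(true);

        // The real job has Transform -> JDBC Sink here; print() keeps the sketch self-contained.
        env.addSource(source).print();

        env.execute("kafka-import");
    }
}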
Are you seeing the duplicates in the context of restarts of the Flink job?

On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:

> Sorry, got confused with your reply... Does the message "Error sending
> fetch request" cause retries/duplicates downstream, or doesn't it?
>
> I'm guessing it happens before the source can even send anything
> downstream...
>
> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>
>> Hi, my flow is Kafka Source -> Transform -> JDBC Sink.
>>
>> The Kafka source is configured as at-least-once, and the JDBC sink prevents
>> duplicates with a unique key constraint; each duplicate is logged in a separate
>> table. So the destination data is effectively exactly-once.
>>
>> The duplicates happen every so often. Looking at the checkpoint history,
>> some checkpoints failed, but the history isn't long enough to go back
>> and look. I'm guessing I will have to adjust the checkpointing times a bit...
>>
>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> The log message you saw from the Kafka consumer simply means the consumer
>>> was disconnected from the broker that the FetchRequest was supposed to be
>>> sent to. The disconnection can happen in many cases, such as a broker going
>>> down, network glitches, etc. The KafkaConsumer will just reconnect and retry
>>> sending that FetchRequest. This won't cause duplicate messages in the
>>> KafkaConsumer or Flink. Have you enabled exactly-once semantics for your
>>> Kafka sink? If not, the downstream might see duplicates in case of a Flink
>>> failover or an occasional retry in the KafkaProducer of the Kafka sink.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>> maybe it's unrelated, as we are still seeing the message and there are no
>>>> duplicates...
>>>>
>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com> wrote:
>>>>
>>>>> And yes, my downstream is handling the duplicates in an idempotent way,
>>>>> so we are good on that point. But I'm just curious what the behaviour of
>>>>> the source consumer is when that error happens.
>>>>>
>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while...
>>>>>>
>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler -
>>>>>> [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>
>>>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>>>> what it seems it's either a Kafka bug in the way it handles the epoch or
>>>>>> possibly a version mismatch between client and brokers. That's fine, I can
>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>> what happens in terms of the source and the sink. It looks like we get
>>>>>> duplicates on the sink, and I'm guessing it's because the consumer is
>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>> reconnect and process that offset, hence the duplicates downstream?
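The idempotent JDBC sink described in the thread above might look roughly like the following sketch. It assumes a plain RichSinkFunction over Tuple2<eventId, payload> and a PostgreSQL-style ON CONFLICT clause; the table name, connection details, and duplicate-logging step are placeholders, not the actual implementation from the thread:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// f0 = event id, f1 = payload; table and connection details are made up for illustration.
public class IdempotentJdbcSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient PreparedStatement insert;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/imports", "user", "password");
        // Relies on a UNIQUE constraint on events.event_id: replayed records
        // from an at-least-once source simply do not insert a second row.
        insert = connection.prepareStatement(
                "INSERT INTO events (event_id, payload) VALUES (?, ?) "
              + "ON CONFLICT (event_id) DO NOTHING");
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        insert.setString(1, value.f0);
        insert.setString(2, value.f1);
        if (insert.executeUpdate() == 0) {
            // Row already existed: a duplicate from a replay/retry.
            // This is where it could be logged to a separate duplicates table.
        }
    }

    @Override
    public void close() throws Exception {
        if (insert != null) insert.close();
        if (connection != null) connection.close();
    }
}

With a constraint like that in place, records replayed after a restart or a failed checkpoint show up as conflict hits rather than extra rows, which matches the "duplicate is logged in a separate table" behaviour described above.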