The Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles duplicates with a unique key constraint, logging any duplicates to a separate SQL table. Effectively, that gives us EXACTLY_ONCE semantics at the destination.
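Conceptually, the dedup in our sink boils down to something like this (a simplified sketch in plain JDBC, not our actual sink code; the table and column names and the exception handling here are just placeholders):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLIntegrityConstraintViolationException;

public class DedupJdbcWriter {

    // A UNIQUE/PRIMARY KEY constraint on event_id makes the insert idempotent:
    // a record replayed by the at-least-once source violates the constraint
    // instead of creating a second row.
    public void write(Connection conn, String eventId, String payload) throws Exception {
        try (PreparedStatement insert = conn.prepareStatement(
                "INSERT INTO events (event_id, payload) VALUES (?, ?)")) {
            insert.setString(1, eventId);
            insert.setString(2, payload);
            insert.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException duplicate) {
            // Duplicate delivery: keep the destination effectively exactly-once
            // and record the replay in a separate table for auditing.
            try (PreparedStatement log = conn.prepareStatement(
                    "INSERT INTO duplicate_events (event_id, payload) VALUES (?, ?)")) {
                log.setString(1, eventId);
                log.setString(2, payload);
                log.executeUpdate();
            }
        }
    }
}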
That's not a problem, it works great!

1- I was curious whether that specific Kafka log message was the cause of the
duplicates, but if I understand Becket correctly it is not the source of the
duplicates; I just wanted to confirm that.

2- I started monitoring checkpoints: on average they take about 100 ms, but
during peak we started seeing checkpoints take 20s-40s+... My checkpointing is
configured as follows:
- env.enableCheckpointing(60000);
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.getCheckpointConfig().setCheckpointTimeout(60000);
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

3- Based on the above, it's possible that the sink sometimes takes longer than
60 seconds...
- Looking at adjusting the timeouts (see the sketch after the quoted thread at
the end of this message).
- Looking at reducing the load on the sink so it takes less time in general.

On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rmetz...@apache.org> wrote:

> How did you configure the Kafka source as at least once? Afaik the source
> is always exactly-once (as long as there aren't any restarts).
>
> Are you seeing the duplicates in the context of restarts of the Flink job?
>
> On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:
>
>> Sorry, got confused with your reply... Does the message "Error sending
>> fetch request" cause retries/duplicates downstream, or doesn't it?
>>
>> I'm guessing it happens even before the source can send anything
>> downstream...
>>
>> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>>
>>> Hi, my flow is Kafka Source -> Transform -> JDBC Sink.
>>>
>>> The Kafka source is configured as at least once and JDBC prevents duplicates
>>> with a unique key constraint; duplicates are logged in a separate table. So
>>> the destination data is exactly once.
>>>
>>> The duplicates happen every so often. Looking at the checkpoint history,
>>> some checkpoints failed, but the history isn't long enough to go back and
>>> look. I'm guessing I will have to adjust the checkpointing times a bit...
>>>
>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com>
>>> wrote:
>>>
>>>> Hi John,
>>>>
>>>> The log message you saw from the Kafka consumer simply means the consumer
>>>> was disconnected from the broker that the FetchRequest was supposed to be
>>>> sent to. The disconnection can happen in many cases, such as the broker
>>>> being down, network glitches, etc. The KafkaConsumer will just reconnect
>>>> and retry sending that FetchRequest again. This won't cause duplicate
>>>> messages in the KafkaConsumer or Flink. Have you enabled exactly-once
>>>> semantics for your Kafka sink? If not, the downstream might see duplicates
>>>> in case of a Flink failover or an occasional retry in the KafkaProducer of
>>>> the Kafka sink.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>>> maybe it's unrelated, as we are still seeing the message and there are no
>>>>> duplicates...
>>>>>
>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And yes, my downstream is handling the duplicates in an idempotent way,
>>>>>> so we are good on that point. But I'm just curious what the behaviour is
>>>>>> on the source consumer when that error happens.
>>>>>>
>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, running Flink 1.10.0, we see these logs once in a while...
>>>>>>>
>>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>> - [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error
>>>>>>> sending fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>
>>>>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>>>>> what it seems it's either a Kafka bug in the way it handles the EPOCH or
>>>>>>> possibly a version mismatch between client and brokers. That's fine, I
>>>>>>> can look at upgrading the client and/or Kafka. But I'm trying to
>>>>>>> understand what happens in terms of the source and the sink. It looks
>>>>>>> like we get duplicates on the sink, and I'm guessing it's because the
>>>>>>> consumer is failing, and at that point Flink stays on that checkpoint
>>>>>>> until it can reconnect and process that offset, hence the duplicates
>>>>>>> downstream?
>>>>>>
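For reference, here is roughly what the checkpoint configuration from point 2 above looks like in code, with the timeout raised as one of the adjustments we're considering (the 180000 ms value is only an example, nothing we've settled on):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 s, at-least-once mode.
        env.enableCheckpointing(60000);
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        cc.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        cc.setMinPauseBetweenCheckpoints(1000);

        // Was 60000 ms; if the sink occasionally needs longer than the interval,
        // a larger timeout keeps slow checkpoints from being expired and failed.
        cc.setCheckpointTimeout(180000);

        // ... sources, transform, JDBC sink, then env.execute(...)
    }
}

Raising the timeout only buys headroom, though; the longer-term fix is still reducing how long the sink takes during peaks.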