Thanks a lot. Just a clarification: it's not the Kafka source that is configured AT_LEAST_ONCE, it is the Flink checkpointing mode as a whole, for all operations. This has no effect during regular operation; only on recovery may records be sent multiple times... but it leads to lower latency. I guess this makes sense in your case, since you are deduping based on a unique key.
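To make the distinction concrete, here is a minimal sketch of that job-wide setting, using the same calls as in your configuration below (the values are only illustrative, not a recommendation for your job):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Trigger a checkpoint every 60s; the mode applies to the whole job, not to the Kafka source alone.
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

    // Keep the externalized checkpoint around if the job is cancelled.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // Illustrative only: a timeout comfortably above the worst checkpoint duration you observe.
    env.getCheckpointConfig().setCheckpointTimeout(180000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

With EXACTLY_ONCE instead of AT_LEAST_ONCE, checkpoint barriers are aligned, which gives exactly-once state semantics inside the job at the cost of some latency; end-to-end exactly-once still needs a transactional or idempotent sink, which in your case the unique-key dedup already provides.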
For the longer checkpoints, adjusting timeouts makes sense.

On Tue, Nov 3, 2020 at 6:04 PM John Smith <java.dev....@gmail.com> wrote:

> Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
> duplicates with a unique key/constraint and logs duplicates in a separate SQL
> table. And essentially it gives us EXACTLY_ONCE semantics.
>
> That's not a problem, it works great!
>
> 1- I was curious if that specific Kafka message was the cause of the
> duplicates, but if I understand Becket correctly it's not the source of the
> duplicates, and I wanted to confirm that.
> 2- I started monitoring checkpoints; on average they are 100ms, but during peak
> we started seeing checkpoints taking 20s-40s+... My checkpointing is configured
> as follows:
>    - env.enableCheckpointing(60000);
>    - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>    - env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>    - env.getCheckpointConfig().setCheckpointTimeout(60000);
>    - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> 3- Based on the above, it's possible that the sink sometimes takes longer than 60 seconds...
>    - Looking at adjusting timeouts.
>    - Looking at reducing the load on the sink and reducing how long it takes in general.
>
> On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rmetz...@apache.org> wrote:
>
>> How did you configure the Kafka source as at least once? Afaik the source
>> is always exactly-once (as long as there aren't any restarts).
>>
>> Are you seeing the duplicates in the context of restarts of the Flink job?
>>
>> On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Sorry, I got confused by your reply... Does the message "Error sending
>>> fetch request" cause retries/duplicates downstream, or does it not?
>>>
>>> I'm guessing it happens even before the source can send anything
>>> downstream...
>>>
>>> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, my flow is Kafka Source -> Transform -> JDBC Sink.
>>>>
>>>> The Kafka source is configured as at-least-once and the JDBC sink prevents
>>>> duplicates with a unique key constraint; duplicates are logged in a separate
>>>> table. So the destination data is exactly-once.
>>>>
>>>> The duplicates happen every so often. Looking at the checkpoint history,
>>>> there were some checkpoints that failed, but the history isn't long enough
>>>> to go back and look. I'm guessing I will have to adjust the checkpointing
>>>> times a bit...
>>>>
>>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> The log message you saw from the Kafka consumer simply means the consumer
>>>>> was disconnected from the broker that the FetchRequest was supposed to be
>>>>> sent to. The disconnection can happen in many cases, such as the broker
>>>>> being down, network glitches, etc. The KafkaConsumer will just reconnect and
>>>>> retry sending that FetchRequest again. This won't cause duplicate messages
>>>>> in the KafkaConsumer or Flink. Have you enabled exactly-once semantics for
>>>>> your Kafka sink? If not, the downstream might see duplicates in case of
>>>>> Flink failover or an occasional retry in the KafkaProducer of the Kafka sink.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>>>> maybe it's unrelated, as we are still seeing the message and there are no
>>>>>> duplicates...
>>>>>>
>>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com> wrote:
>>>>>>
>>>>>>> And yes, my downstream is handling the duplicates in an idempotent
>>>>>>> way, so we are good on that point. But I'm just curious what the behaviour
>>>>>>> is on the source consumer when that error happens.
>>>>>>>
>>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while:
>>>>>>>>
>>>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>>> - [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>
>>>>>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>>>>>> what it seems it's either a Kafka bug in the way it handles the EPOCH or
>>>>>>>> possibly a version mismatch between client and brokers. That's fine, I can
>>>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>>>> what happens in terms of the source and the sink. It looks like we get
>>>>>>>> duplicates on the sink, and I'm guessing it's because the consumer is
>>>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>>>> reconnect and process that offset, and hence the duplicates downstream?
>>>>>>>
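P.S. In case it helps anyone else reading the thread: the "unique key + log the duplicates" pattern described above could look roughly like this in plain JDBC. The table and column names are made up for illustration, and some drivers report the constraint violation as a plain SQLException with an SQLState starting with 23 rather than the dedicated subclass:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLIntegrityConstraintViolationException;

    public class DedupingWriter {

        // Hypothetical tables: "events" has a unique constraint on event_id,
        // "duplicate_events" records the keys that the constraint rejected.
        private static final String INSERT_EVENT =
                "INSERT INTO events (event_id, payload) VALUES (?, ?)";
        private static final String LOG_DUPLICATE =
                "INSERT INTO duplicate_events (event_id) VALUES (?)";

        public static void write(Connection conn, String eventId, String payload) throws Exception {
            try (PreparedStatement insert = conn.prepareStatement(INSERT_EVENT)) {
                insert.setString(1, eventId);
                insert.setString(2, payload);
                insert.executeUpdate();
            } catch (SQLIntegrityConstraintViolationException duplicate) {
                // The unique constraint rejected a replayed record: log it instead of failing.
                try (PreparedStatement log = conn.prepareStatement(LOG_DUPLICATE)) {
                    log.setString(1, eventId);
                    log.executeUpdate();
                }
            }
        }
    }

Nothing Flink-specific here; the point is simply that the database enforces the key, so records replayed during an at-least-once recovery end up in the duplicates table rather than in the data.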