Thanks a lot.
Just a clarification: it's not the Kafka source that is configured as
AT_LEAST_ONCE; it is the Flink checkpointing mode as a whole, for all
operations.
This has no effect during regular operation. Only on recovery may records
be sent multiple times, but in exchange it leads to lower latency. I guess
this makes sense in your case, since you are deduping based on a unique key.
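
To illustrate why replayed records are harmless with a unique key, here is a
minimal sketch in plain Java. It is not the actual JDBC sink: the class name
IdempotentSinkSketch and its methods are hypothetical, and an in-memory map
stands in for the table with the unique constraint, with a list standing in
for the separate duplicates table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a table with a unique key plus a duplicates log. */
public class IdempotentSinkSketch {
    // Stand-in for the main table, unique on the record key.
    private final Map<String, String> rows = new HashMap<>();
    // Stand-in for the separate table that logs rejected duplicates.
    private final List<String> duplicateLog = new ArrayList<>();

    /** Returns true if the record was inserted, false if the key already existed. */
    public boolean write(String key, String value) {
        if (rows.putIfAbsent(key, value) != null) {
            duplicateLog.add(key); // key already present: log it instead of inserting
            return false;
        }
        return true;
    }

    public int rowCount() {
        return rows.size();
    }

    public int duplicateCount() {
        return duplicateLog.size();
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        // Normal processing: three distinct records arrive once each.
        for (String key : new String[] {"k1", "k2", "k3"}) {
            sink.write(key, "payload-" + key);
        }
        // Simulated recovery: the last two records are replayed from the checkpoint.
        sink.write("k2", "payload-k2");
        sink.write("k3", "payload-k3");
        // prints "rows=3 duplicates=2"
        System.out.println("rows=" + sink.rowCount() + " duplicates=" + sink.duplicateCount());
    }
}
```

The replay changes only the duplicates log, not the destination rows, which
is the effect the unique constraint gives you on the real table.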

For the longer checkpoints, adjusting timeouts makes sense.
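
As a rough sketch of what that adjustment could look like, starting from the
settings quoted below: the 180-second timeout and the tolerated failure count
of 3 are assumed example values, not recommendations, and the class name is
hypothetical. The job graph and env.execute(...) are left out.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical example class; requires the Flink streaming dependency on the classpath. */
public class CheckpointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same interval and mode as in the quoted configuration.
        env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Assumed value: allow a checkpoint up to 3 minutes before it is declared failed.
        cfg.setCheckpointTimeout(180_000L);
        // Unchanged from the quoted configuration.
        cfg.setMinPauseBetweenCheckpoints(1_000L);
        // Assumed value: tolerate a few failed checkpoints before failing the job.
        cfg.setTolerableCheckpointFailureNumber(3);
        cfg.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Sources, transformations, sinks and env.execute(...) would follow here.
    }
}
```

Whether a longer timeout is enough depends on how slow the sink gets at peak,
so reducing the sink load is still worth looking at.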

On Tue, Nov 3, 2020 at 6:04 PM John Smith <java.dev....@gmail.com> wrote:

> Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
> duplicates with unique key/constraint and logs duplicates in a separate SQL
> table. And essentially it gives us EXACTLY_ONCE semantics.
>
> That's not a problem, it works great!
>
> 1- I was curious whether that specific Kafka message was the cause of the
> duplicates, but if I understand Becket correctly, it is not the source of
> the duplicates, and I wanted to confirm that.
> 2- I started monitoring checkpoints. On average they take 100ms, but during
> peak we started seeing checkpoints taking 20s-40s+... My checkpointing is
> configured as follows:
>      - env.enableCheckpointing(60000);
>      - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>      - env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>      - env.getCheckpointConfig().setCheckpointTimeout(60000);
>      - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> 3- Based on the above, it's possible that the sink sometimes takes longer
> than 60 seconds...
>     - Looking at adjusting timeouts.
>     - Looking at reducing the load on the sink and how long it takes in
> general.
>
> On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rmetz...@apache.org> wrote:
>
>> How did you configure the Kafka source as at least once? Afaik the source
>> is always exactly-once (as long as there aren't any restarts).
>>
>> Are you seeing the duplicates in the context of restarts of the Flink job?
>>
>> On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Sorry, I got confused by your reply... Does the message "Error sending
>>> fetch request" cause retries/duplicates downstream or not?
>>>
>>> I'm guessing it happens before the source can even send anything
>>> downstream...
>>>
>>>
>>> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, my flow is Kafka Source -> Transform -> JDBC Sink
>>>>
>>>> The Kafka source is configured as at-least-once, and JDBC prevents
>>>> duplicates with a unique key constraint; each duplicate is logged in a
>>>> separate table. So the destination data is exactly-once.
>>>>
>>>> The duplicates happen every so often. Looking at the checkpoint history,
>>>> there were some checkpoints that failed, but the history isn't long enough
>>>> to go back and look. I'm guessing I will have to adjust the checkpointing
>>>> times a bit...
>>>>
>>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> The log message you saw from the Kafka consumer simply means the consumer
>>>>> was disconnected from the broker that the FetchRequest was supposed to be
>>>>> sent to. The disconnection can happen in many cases, such as a broker going
>>>>> down, network glitches, etc. The KafkaConsumer will just reconnect and
>>>>> resend that FetchRequest. This won't cause duplicate messages in
>>>>> KafkaConsumer or Flink. Have you enabled exactly-once semantics for your
>>>>> Kafka sink? If not, the downstream might see duplicates in case of a Flink
>>>>> failover or an occasional retry in the KafkaProducer of the Kafka sink.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>>>> maybe it's unrelated, as we are still seeing the message and there are
>>>>>> no duplicates...
>>>>>>
>>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <
>>>>>> java.dev....@gmail.com> wrote:
>>>>>>
>>>>>>> And yes, my downstream handles the duplicates in an idempotent way, so
>>>>>>> we are good on that point. I'm just curious what the behaviour of the
>>>>>>> source consumer is when that error happens.
>>>>>>>
>>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while...
>>>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>>> - [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>
>>>>>>>> Obviously it looks like the consumer is getting disconnected, and
>>>>>>>> from what I can tell it's either a Kafka bug in the way it handles the
>>>>>>>> EPOCH or possibly a version mismatch between client and brokers. That's
>>>>>>>> fine, I can look at upgrading the client and/or Kafka. But I'm trying to
>>>>>>>> understand what happens in terms of the source and the sink. It looks
>>>>>>>> like we get duplicates on the sink, and I'm guessing it's because the
>>>>>>>> consumer is failing and at that point Flink stays on that checkpoint
>>>>>>>> until it can reconnect and process that offset, hence the duplicates
>>>>>>>> downstream?
>>>>>>>>
>>>>>>>
