How did you configure the Kafka source as at-least-once? AFAIK the source
is always exactly-once (as long as there aren't any restarts).

Are you seeing the duplicates in the context of restarts of the Flink job?
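
To make the question concrete, here is a minimal sketch of what I mean (topic,
broker, and interval values are made up): the delivery guarantee of the source's
offset state follows the checkpointing mode, not a consumer property.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The guarantee for the source's offset state is chosen here,
        // not on the consumer itself.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "xxxxxx-import");

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

        // Offsets committed back to Kafka on checkpoint completion are only
        // used for monitoring; on recovery Flink restores from its own state.
        source.setCommitOffsetsOnCheckpoints(true);

        env.addSource(source).print();
        env.execute("kafka-source-sketch");
    }
}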

On Tue, Nov 3, 2020 at 1:54 AM John Smith <java.dev....@gmail.com> wrote:

> Sorry, I got confused by your reply... Does the message "Error sending
> fetch request" cause retries/duplicates downstream, or not?
>
> I'm guessing it happens even before the source can send anything
> downstream...
>
>
> On Sat, 31 Oct 2020 at 09:10, John Smith <java.dev....@gmail.com> wrote:
>
>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>
>> The Kafka source is configured as at-least-once, and the JDBC sink prevents
>> duplicates with a unique key constraint; each duplicate is logged in a
>> separate table. So the destination data is effectively exactly-once.
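>>
>> In case it helps to see what I mean, here's a rough sketch of the sink side.
>> It uses the JdbcSink from flink-connector-jdbc, which is newer than the 1.10
>> connector we actually run; table, column, and connection values are made up,
>> and the real job also logs the rejected duplicate to a side table.
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
>> import org.apache.flink.connector.jdbc.JdbcSink;
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>>
>> // Records are (eventId, payload) pairs just for this sketch.
>> SinkFunction<Tuple2<String, String>> dedupSink = JdbcSink.sink(
>>         // The unique key on event_id turns a replayed insert into a no-op
>>         // (MySQL-style syntax).
>>         "INSERT IGNORE INTO events (event_id, payload) VALUES (?, ?)",
>>         (stmt, record) -> {
>>             stmt.setString(1, record.f0);
>>             stmt.setString(2, record.f1);
>>         },
>>         new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>>                 .withUrl("jdbc:mysql://db-host:3306/mydb") // placeholder
>>                 .withDriverName("com.mysql.cj.jdbc.Driver")
>>                 .build());
>>
>> // transformedStream.addSink(dedupSink); // attach after the Transform step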
>>
>> The duplicates happen every so often. Looking at the checkpoint history,
>> there were some checkpoints that failed, but the history isn't long enough
>> to go back and look. I'm guessing I will have to adjust the checkpointing
>> times a bit...
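>>
>> For reference, these are the checkpointing knobs I'd expect to tune
>> (the values below are just placeholders, not what we actually run):
>>
>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(60_000);            // checkpoint every 60s
>>
>> CheckpointConfig cc = env.getCheckpointConfig();
>> cc.setMinPauseBetweenCheckpoints(30_000);   // breathing room between checkpoints
>> cc.setCheckpointTimeout(10 * 60_000);       // give slow checkpoints 10 min before failing them
>> cc.setMaxConcurrentCheckpoints(1);
>> cc.setTolerableCheckpointFailureNumber(3);  // a few failed checkpoints won't fail the job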
>>
>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <becket....@gmail.com>
>> wrote:
>>
>>> Hi John,
>>>
>>> The log message you saw from the Kafka consumer simply means the consumer
>>> was disconnected from the broker that the FetchRequest was supposed to be
>>> sent to. The disconnection can happen in many cases, such as the broker
>>> being down, network glitches, etc. The KafkaConsumer will just reconnect
>>> and retry sending that FetchRequest. This won't cause duplicate messages in
>>> the KafkaConsumer or Flink. Have you enabled exactly-once semantics for your
>>> Kafka sink? If not, the downstream might see duplicates in case of a Flink
>>> failover or an occasional retry in the KafkaProducer of the Kafka sink.
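>>>
>>> (In case your pipeline ever writes to Kafka: enabling exactly-once on the
>>> sink typically looks something like the sketch below; topic, broker, and
>>> timeout values are made up.)
>>>
>>> import java.nio.charset.StandardCharsets;
>>> import java.util.Properties;
>>>
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>>> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>
>>> Properties producerProps = new Properties();
>>> producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
>>> // Transactions must complete within the broker's transaction timeout.
>>> producerProps.setProperty("transaction.timeout.ms", "900000");
>>>
>>> FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
>>>         "output-topic",
>>>         (KafkaSerializationSchema<String>) (element, timestamp) ->
>>>                 new ProducerRecord<>("output-topic",
>>>                         element.getBytes(StandardCharsets.UTF_8)),
>>>         producerProps,
>>>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>>
>>> // someStream.addSink(kafkaSink);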
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Any thoughts? This doesn't seem to create duplicates all the time, or
>>>> maybe it's unrelated, as we are still seeing the message and there are no
>>>> duplicates...
>>>>
>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> And yes, my downstream is handling the duplicates in an idempotent way,
>>>>> so we are good on that point. But I'm just curious what the behaviour is
>>>>> on the source consumer when that error happens.
>>>>>
>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, running Flink 1.10.0, we see these logs once in a while:
>>>>>>
>>>>>> 2020-10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler -
>>>>>> [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>
>>>>>> Obviously it looks like the consumer is getting disconnected, and from
>>>>>> what I can tell it's either a Kafka bug in the way it handles the epoch
>>>>>> or possibly a version mismatch between the client and the brokers. That's
>>>>>> fine, I can look at upgrading the client and/or Kafka. But I'm trying to
>>>>>> understand what happens in terms of the source and the sink. It looks
>>>>>> like we get duplicates on the sink, and I'm guessing it's because the
>>>>>> consumer is failing and at that point Flink stays on that checkpoint
>>>>>> until it can reconnect and process that offset, hence the duplicates
>>>>>> downstream?
>>>>>>
>>>>>
