Which version of the Kafka connector are you using with 1.18, and which with 1.20?
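
For reference, that is usually the version of the flink-connector-kafka
dependency declared in the job's build, i.e. something along the lines of
org.apache.flink:flink-connector-kafka:3.x.y-1.18 versus the matching
-1.20 build (the 3.x.y part is only a placeholder here).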

BR,
G


On Tue, Apr 1, 2025 at 4:02 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> I would suggest allowNonRestoredState=true only if data loss or replay is
> acceptable since it will drop the Kafka part of the state.
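>
> For reference, with the standard CLI that is the -n/--allowNonRestoredState
> flag on "flink run", or equivalently the config option
> execution.savepoint.ignore-unclaimed-state: true; how it is passed in
> practice depends on the deployment.
>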
> I see some changes in KafkaSourceEnumStateSerializer, but that said, it
> would be good to have a repro app.
>
> BR,
> G
>
>
> On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com>
> wrote:
>
>> Dear Yang,
>>
>> Thank you for your response and suggestion.
>>
>> I have already tried using allowNonRestoredState=true, but the issue
>> still persists. Changing the *operator ID* of the Kafka source is
>> something I haven’t tested yet. I will attempt this and see if it resolves
>> the partition assignment issue.
>>
>> *Additionally, I would like to highlight an observation that might be
>> relevant:*
>>
>>    - I noticed the following *warning logs* appearing *only in Flink
>>      1.20.1*. These logs were not present in Flink 1.18.0. *Could this be
>>      related to the Kafka source state issue?*
>>
>>      Name Collision: Group already contains a Metric with the name
>>      'pendingCommittables'. Metric will not be reported. [IP address,
>>      taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job,
>>      streamSink: Committer, 0]
>>
>>      (the same warning is logged again for subtask indices 1 and 2)
>>
>>
>> Regarding the root cause, do you have any insights on what might have
>> changed in *Flink 1.20.1* that could lead to this behavior?
>> Specifically, are there any known changes in *Kafka source state
>> handling* that could impact partition restoration from a savepoint?
>>
>> Looking forward to your thoughts. Thank you
>>
>> Best regards,
>> Jasvendra
>>
>> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>
>>> Hi Jasvendra,
>>>
>>> From what I’m hearing, it sounds like a Kafka source state issue. As a
>>> workaround, in my humble opinion, you could try changing the operator ID
>>> (uid) of your Kafka source and re-deploying the job with
>>> allowNonRestoredState=true to discard the existing Kafka source state.
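>>>
>>> A minimal sketch of what that could look like, assuming the usual
>>> DataStream API / KafkaSource setup (broker, topic, group id and the new
>>> uid below are placeholders, not taken from your job):
>>>
>>>     import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>     import org.apache.flink.connector.kafka.source.KafkaSource;
>>>     import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>>     public class KafkaUidChangeSketch {
>>>         public static void main(String[] args) throws Exception {
>>>             StreamExecutionEnvironment env =
>>>                     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>             KafkaSource<String> source = KafkaSource.<String>builder()
>>>                     .setBootstrapServers("broker:9092")   // placeholder
>>>                     .setTopics("my-topic")                // placeholder
>>>                     .setGroupId("my-group")               // placeholder
>>>                     .setStartingOffsets(OffsetsInitializer.earliest())
>>>                     .setValueOnlyDeserializer(new SimpleStringSchema())
>>>                     .build();
>>>
>>>             env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>>                // a *new* uid, so the Kafka source state in the old
>>>                // savepoint is simply not matched on restore
>>>                .uid("kafka-source-v2")
>>>                .print();
>>>
>>>             env.execute("kafka-uid-change-sketch");
>>>         }
>>>     }
>>>
>>> and then re-submit with -s <savepointPath> --allowNonRestoredState so the
>>> now-unmatched Kafka source state is dropped instead of failing the restore.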
>>>
>>> As for the root cause of the Kafka source state issue, that would
>>> definitely require further investigation.
>>>
>>> BR,
>>> Yang
>>>
>>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jasvendra,
>>>>
>>>> In short, a 1.18 savepoint should be compatible with 1.20.
>>>> We are not aware of any such existing issue. Could you please put together
>>>> a bare-minimal step-by-step guide or a public repo where one can reproduce
>>>> it easily?
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear Flink Community,
>>>>>
>>>>> I am currently in the process of upgrading our Flink cluster from *version
>>>>> 1.18.0 to 1.20.1*. The cluster itself is functioning correctly
>>>>> post-upgrade, and I am able to deploy Flink jobs successfully. However, I
>>>>> have encountered an issue when attempting to restore a job using a 
>>>>> *savepoint
>>>>> or state taken from Flink 1.18.0*.
>>>>>
>>>>> *Issue Description*
>>>>>
>>>>>    - When deploying the Flink job to the *Flink 1.20.1 cluster* using a
>>>>>      *savepoint from Flink 1.18.0*, the job is assigned *only one Kafka
>>>>>      partition (partition 0)*. As a result, messages from the other
>>>>>      partitions are not being consumed.
>>>>>    - However, if I deploy the same job *without a savepoint*, the job
>>>>>      correctly assigns all three partitions (*0, 1, 2*) and consumes
>>>>>      messages as expected (a sketch of both invocations is included
>>>>>      right after this list).
>>>>>
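>>>>> For reference, the two submissions above with the standard CLI would look
>>>>> roughly like the following (the jar name and savepoint path are
>>>>> placeholders, and the actual deployment may differ):
>>>>>
>>>>>     # restore from the 1.18.0 savepoint -> only partition 0 is assigned
>>>>>     bin/flink run -s <savepointPath> my-job.jar
>>>>>
>>>>>     # fresh start without a savepoint -> partitions 0, 1 and 2 are assigned
>>>>>     bin/flink run my-job.jar
>>>>>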
>>>>> I have researched this issue extensively but have not found a clear
>>>>> explanation. I would appreciate any guidance on the following queries:
>>>>>
>>>>>    1. *Is this issue related to the compatibility of savepoint
>>>>>       restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>>>    2. *Is this behavior a known bug or an expected outcome?*
>>>>>    3. *If this is a bug, what are the recommended steps to resolve it?*
>>>>>       - Are there any configuration changes required to properly
>>>>>         restore partitions?
>>>>>       - Would fixing this require modifications in the application code?
>>>>>
>>>>> Your insights and assistance on this matter would be highly
>>>>> appreciated.
>>>>>
>>>>> Thanks & Regards
>>>>> Jasvendra Kumar
>>>>>
>>>>
