Hi Gabor,

Please find the connector versions below.
*Flink version: 1.18*
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.2.0-1.18</version>
</dependency>

*Flink version: 1.20.1*
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.3.0-1.20</version>
</dependency>

Thank you
Jasvendra

On Tue, Apr 1, 2025 at 7:45 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Which versions of the Kafka connector are you using for 1.18 and for 1.20?
>
> BR,
> G
>
>
> On Tue, Apr 1, 2025 at 4:02 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> I would suggest allowNonRestoredState=true only if data loss or replay is
>> acceptable, since it will drop the Kafka part of the state.
>> I see some changes in KafkaSourceEnumStateSerializer, but that said, it
>> would be good to have a repro app.
>>
>> BR,
>> G
>>
>>
>> On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com>
>> wrote:
>>
>>> Dear Yang,
>>>
>>> Thank you for your response and suggestion.
>>>
>>> I have already tried using allowNonRestoredState=true, but the issue
>>> still persists. Changing the *operator ID* of the Kafka source is
>>> something I haven’t tested yet. I will attempt this and see if it resolves
>>> the partition assignment issue.
>>>
>>> *Additionally, I would like to highlight an observation that might be
>>> relevant:*
>>>
>>>    - I noticed the following *warning logs* appearing *only in Flink
>>>      1.20.1*. These logs were not present in Flink 1.18.0. *Could this be
>>>      related to the Kafka source state issue?*
>>>
>>>    Name Collision: Group already contains a Metric with the name
>>>    'pendingCommittables'. Metric will not be reported. [IP address,
>>>    taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>>    Committer, *0*]
>>>    Name Collision: Group already contains a Metric with the name
>>>    'pendingCommittables'. Metric will not be reported. [IP address,
>>>    taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>>    Committer, *1*]
>>>    Name Collision: Group already contains a Metric with the name
>>>    'pendingCommittables'. Metric will not be reported. [IP address,
>>>    taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>>    Committer, *2*]
>>>
>>>
>>> Regarding the root cause, do you have any insights on what might have
>>> changed in *Flink 1.20.1* that could lead to this behavior?
>>> Specifically, are there any known changes in *Kafka source state
>>> handling* that could impact partition restoration from a savepoint?
>>>
>>> Looking forward to your thoughts. Thank you.
>>>
>>> Best regards,
>>> Jasvendra
>>>
>>> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>>
>>>> Hi Jasvendra,
>>>>
>>>> From what I’m hearing, it sounds like a Kafka source state issue. As a
>>>> workaround, in my humble opinion, you could try changing the operator ID of
>>>> your Kafka source operator and re-deploying it with
>>>> allowNonRestoredState=true to discard the existing Kafka source state.
>>>>
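
A toy sketch of why the workaround above discards the Kafka source state. It assumes only that savepoint state is matched to operators by operator ID and that unmatched state is rejected unless allowNonRestoredState is set. The class, method, and ID names are hypothetical; this is not Flink's restore code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: hypothetical names, not Flink's implementation. */
public class SavepointRestoreSketch {

    /**
     * Matches savepoint state to the job's operators by operator ID.
     * State with no matching operator is rejected, or dropped when
     * allowNonRestoredState is true.
     */
    static Map<String, String> restore(
            Map<String, String> savepointStateByUid,
            Set<String> jobOperatorUids,
            boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> e : savepointStateByUid.entrySet()) {
            if (jobOperatorUids.contains(e.getKey())) {
                restored.put(e.getKey(), e.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for operator '" + e.getKey()
                                + "' cannot be mapped to the new job");
            }
            // Otherwise the unmatched state is silently dropped.
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("kafka-source", "partition offsets");
        savepoint.put("aggregator", "window contents");

        // The job is redeployed with a new ID for the Kafka source.
        Set<String> job = new HashSet<>();
        job.add("kafka-source-v2");
        job.add("aggregator");

        // Only the operator whose ID is unchanged keeps its state.
        System.out.println(restore(savepoint, job, true).keySet()); // prints [aggregator]
    }
}
```

In a real job the ID is set with `.uid(...)` on the source operator, and the flag is passed as `--allowNonRestoredState` to `flink run`. The renamed source then starts from its configured starting offsets, which is why this is only suitable when replay or data loss is acceptable.
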
>>>> As for the root cause of the Kafka source state issue, that would
>>>> definitely require further investigation.
>>>>
>>>> BR,
>>>> Yang
>>>>
>>>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jasvendra,
>>>>>
>>>>> In short, a 1.18 savepoint should be compatible with 1.20.
>>>>> We are not aware of any such existing issue. Could you please provide
>>>>> bare-minimum step-by-step instructions or a public repo where one can
>>>>> reproduce it easily?
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Dear Flink Community,
>>>>>>
>>>>>> I am currently in the process of upgrading our Flink cluster from
>>>>>> *version 1.18.0 to 1.20.1*. The cluster itself is functioning correctly
>>>>>> post-upgrade, and I am able to deploy Flink jobs successfully. However, I
>>>>>> have encountered an issue when attempting to restore a job using a
>>>>>> *savepoint or state taken from Flink 1.18.0*.
>>>>>>
>>>>>> *Issue Description*
>>>>>>
>>>>>>    - When deploying the Flink job to the *Flink 1.20.1 cluster* using
>>>>>>      a *savepoint from Flink 1.18.0*, the job is assigned *only one
>>>>>>      Kafka partition (partition 0)*. As a result, messages from the
>>>>>>      other partitions are not being consumed.
>>>>>>    - However, if I deploy the same job *without a savepoint*, the job
>>>>>>      correctly assigns all three partitions (*0, 1, 2*) and consumes
>>>>>>      messages as expected.
>>>>>>
>>>>>> I have researched this issue extensively but have not found a clear
>>>>>> explanation. I would appreciate any guidance on the following queries:
>>>>>>
>>>>>>    1. *Is this issue related to the compatibility of savepoint
>>>>>>       restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>>>>    2. *Is this behavior a known bug or an expected outcome?*
>>>>>>    3. *If this is a bug, what are the recommended steps to resolve it?*
>>>>>>       - Are there any configuration changes required to properly
>>>>>>         restore partitions?
>>>>>>       - Would fixing this require modifications in the application
>>>>>>         code?
>>>>>>
>>>>>> Your insights and assistance on this matter would be highly
>>>>>> appreciated.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Jasvendra Kumar
>>>>>>
>>>>>

-- 
Thanks & Regards
Jasvendra
