Between the two connector versions, change [1] came in, which might affect
the deserialization logic.
There is an obvious bug in it: TopicPartitionAndAssignmentStatus has no
equals and hashCode methods, which breaks the set contract (in practice the
same topic/partition can be put into the set more than once),
but I don't think that alone should lead to an issue like the one you've
described.
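
To illustrate the set-contract point, here is a minimal, self-contained
sketch. The classes and fields below are hypothetical, simplified stand-ins
and not the connector's actual code; they only show how a HashSet behaves
with and without equals/hashCode.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class SetContractDemo {

    // Hypothetical stand-in: no equals/hashCode, so identity comparison is used.
    static final class WithoutEquals {
        final String topic;
        final int partition;

        WithoutEquals(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Same fields, but with value-based equals/hashCode.
    static final class WithEquals {
        final String topic;
        final int partition;

        WithEquals(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof WithEquals)) {
                return false;
            }
            WithEquals other = (WithEquals) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Adds the "same" topic/partition twice; both instances are kept.
    static int sizeWithoutEquals() {
        Set<WithoutEquals> set = new HashSet<>();
        set.add(new WithoutEquals("topic", 0));
        set.add(new WithoutEquals("topic", 0));
        return set.size();
    }

    // Adds the same topic/partition twice; the second add is a no-op.
    static int sizeWithEquals() {
        Set<WithEquals> set = new HashSet<>();
        set.add(new WithEquals("topic", 0));
        set.add(new WithEquals("topic", 0));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println("without equals/hashCode: " + sizeWithoutEquals());
        System.out.println("with equals/hashCode:    " + sizeWithEquals());
    }
}
```

Without equals/hashCode the set ends up holding two entries for the same
topic/partition; with them it holds one.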

[1] https://github.com/apache/flink-connector-kafka/pull/116

BR,
G


On Tue, Apr 1, 2025 at 4:22 PM jasvendra kumar <javatech....@gmail.com>
wrote:

> Hi Gabor,
>
> Please find below info.
> *Flink version: 1.18*
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka</artifactId>
>     <version>3.2.0-1.18</version>
> </dependency>
>
> *Flink version: 1.20.1*
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka</artifactId>
>     <version>3.3.0-1.20</version>
> </dependency>
>
> Thank you
> Jasvendra
>
> On Tue, Apr 1, 2025 at 7:45 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Which versions of the Kafka connector are you using for 1.18 and for
>> 1.20?
>>
>> BR,
>> G
>>
>>
>> On Tue, Apr 1, 2025 at 4:02 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> I would suggest allowNonRestoredState=true only if data loss or replay
>>> is acceptable, since it will drop the Kafka part of the state.
>>> I see some changes in KafkaSourceEnumStateSerializer, but that said, it
>>> would be good to have a repro app.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com>
>>> wrote:
>>>
>>>> Dear Yang,
>>>>
>>>> Thank you for your response and suggestion.
>>>>
>>>> I have already tried using allowNonRestoredState=true, but the issue
>>>> still persists. Changing the *operator ID* of the Kafka source is
>>>> something I haven’t tested yet. I will attempt this and see if it resolves
>>>> the partition assignment issue.
>>>>
>>>> *Additionally, I would like to highlight an observation that might be
>>>> relevant:*
>>>>
>>>>    - I noticed the following *warning logs* appearing *only in Flink
>>>>    1.20.1*. These logs were not present in Flink 1.18.0. *Could this
>>>>    be related to the Kafka source state issue?*
>>>>
>>>>    Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *0*]
>>>>    Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *1*]
>>>>    Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *2*]
>>>>
>>>>
>>>> Regarding the root cause, do you have any insights on what might have
>>>> changed in *Flink 1.20.1* that could lead to this behavior?
>>>> Specifically, are there any known changes in *Kafka source state
>>>> handling* that could impact partition restoration from a savepoint?
>>>>
>>>> Looking forward to your thoughts. Thank you
>>>>
>>>> Best regards,
>>>> Jasvendra
>>>>
>>>> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jasvendra,
>>>>>
>>>>> From what I’m hearing, it sounds like a Kafka source state issue. As a
>>>>> workaround, in my humble opinion, you could try changing the operator ID
>>>>> of your Kafka source operator and re-deploying it with
>>>>> allowNonRestoredState=true to discard the existing Kafka source state.
>>>>>
>>>>> As for the root cause of the Kafka source state issue, that would
>>>>> definitely require further investigation.
>>>>>
>>>>> BR,
>>>>> Yang
>>>>>
>>>>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jasvendra,
>>>>>>
>>>>>> In short, a 1.18 savepoint should be compatible with 1.20.
>>>>>> We are not aware of any existing issue like this. Can you please come
>>>>>> up with bare minimal step-by-step instructions or a public repo where
>>>>>> one can reproduce it easily?
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <
>>>>>> javatech....@gmail.com> wrote:
>>>>>>
>>>>>>> Dear Flink Community,
>>>>>>>
>>>>>>> I am currently in the process of upgrading our Flink cluster from
>>>>>>> *version 1.18.0 to 1.20.1*. The cluster itself is functioning correctly
>>>>>>> post-upgrade, and I am able to deploy Flink jobs successfully. However,
>>>>>>> I have encountered an issue when attempting to restore a job using a
>>>>>>> *savepoint or state taken from Flink 1.18.0*.
>>>>>>> *Issue Description*
>>>>>>>
>>>>>>>    - When deploying the Flink job to the *Flink 1.20.1 cluster* using
>>>>>>>    a *savepoint from Flink 1.18.0*, the job is assigned *only one
>>>>>>>    Kafka partition (partition 0)*. As a result, messages from the
>>>>>>>    other partitions are not being consumed.
>>>>>>>    - However, if I deploy the same job *without a savepoint*, the job
>>>>>>>    correctly assigns all three partitions (*0, 1, 2*) and consumes
>>>>>>>    messages as expected.
>>>>>>>
>>>>>>> I have researched this issue extensively but have not found a clear
>>>>>>> explanation. I would appreciate any guidance on the following queries:
>>>>>>>
>>>>>>>    1. *Is this issue related to the compatibility of savepoint
>>>>>>>    restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>>>>>    2. *Is this behavior a known bug or an expected outcome?*
>>>>>>>    3. *If this is a bug, what are the recommended steps to resolve it?*
>>>>>>>       - Are there any configuration changes required to properly
>>>>>>>       restore partitions?
>>>>>>>       - Would fixing this require modifications in the application
>>>>>>>       code?
>>>>>>>
>>>>>>> Your insights and assistance on this matter would be highly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Jasvendra Kumar
>>>>>>>
>>>>>>
>
> --
> Thanks & Regards
> Jasvendra
>
>
