I would suggest allowNonRestoredState=true only if data loss or replay is
acceptable since it will drop the Kafka part of the state.
I see some changes in KafkaSourceEnumStateSerializer but that said it would
be good to have a repro app.

BR,
G


On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com>
wrote:

> Dear Yang,
>
> Thank you for your response and suggestion.
>
> I have already tried using allowNonRestoredState=true, but the issue
> still persists. Changing the *operator ID* of the Kafka source is
> something I haven’t tested yet. I will attempt this and see if it resolves
> the partition assignment issue.
>
> *Additionally, I would like to highlight an observation that might be
> relevant:*
>
>    -
>
>    I noticed the following *warning logs* appearing *only in Flink 1.20.1*.
>    These logs were not present in Flink 1.18.0. *Could this be related to
>    the Kafka source state issue?*
>
>    Name Collision: Group already contains a Metric with the name 
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, 
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *0*]
>    Name Collision: Group already contains a Metric with the name 
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, 
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *1*]
>    Name Collision: Group already contains a Metric with the name 
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, 
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *2*]
>
>
> Regarding the root cause, do you have any insights on what might have
> changed in *Flink 1.20.1* that could lead to this behavior? Specifically,
> are there any known changes in *Kafka source state handling* that could
> impact partition restoration from a savepoint?
>
> Looking forward to your thoughts. Thank you
>
> Best regards,
> Jasvendra
>
> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Hi Jasvendra,
>>
>> From what I’m hearing, it sounds like a Kafka source state issue. As a
>> workaround, in my humble opinion, you could try changing the operator ID of
>> your Kafka source operator and re-deploying it with
>> allowNonRestoredState=true to discard the existing Kafka source state.
>>
>> As for the root cause of the Kafka source state issue, that would
>> definitely require further investigation.
>>
>> BR,
>> Yang
>>
>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jasvendra,
>>>
>>> In short 1.18 savepoint should be compatible from 1.20.
>>> We don't know such existing issue. Can you please come up with a bare
>>> minimal step-by-step or public repo where one can repro it easily?
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com>
>>> wrote:
>>>
>>>> Dear Flink Community,
>>>>
>>>> I am currently in the process of upgrading our Flink cluster from *version
>>>> 1.18.0 to 1.20.1*. The cluster itself is functioning correctly
>>>> post-upgrade, and I am able to deploy Flink jobs successfully. However, I
>>>> have encountered an issue when attempting to restore a job using a 
>>>> *savepoint
>>>> or state taken from Flink 1.18.0*.
>>>> *Issue Description*
>>>>
>>>>    -
>>>>
>>>>    When deploying the Flink job to the *Flink 1.20.1 cluster* using a 
>>>> *savepoint
>>>>    from Flink 1.18.0*, the job is assigned *only one Kafka partition
>>>>    (partition 0)*. As a result, messages from the other partitions are
>>>>    not being consumed.
>>>>    -
>>>>
>>>>    However, if I deploy the same job *without a savepoint*, the job
>>>>    correctly assigns all three partitions (*0, 1, 2*) and consumes
>>>>    messages as expected.
>>>>
>>>> I have researched this issue extensively but have not found a clear
>>>> explanation. I would appreciate any guidance on the following queries:
>>>>
>>>>    1.
>>>>
>>>>    *Is this issue related to the compatibility of savepoint
>>>>    restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>>    2.
>>>>
>>>>    *Is this behavior a known bug or an expected outcome?*
>>>>    3.
>>>>
>>>>    *If this is a bug, what are the recommended steps to resolve it?*
>>>>    -
>>>>
>>>>       Are there any configuration changes required to properly restore
>>>>       partitions?
>>>>       -
>>>>
>>>>       Would fixing this require modifications in the application code?
>>>>
>>>> Your insights and assistance on this matter would be highly appreciated.
>>>>
>>>> Thanks & Regards
>>>> Jasvendra Kumar
>>>>
>>>

Reply via email to