Dear Yang,

Thank you for your response and suggestion.

I have already tried using allowNonRestoredState=true, but the issue still persists. Changing the *operator ID* of the Kafka source is something I haven’t tested yet. I will attempt this and see if it resolves the partition assignment issue.
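For concreteness, here is a minimal sketch of the change I plan to try; the broker address, topic, group ID, and uid strings below are placeholders rather than our actual job configuration:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaSourceUidChange {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder connection settings, not our real broker/topic/group.
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("test-topic")
                    .setGroupId("test-job")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                    // A new uid means a new operator ID, so the old source state
                    // in the 1.18 savepoint no longer matches this operator and
                    // is dropped when restoring with allowNonRestoredState.
                    .uid("kafka-source-v2") // previously "kafka-source-v1"
                    .print();

            env.execute("test-job");
        }
    }

My understanding is that the source will then fall back to its configured starting offsets instead of the restored splits, which should at least tell us whether the old source state is what pins the job to partition 0.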
*Additionally, I would like to highlight an observation that might be relevant:*

- I noticed the following *warning logs* appearing *only in Flink 1.20.1*. These logs were not present in Flink 1.18.0. *Could this be related to the Kafka source state issue?*

Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *0*]
Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *1*]
Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *2*]

Regarding the root cause, do you have any insights into what might have changed in *Flink 1.20.1* that could lead to this behavior? Specifically, are there any known changes in *Kafka source state handling* that could impact partition restoration from a savepoint?
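For completeness, once the uid is changed I plan to redeploy along these lines (the savepoint path and jar name here are placeholders):

    flink run -s /savepoints/savepoint-xxxx --allowNonRestoredState test-job.jar

The -s/--fromSavepoint and --allowNonRestoredState flags are the standard CLI options; only the path and jar name are made up.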
Looking forward to your thoughts.

Thank you

Best regards,
Jasvendra

On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:

> Hi Jasvendra,
>
> From what I’m hearing, it sounds like a Kafka source state issue. As a
> workaround, in my humble opinion, you could try changing the operator ID
> of your Kafka source operator and re-deploying it with
> allowNonRestoredState=true to discard the existing Kafka source state.
>
> As for the root cause of the Kafka source state issue, that would
> definitely require further investigation.
>
> BR,
> Yang
>
> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Jasvendra,
>>
>> In short, a 1.18 savepoint should be compatible with 1.20.
>> We are not aware of any such existing issue. Can you please come up
>> with a bare minimal step-by-step guide or a public repo where one can
>> repro it easily?
>>
>> BR,
>> G
>>
>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com>
>> wrote:
>>
>>> Dear Flink Community,
>>>
>>> I am currently in the process of upgrading our Flink cluster from
>>> *version 1.18.0 to 1.20.1*. The cluster itself is functioning
>>> correctly post-upgrade, and I am able to deploy Flink jobs
>>> successfully. However, I have encountered an issue when attempting to
>>> restore a job using a *savepoint or state taken from Flink 1.18.0*.
>>>
>>> *Issue Description*
>>>
>>> - When deploying the Flink job to the *Flink 1.20.1 cluster* using a
>>> *savepoint from Flink 1.18.0*, the job is assigned *only one Kafka
>>> partition (partition 0)*. As a result, messages from the other
>>> partitions are not being consumed.
>>>
>>> - However, if I deploy the same job *without a savepoint*, the job
>>> correctly assigns all three partitions (*0, 1, 2*) and consumes
>>> messages as expected.
>>>
>>> I have researched this issue extensively but have not found a clear
>>> explanation. I would appreciate any guidance on the following queries:
>>>
>>> 1. *Is this issue related to the compatibility of savepoint
>>> restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>
>>> 2. *Is this behavior a known bug or an expected outcome?*
>>>
>>> 3. *If this is a bug, what are the recommended steps to resolve it?*
>>>
>>> - Are there any configuration changes required to properly restore
>>> partitions?
>>>
>>> - Would fixing this require modifications in the application code?
>>>
>>> Your insights and assistance on this matter would be highly
>>> appreciated.
>>>
>>> Thanks & Regards
>>> Jasvendra Kumar