I would suggest allowNonRestoredState=true only if data loss or replay is acceptable, since it will drop the Kafka part of the state. I see some changes in KafkaSourceEnumStateSerializer, but that said, it would be good to have a repro app.
BR,
G

On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com> wrote:

> Dear Yang,
>
> Thank you for your response and suggestion.
>
> I have already tried using allowNonRestoredState=true, but the issue
> still persists. Changing the *operator ID* of the Kafka source is
> something I haven’t tested yet. I will attempt this and see if it resolves
> the partition assignment issue.
>
> *Additionally, I would like to highlight an observation that might be
> relevant:*
>
> - I noticed the following *warning logs* appearing *only in Flink 1.20.1*.
>   These logs were not present in Flink 1.18.0. *Could this be related to
>   the Kafka source state issue?*
>
> Name Collision: Group already contains a Metric with the name
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager,
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *0*]
> Name Collision: Group already contains a Metric with the name
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager,
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *1*]
> Name Collision: Group already contains a Metric with the name
> 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager,
> flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *2*]
>
> Regarding the root cause, do you have any insights on what might have
> changed in *Flink 1.20.1* that could lead to this behavior? Specifically,
> are there any known changes in *Kafka source state handling* that could
> impact partition restoration from a savepoint?
>
> Looking forward to your thoughts. Thank you
>
> Best regards,
> Jasvendra
>
> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Hi Jasvendra,
>>
>> From what I’m hearing, it sounds like a Kafka source state issue. As a
>> workaround, in my humble opinion, you could try changing the operator ID of
>> your Kafka source operator and re-deploying it with
>> allowNonRestoredState=true to discard the existing Kafka source state.
>>
>> As for the root cause of the Kafka source state issue, that would
>> definitely require further investigation.
>>
>> BR,
>> Yang
>>
>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jasvendra,
>>>
>>> In short 1.18 savepoint should be compatible from 1.20.
>>> We don't know such existing issue. Can you please come up with a bare
>>> minimal step-by-step or public repo where one can repro it easily?
>>>
>>> BR,
>>> G
>>>
>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com>
>>> wrote:
>>>
>>>> Dear Flink Community,
>>>>
>>>> I am currently in the process of upgrading our Flink cluster from *version
>>>> 1.18.0 to 1.20.1*. The cluster itself is functioning correctly
>>>> post-upgrade, and I am able to deploy Flink jobs successfully. However, I
>>>> have encountered an issue when attempting to restore a job using a
>>>> *savepoint or state taken from Flink 1.18.0*.
>>>>
>>>> *Issue Description*
>>>>
>>>> - When deploying the Flink job to the *Flink 1.20.1 cluster* using a
>>>>   *savepoint from Flink 1.18.0*, the job is assigned *only one Kafka
>>>>   partition (partition 0)*. As a result, messages from the other
>>>>   partitions are not being consumed.
>>>> - However, if I deploy the same job *without a savepoint*, the job
>>>>   correctly assigns all three partitions (*0, 1, 2*) and consumes
>>>>   messages as expected.
>>>>
>>>> I have researched this issue extensively but have not found a clear
>>>> explanation. I would appreciate any guidance on the following queries:
>>>>
>>>> 1. *Is this issue related to the compatibility of savepoint
>>>>    restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>> 2. *Is this behavior a known bug or an expected outcome?*
>>>> 3. *If this is a bug, what are the recommended steps to resolve it?*
>>>>    - Are there any configuration changes required to properly restore
>>>>      partitions?
>>>>    - Would fixing this require modifications in the application code?
>>>>
>>>> Your insights and assistance on this matter would be highly appreciated.
>>>>
>>>> Thanks & Regards
>>>> Jasvendra Kumar
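
For reference, the restore step of the workaround discussed in this thread (resubmitting the job from the 1.18.0 savepoint while allowing state that no longer maps to an operator to be skipped) might look roughly like the sketch below. It only assembles the `flink run` command line using the CLI's `-s` (savepoint path) and `--allowNonRestoredState` options. The class name `RestoreCommand`, the savepoint path, and the jar name are hypothetical placeholders and do not come from the thread. Changing the Kafka source's operator ID happens in the job code itself and is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assembles a `flink run` invocation that restores from a savepoint.
public class RestoreCommand {

    // Returns the argument list; options are placed before the jar, as the CLI expects.
    public static List<String> build(String savepointPath, String jarPath,
                                     boolean allowNonRestoredState) {
        List<String> args = new ArrayList<>();
        args.add("flink");
        args.add("run");
        args.add("-s");               // restore from this savepoint
        args.add(savepointPath);
        if (allowNonRestoredState) {
            // Skips savepoint state that cannot be mapped to any operator in the new job.
            // As noted in the thread, this drops the Kafka source state, so only use it
            // when data loss or replay is acceptable.
            args.add("--allowNonRestoredState");
        }
        args.add(jarPath);
        return args;
    }

    public static void main(String[] args) {
        // Placeholder path and jar name, for illustration only.
        List<String> cmd = build("/savepoints/savepoint-from-1-18", "test-job.jar", true);
        System.out.println(String.join(" ", cmd));
    }
}
```

The resulting list can be passed to `java.lang.ProcessBuilder` or simply printed and run by hand. Keeping the flag behind a boolean makes it harder to drop source state by accident on a routine restore.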