What version of the Kafka connector are you using for 1.18, and which for 1.20?

BR,
G

On Tue, Apr 1, 2025 at 4:02 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I would suggest allowNonRestoredState=true only if data loss or replay is acceptable, since it will drop the Kafka part of the state.
> I see some changes in KafkaSourceEnumStateSerializer, but that said, it would be good to have a repro app.
>
> BR,
> G
>
> On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com> wrote:
>
>> Dear Yang,
>>
>> Thank you for your response and suggestion.
>>
>> I have already tried using allowNonRestoredState=true, but the issue still persists. Changing the *operator ID* of the Kafka source is something I haven't tested yet. I will attempt this and see if it resolves the partition assignment issue.
>>
>> *Additionally, I would like to highlight an observation that might be relevant:*
>>
>> - I noticed the following *warning logs* appearing *only in Flink 1.20.1*. These logs were not present in Flink 1.18.0. *Could this be related to the Kafka source state issue?*
>>
>> Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *0*]
>> Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *1*]
>> Name Collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported. [IP address, taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink: Committer, *2*]
>>
>> Regarding the root cause, do you have any insights on what might have changed in *Flink 1.20.1* that could lead to this behavior? Specifically, are there any known changes in *Kafka source state handling* that could impact partition restoration from a savepoint?
>>
>> Looking forward to your thoughts. Thank you.
>>
>> Best regards,
>> Jasvendra
>>
>> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>
>>> Hi Jasvendra,
>>>
>>> From what I'm hearing, it sounds like a Kafka source state issue. As a workaround, in my humble opinion, you could try changing the operator ID of your Kafka source operator and re-deploying it with allowNonRestoredState=true to discard the existing Kafka source state.
>>>
>>> As for the root cause of the Kafka source state issue, that would definitely require further investigation.
>>>
>>> BR,
>>> Yang
>>>
>>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Hi Jasvendra,
>>>>
>>>> In short, a 1.18 savepoint should be compatible with 1.20. We don't know of any such existing issue. Could you please come up with a bare minimal step-by-step guide or a public repo where one can reproduce it easily?
>>>>
>>>> BR,
>>>> G
>>>>
>>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com> wrote:
>>>>
>>>>> Dear Flink Community,
>>>>>
>>>>> I am currently in the process of upgrading our Flink cluster from *version 1.18.0 to 1.20.1*. The cluster itself is functioning correctly post-upgrade, and I am able to deploy Flink jobs successfully. However, I have encountered an issue when attempting to restore a job using a *savepoint or state taken from Flink 1.18.0*.
>>>>> *Issue Description*
>>>>>
>>>>> - When deploying the Flink job to the *Flink 1.20.1 cluster* using a *savepoint from Flink 1.18.0*, the job is assigned *only one Kafka partition (partition 0)*. As a result, messages from the other partitions are not being consumed.
>>>>> - However, if I deploy the same job *without a savepoint*, the job correctly assigns all three partitions (*0, 1, 2*) and consumes messages as expected.
>>>>>
>>>>> I have researched this issue extensively but have not found a clear explanation. I would appreciate any guidance on the following queries:
>>>>>
>>>>> 1. *Is this issue related to the compatibility of savepoint restoration between Flink 1.18.0 and Flink 1.20.1?*
>>>>> 2. *Is this behavior a known bug or an expected outcome?*
>>>>> 3. *If this is a bug, what are the recommended steps to resolve it?*
>>>>>    - Are there any configuration changes required to properly restore partitions?
>>>>>    - Would fixing this require modifications to the application code?
>>>>>
>>>>> Your insights and assistance on this matter would be highly appreciated.
>>>>>
>>>>> Thanks & Regards
>>>>> Jasvendra Kumar
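
(For reference, a minimal sketch of the workaround Yang describes above: give the Kafka source a new operator uid and redeploy from the 1.18 savepoint with allowNonRestoredState, so the old Kafka source state is skipped and partitions are discovered from scratch. This assumes the job uses the flink-connector-kafka KafkaSource API; the broker address, topic, group id, uid and class name below are placeholders rather than values from the actual job, and, per Gabor's note, dropping the source state is only appropriate if replaying or losing the offsets held in it is acceptable.)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaUidChangeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder broker/topic/group values -- replace with the real job's settings.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("test-topic")
                .setGroupId("test-job")
                // With no restored source state, fall back to the committed
                // consumer-group offsets (earliest if none exist yet).
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafkaSource")
                // New uid: the Kafka source state stored under the old uid in the
                // 1.18 savepoint becomes unclaimed state and is dropped when the
                // job is started with allowNonRestoredState, so the source
                // re-discovers all partitions.
                .uid("kafka-source-v2")
                .print();

        env.execute("test-job");
    }
}

The redeploy would then allow the now-unclaimed state to be ignored, e.g. with the standard CLI flag (adjust for your deployment mode):

./bin/flink run -s <savepointPath> --allowNonRestoredState <job-jar>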