Hi Gabor,

Please find the info below.

Flink version 1.18:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.2.0-1.18</version>
</dependency>

Flink version 1.20.1:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.3.0-1.20</version>
</dependency>
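In case it helps with the minimal repro, the Kafka source in the job is wired up roughly as sketched below. This is only a sketch, not the exact job code: the broker address, topic, group id, class name, and uid string are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaRestoreRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder broker/topic/group values; the real topic has three partitions (0, 1, 2).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("test-topic")
                .setGroupId("test-job")
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafkaSource")
                // Stable operator id: the same uid is in place when the 1.18 savepoint
                // is taken and when the job is restored on 1.20.1.
                .uid("kafka-source")
                .print();

        env.execute("test-job");
    }
}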
Thank you
Jasvendra

On Tue, Apr 1, 2025 at 7:45 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> What version of kafka connectors are you using for 1.18 and what for 1.20?
>
> BR,
> G
>
> On Tue, Apr 1, 2025 at 4:02 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> I would suggest allowNonRestoredState=true only if data loss or replay is
>> acceptable, since it will drop the Kafka part of the state.
>> I see some changes in KafkaSourceEnumStateSerializer, but that said it
>> would be good to have a repro app.
>>
>> BR,
>> G
>>
>> On Tue, Apr 1, 2025 at 3:59 PM jasvendra kumar <javatech....@gmail.com> wrote:
>>
>>> Dear Yang,
>>>
>>> Thank you for your response and suggestion.
>>>
>>> I have already tried using allowNonRestoredState=true, but the issue
>>> still persists. Changing the operator ID of the Kafka source is
>>> something I haven't tested yet. I will attempt this and see if it
>>> resolves the partition assignment issue.
>>>
>>> Additionally, I would like to highlight an observation that might be
>>> relevant: I noticed the following warning logs appearing only in Flink
>>> 1.20.1. These logs were not present in Flink 1.18.0. Could this be
>>> related to the Kafka source state issue?
>>>
>>> Name Collision: Group already contains a Metric with the name
>>> 'pendingCommittables'. Metric will not be reported. [IP address,
>>> taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>> Committer, 0]
>>> Name Collision: Group already contains a Metric with the name
>>> 'pendingCommittables'. Metric will not be reported. [IP address,
>>> taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>> Committer, 1]
>>> Name Collision: Group already contains a Metric with the name
>>> 'pendingCommittables'. Metric will not be reported. [IP address,
>>> taskmanager, flink-cluster-1-20-1taskmanager-2-41, test-job, streamSink:
>>> Committer, 2]
>>>
>>> Regarding the root cause, do you have any insights on what might have
>>> changed in Flink 1.20.1 that could lead to this behavior? Specifically,
>>> are there any known changes in Kafka source state handling that could
>>> impact partition restoration from a savepoint?
>>>
>>> Looking forward to your thoughts. Thank you
>>>
>>> Best regards,
>>> Jasvendra
>>>
>>> On Tue, Apr 1, 2025 at 6:40 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>>
>>>> Hi Jasvendra,
>>>>
>>>> From what I'm hearing, it sounds like a Kafka source state issue. As a
>>>> workaround, in my humble opinion, you could try changing the operator
>>>> ID of your Kafka source operator and re-deploying it with
>>>> allowNonRestoredState=true to discard the existing Kafka source state.
>>>>
>>>> As for the root cause of the Kafka source state issue, that would
>>>> definitely require further investigation.
>>>>
>>>> BR,
>>>> Yang
>>>>
>>>> On Tue, 1 Apr 2025 at 14:46, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Jasvendra,
>>>>>
>>>>> In short, a 1.18 savepoint should be compatible with 1.20.
>>>>> We don't know of such an existing issue. Can you please come up with
>>>>> a bare minimal step-by-step or public repo where one can repro it
>>>>> easily?
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>> On Tue, Apr 1, 2025 at 2:37 PM jasvendra kumar <javatech....@gmail.com> wrote:
>>>>>
>>>>>> Dear Flink Community,
>>>>>>
>>>>>> I am currently in the process of upgrading our Flink cluster from
>>>>>> version 1.18.0 to 1.20.1. The cluster itself is functioning correctly
>>>>>> post-upgrade, and I am able to deploy Flink jobs successfully.
>>>>>> However, I have encountered an issue when attempting to restore a job
>>>>>> using a savepoint or state taken from Flink 1.18.0.
>>>>>>
>>>>>> Issue Description
>>>>>>
>>>>>> - When deploying the Flink job to the Flink 1.20.1 cluster using a
>>>>>>   savepoint from Flink 1.18.0, the job is assigned only one Kafka
>>>>>>   partition (partition 0). As a result, messages from the other
>>>>>>   partitions are not being consumed.
>>>>>> - However, if I deploy the same job without a savepoint, the job
>>>>>>   correctly assigns all three partitions (0, 1, 2) and consumes
>>>>>>   messages as expected.
>>>>>>
>>>>>> I have researched this issue extensively but have not found a clear
>>>>>> explanation. I would appreciate any guidance on the following queries:
>>>>>>
>>>>>> 1. Is this issue related to the compatibility of savepoint restoration
>>>>>>    between Flink 1.18.0 and Flink 1.20.1?
>>>>>> 2. Is this behavior a known bug or an expected outcome?
>>>>>> 3. If this is a bug, what are the recommended steps to resolve it?
>>>>>>    - Are there any configuration changes required to properly restore
>>>>>>      partitions?
>>>>>>    - Would fixing this require modifications in the application code?
>>>>>>
>>>>>> Your insights and assistance on this matter would be highly
>>>>>> appreciated.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Jasvendra Kumar

--
Thanks & Regards
Jasvendra
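For reference, a minimal sketch of the redeploy step behind Yang's suggestion, i.e. giving the Kafka source a new operator id and restoring from the 1.18 savepoint while allowing Flink to drop state it can no longer map to an operator. This assumes the standard flink CLI; the savepoint path, main class, and jar name below are placeholders:

# In the job code, change the source's operator id first, e.g. .uid("kafka-source-v2").
# Then resubmit from the old savepoint; --allowNonRestoredState (-n) lets the restore
# proceed even though the old Kafka source state no longer matches any operator.
./bin/flink run \
    --fromSavepoint s3://my-bucket/savepoints/savepoint-1a2b3c \
    --allowNonRestoredState \
    --class com.example.StreamingJob \
    my-job.jar

Note that, as Gabor pointed out above, dropping the Kafka source state this way means offsets are no longer restored from the savepoint, so data loss or replay must be acceptable.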