Hi Dominic, The issue has nothing to do with DynamicKafkaSource.
The scenario what happens here is clear: * At some point in time you've used the 3.2 Kafka connector which writes out it's state with v2 serializer * Later falled back to a version which is not able to read that (pre 3.1.0 because this feature has been added in FLINK-32019) * The old connector blows up as normal operation (such state compatibility is not supported) I see 2 resolutions: * You remove the state data and use the old connector * Use the new connector which can read it correctly Hope this helps. BR, G On Tue, Sep 3, 2024 at 11:31 AM <dominik.buen...@swisscom.com> wrote: > Did you get the indication via the line number that matches the > implementation in 3.0.2? > > I’ll have to check, I cannot find it anywhere in the classpath and we’re > not using fat jars in app mode. But I see where this is heading. Thanks for > mentioning! > > > > Best, > > > > > *Dominik Bünzli *Data, Analytics & AI Engineer III > > > > *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch> > *Date: *Tuesday, 3 September 2024 at 11:07 > *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, > user@flink.apache.org <user@flink.apache.org> > *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline > > *Be aware:* This is an external email. > > > > … really hard to tell, your original error message clearly indicated that > an older JAR (3.0.x, or pre) was involved, assuming it was somewhere in the > classpath … > > … did you maybe shade this into your jar? > > > > Thias > > > > > > *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com> > *Sent:* Tuesday, September 3, 2024 10:23 AM > *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>; > user@flink.apache.org > *Subject:* [External] Re: Kafka connector exception restarting Flink 1.19 > pipeline > > > > ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠ > > > > Hi Matthias, > > Thank you for your reply! > > > > There should not be a dependency for 3.0.x in my docker image, I only add > 3.2.0 explicitly. When connecting to the running container I also can’t > find any reference to 3.0.x. I reverted the dependency to 3.0.0-1.17 and it > works again. Could it be related to the introduction of the > DynamicKafkaSource in 3.1.0? > > > > * Factory class for the DynamicKafkaSource components. <a > > * href=https://cwiki.apache.org/confluence/x/CBn1D>FLIP-246: > DynamicKafkaSource</a> > > * > > * <p>This source's key difference from {@link KafkaSource} is that it > enables users to read > > * dynamically, which does not require job restart, from streams (topics > that belong to one or more > > * clusters). If using {@link KafkaSource}, users need to restart the job > by deleting the job and > > * reconfiguring the topics and clusters. > > > > We are still using KafkaSource in our pipelines. > > > > Best, > > > > > *Dominik Bünzli *Data, Analytics & AI Engineer III > > > > *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch> > *Date: *Tuesday, 3 September 2024 at 09:59 > *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, > user@flink.apache.org <user@flink.apache.org> > *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline > > *Be aware:* This is an external email. > > > > Hi Dominik, > > > > No clue why this happens, but it looks like that > > when restarting from the savepoint it uses the flink-connector-kafka > version from your docker image (3.0.x ?) instead of the newer one you > configured. > > How did you integrate the newer version? > > > > > > > > Thias > > > > > > *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com> > *Sent:* Monday, September 2, 2024 1:35 PM > *To:* user@flink.apache.org > *Subject:* [External] Kafka connector exception restarting Flink 1.19 > pipeline > > > > ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠ > > > > Dear Flink community > > We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and > subsequently to 1.19.1). We are sourcing events from Kafka and write > enriched events back to Kafka. I’m currently using the > flink-connector-kafka (3.2.0-1.19). When initially deploying (via k8s > operator), the pipeline starts as expected and is generating output events > that are written to Kafka. > > > > As soon as I deploy a new pipeline and perform a restart via savepoint > sometime afterwards, the following error is thrown. Has anyone seen this > before? Any idea how to solve the issue? > > > 11:20:41.934 [jobmanager-io-thread-1] INFO > o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to > checkpoint. > > 11:20:41.935 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Closing > SourceCoordinator for source Source: Kafka Source XYZ. > > 11:20:41.936 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Source > coordinator for source Source: Kafka Source XYZ closed. > > 11:20:41.939 [jobmanager-io-thread-1] INFO o.a.f.r.s.c.SourceCoordinator > - Restoring SplitEnumerator of source Source: Kafka Source XYZ from > checkpoint. > > 11:20:41.942 [jobmanager-io-thread-1] ERROR > o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the > coordinator to checkpoint and start. > > java.io.IOException: The bytes are serialized with version 2, while this > deserializer only supports version up to 1 > > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83) > > at > org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43) > > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556) > > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451) > > > > > > Kind regards > > > > > *Dominik Bünzli *Data, Analytics & AI Engineer III > > Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und > beinhaltet unter Umständen vertrauliche Mitteilungen. Da die > Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, > übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und > Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir > Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie > eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung > dieser Informationen ist streng verboten. > > This message is intended only for the named recipient and may contain > confidential or privileged information. As the confidentiality of email > communication cannot be guaranteed, we do not accept any responsibility for > the confidentiality and the intactness of this message. If you have > received it in error, please advise the sender by return e-mail and delete > this message and any attachments. Any unauthorised use or dissemination of > this information is strictly prohibited. > > Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und > beinhaltet unter Umständen vertrauliche Mitteilungen. Da die > Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, > übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und > Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir > Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie > eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung > dieser Informationen ist streng verboten. > > This message is intended only for the named recipient and may contain > confidential or privileged information. As the confidentiality of email > communication cannot be guaranteed, we do not accept any responsibility for > the confidentiality and the intactness of this message. If you have > received it in error, please advise the sender by return e-mail and delete > this message and any attachments. Any unauthorised use or dissemination of > this information is strictly prohibited. >