Hi Gabor,

There should never have been a dependency on the old connector (or any remaining state), as I removed everything before deploying the new version. That’s where my confusion comes from. It crashes on the second deployment of the same pipeline, with the same 3.2.0 dependency, when restoring from a savepoint / checkpoint. Somehow it tries to reload the state with an old dependency (which I cannot even find on the classpath) 😲

Dominik Bünzli
Data, Analytics & AI Engineer III

From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Tuesday, 3 September 2024 at 12:59
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>
Cc: matthias.schwa...@viseca.ch <matthias.schwa...@viseca.ch>, user@flink.apache.org <user@flink.apache.org>
Subject: Re: Kafka connector exception restarting Flink 1.19 pipeline

Hi Dominik,

The issue has nothing to do with DynamicKafkaSource. What happens here is clear:
* At some point you used the 3.2 Kafka connector, which writes out its state with the v2 serializer
* Later you fell back to a version that cannot read that state (pre-3.1.0, because this feature was added in FLINK-32019)
* The old connector blows up, which is expected behaviour (such state compatibility is not supported)

I see 2 resolutions:
* Remove the state data and use the old connector
* Use the new connector, which can read the state correctly

Hope this helps.

BR,
G

On Tue, Sep 3, 2024 at 11:31 AM <dominik.buen...@swisscom.com> wrote:

Did you get the indication via the line number that matches the implementation in 3.0.2? I’ll have to check; I cannot find it anywhere in the classpath and we’re not using fat jars in app mode. But I see where this is heading. Thanks for mentioning it!

Best,
Dominik Bünzli
Data, Analytics & AI Engineer III

From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Date: Tuesday, 3 September 2024 at 11:07
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, user@flink.apache.org <user@flink.apache.org>
Subject: RE: Kafka connector exception restarting Flink 1.19 pipeline

… really hard to tell. Your original error message clearly indicated that an older JAR (3.0.x or earlier) was involved, assuming it was somewhere in the classpath …
… did you maybe shade it into your jar?

Thias

From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Tuesday, September 3, 2024 10:23 AM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>; user@flink.apache.org
Subject: [External] Re: Kafka connector exception restarting Flink 1.19 pipeline

Hi Matthias,

Thank you for your reply!

There should not be a dependency on 3.0.x in my docker image; I only add 3.2.0 explicitly. When connecting to the running container I also can’t find any reference to 3.0.x. I reverted the dependency to 3.0.0-1.17 and it works again. Could it be related to the introduction of the DynamicKafkaSource in 3.1.0?

 * Factory class for the DynamicKafkaSource components. <a
 * href=https://cwiki.apache.org/confluence/x/CBn1D>FLIP-246: DynamicKafkaSource</a>
 *
 * <p>This source's key difference from {@link KafkaSource} is that it enables users to read
 * dynamically, which does not require job restart, from streams (topics that belong to one or more
 * clusters). If using {@link KafkaSource}, users need to restart the job by deleting the job and
 * reconfiguring the topics and clusters.

We are still using KafkaSource in our pipelines.

Best,
Dominik Bünzli
Data, Analytics & AI Engineer III

From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Date: Tuesday, 3 September 2024 at 09:59
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, user@flink.apache.org <user@flink.apache.org>
Subject: RE: Kafka connector exception restarting Flink 1.19 pipeline

Hi Dominik,

No clue why this happens, but it looks as if, when restarting from the savepoint, it uses the flink-connector-kafka version from your docker image (3.0.x?) instead of the newer one you configured. How did you integrate the newer version?

Thias

From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Monday, September 2, 2024 1:35 PM
To: user@flink.apache.org
Subject: [External] Kafka connector exception restarting Flink 1.19 pipeline

Dear Flink community,

We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently to 1.19.1). We source events from Kafka and write enriched events back to Kafka. I’m currently using flink-connector-kafka (3.2.0-1.19). On the initial deployment (via the k8s operator), the pipeline starts as expected and generates output events that are written to Kafka. As soon as I deploy a new pipeline and perform a restart via savepoint some time afterwards, the following error is thrown. Has anyone seen this before? Any idea how to solve the issue?

11:20:41.934 [jobmanager-io-thread-1] INFO o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
11:20:41.935 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)

Kind regards
Dominik Bünzli
Data, Analytics & AI Engineer III

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message.
If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
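
Note for readers hitting the same exception: the open question in this thread is which JAR actually supplied KafkaSourceEnumStateSerializer when the state was restored. One way to check is to ask the JVM where it loaded the class from. The following is only a minimal sketch and is not taken from the thread. The class name ClassOriginFinder and the method describeOrigin are made up for illustration. It also assumes the check runs with the same class loader the job uses (for example, called from the job's main method), since Flink may load user code through a separate class loader.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where the JVM loaded a class from.
final class ClassOriginFinder {

    // Returns the JAR or directory URL a class was loaded from,
    // or a marker string when the origin cannot be determined.
    static String describeOrigin(String className) {
        // Prefer the context class loader; fall back to the loader of this helper.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassOriginFinder.class.getClassLoader();
        }
        try {
            // initialize=false: locate the class without running its static initializers.
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            if (location == null) {
                // Typical for classes that ship with the JDK itself.
                return "<no code source (JDK/bootstrap class)>";
            }
            return location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not loadable: " + className + ">";
        }
    }

    public static void main(String[] args) {
        // Default to the serializer class named in the stack trace above.
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer";
        System.out.println(name + " -> " + describeOrigin(name));
    }
}
```

If the printed location points at a JAR other than the 3.2.0 connector, that JAR is the one shadowing the expected dependency. If the class is reported as not loadable, the helper was probably run with a class loader that does not see the connector at all.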