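Hi Dominik,

No clue why this happens, but it looks like the job, when restarting from the savepoint, uses the flink-connector-kafka version from your Docker image (3.0.x?) instead of the newer one you configured. The exception points that way: the state in the savepoint was written with serializer version 2, but the deserializer that reads it only supports versions up to 1. How did you integrate the newer version?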
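
To check which jar the serializer class is actually loaded from, a small diagnostic along these lines might help. It is only a sketch: `ConnectorJarLocator` and `locate` are names I made up, the class name is copied from your stack trace, and it uses plain JDK calls only. Since the failure happens in the SourceCoordinator, I assume the JobManager classpath is the one that matters, so the result is only meaningful when this runs there (for example, called from the job's `main` method).

```java
import java.security.CodeSource;

public class ConnectorJarLocator {

    /** Returns the location (jar or directory) a class is loaded from, or a short note if unknown. */
    static String locate(String className) {
        try {
            // Load without initializing the class; we only want to know where it comes from.
            Class<?> clazz = Class.forName(className, false, ConnectorJarLocator.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK itself typically have no code source.
            if (source == null || source.getLocation() == null) {
                return "<no code source>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace in the original message.
        String name = "org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer";
        System.out.println(name + " -> " + locate(name));
    }
}
```

If the printed location is a jar inside the image rather than the 3.2.0-1.19 connector you bundled, that would support the version-conflict theory. Note that the lookup uses the class loader that loaded `ConnectorJarLocator`, so where you place the helper (job jar vs. Flink's `lib` directory) affects what it sees.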
Thias

From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Monday, September 2, 2024 1:35 PM
To: user@flink.apache.org
Subject: [External] Kafka connector exception restarting Flink 1.19 pipeline

Dear Flink community

We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently to 1.19.1). We source events from Kafka and write enriched events back to Kafka. I'm currently using flink-connector-kafka (3.2.0-1.19). On the initial deployment (via the k8s operator), the pipeline starts as expected and generates output events that are written to Kafka. As soon as I deploy a new pipeline and restart it from a savepoint some time afterwards, the following error is thrown. Has anyone seen this before? Any idea how to solve the issue?

11:20:41.934 [jobmanager-io-thread-1] INFO o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
11:20:41.935 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)

Kind regards

Dominik Bünzli
Data, Analytics & AI Engineer III