Hi Dominik,

No clue why this happens, but it looks like, when restarting from the savepoint, the job picks up the flink-connector-kafka version
bundled in your Docker image (3.0.x?) instead of the newer one you configured.
How did you integrate the newer version?
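One quick way to check which connector actually ends up on the classpath is to inspect the jars in the image's `lib/` directory. Here is a small sketch (the directory path and jar naming are assumptions, adjust to your image) that reads the `Implementation-Version` from each Kafka connector jar's manifest:

```python
import pathlib
import zipfile

def connector_versions(lib_dir: str) -> dict:
    """Scan a Flink lib directory for Kafka connector jars and
    report the Implementation-Version from each jar's manifest."""
    results = {}
    # Assumes the connector jar name contains "kafka" (true for the
    # official flink-connector-kafka artifacts).
    for jar in pathlib.Path(lib_dir).glob("*kafka*.jar"):
        with zipfile.ZipFile(jar) as zf:
            manifest = zf.read("META-INF/MANIFEST.MF").decode("utf-8", "replace")
            for line in manifest.splitlines():
                if line.startswith("Implementation-Version:"):
                    results[jar.name] = line.split(":", 1)[1].strip()
    return results

# Example (path is hypothetical): copy /opt/flink/lib out of the image
# with `docker cp` first, then:
# print(connector_versions("./flink-lib"))
```

If this prints a 3.0.x jar next to (or instead of) the 3.2.0-1.19 one, the JobManager may be loading the older serializer classes first.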



Thias


From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Monday, September 2, 2024 1:35 PM
To: user@flink.apache.org
Subject: [External] Kafka connector exception restarting Flink 1.19 pipeline



Dear Flink community
We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently
to 1.19.1). We source events from Kafka and write enriched events back to
Kafka, currently using flink-connector-kafka 3.2.0-1.19. When
initially deployed (via the Kubernetes operator), the pipeline starts as expected and
generates output events that are written to Kafka.

However, as soon as I deploy a new pipeline and later perform a restart from a
savepoint, the following error is thrown. Has anyone seen this before? Any
idea how to solve the issue?

11:20:41.934 [jobmanager-io-thread-1] INFO  o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
11:20:41.935 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO  o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)
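For context, the exception comes from the versioned-serializer pattern Flink uses for enumerator state: the savepoint bytes carry a version number, and a deserializer refuses anything newer than the versions it knows. So a savepoint written by a newer connector (state version 2) cannot be read back by an older one (which only knows version 1). An illustrative sketch of that gating (not Flink's actual wire format, just the principle):

```python
import struct

CURRENT_VERSION = 1  # the highest state version this (older) deserializer understands

def serialize(payload: bytes, version: int) -> bytes:
    """Prefix the payload with a big-endian version int, as versioned
    serializers typically do."""
    return struct.pack(">i", version) + payload

def deserialize(data: bytes) -> bytes:
    """Reject state written with a version newer than we support."""
    (version,) = struct.unpack(">i", data[:4])
    if version > CURRENT_VERSION:
        raise IOError(
            f"The bytes are serialized with version {version}, "
            f"while this deserializer only supports version up to {CURRENT_VERSION}"
        )
    return data[4:]
```

This is why the error only appears on restore: the newer connector wrote the savepoint, and an older connector class on the restore path tries (and refuses) to read it.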


Kind regards

Dominik Bünzli
Data, Analytics & AI Engineer III

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
