Dear Flink community,

We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently 
to 1.19.1). We source events from Kafka and write enriched events back to 
Kafka, currently using flink-connector-kafka (3.2.0-1.19). On the initial 
deployment (via the k8s operator), the pipeline starts as expected and 
generates output events that are written to Kafka.
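For reference, the connector is pulled into the job as an explicit dependency; a minimal sketch of the Maven declaration, assuming the standard flink-connector-kafka coordinates for the version mentioned above:

```xml
<!-- Kafka connector matching the Flink 1.19 runtime; version as stated above -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.2.0-1.19</version>
</dependency>
```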

However, as soon as I deploy a new pipeline version and restart from a 
savepoint sometime afterwards, the following error is thrown. Has anyone seen 
this before? Any idea how to resolve it?

11:20:41.934 [jobmanager-io-thread-1] INFO  
o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to 
checkpoint.
11:20:41.935 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Closing 
SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Source 
coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO  o.a.f.r.s.c.SourceCoordinator - 
Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR 
o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the 
coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this 
deserializer only supports version up to 1
                at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
                at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
                at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
                at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)


Kind regards

Dominik Bünzli
Data, Analytics & AI Engineer III
