Dear Flink community,

We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently to 1.19.1). We source events from Kafka and write enriched events back to Kafka, currently using flink-connector-kafka 3.2.0-1.19. On the initial deployment (via the k8s operator), the pipeline starts as expected and generates output events that are written to Kafka.
As soon as I deploy a new pipeline and perform a restart via savepoint sometime afterwards, the following error is thrown. Has anyone seen this before? Any idea how to solve the issue?

11:20:41.934 [jobmanager-io-thread-1] INFO o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
11:20:41.935 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)

Kind regards
Dominik Bünzli
Data, Analytics & AI Engineer III