Hi Gabor

There should never have been a dependency on the old connector (or any remaining 
state), as I removed everything before deploying the new version. That's where my 
confusion comes from. It crashes when I deploy the same pipeline twice, with the 
same 3.2.0 dependency, and it restores from a savepoint/checkpoint. Somehow 
it tries to restore the state with an old dependency (which I cannot even find 
on the classpath) 😲

Dominik Bünzli
Data, Analytics & AI Engineer III

From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Date: Tuesday, 3 September 2024 at 12:59
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>
Cc: matthias.schwa...@viseca.ch <matthias.schwa...@viseca.ch>, 
user@flink.apache.org <user@flink.apache.org>
Subject: Re: Kafka connector exception restarting Flink 1.19 pipeline
Be aware: This is an external email.

Hi Dominik,

The issue has nothing to do with DynamicKafkaSource.

What happens here is clear:
* At some point you used the 3.2 Kafka connector, which writes out its state 
with the v2 serializer
* Later you fell back to a version that cannot read it (anything before 3.1.0, 
because this feature was added in FLINK-32019)
* The old connector then fails, which is expected behavior: reading state 
written by a newer connector version is not supported
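The failure mode above can be illustrated with a minimal, self-contained sketch of a versioned state serializer. It is not the connector's code: the class `VersionedStateDemo` and its byte layout (a leading int version followed by a UTF string) are hypothetical and simplified, chosen only to mirror the idea behind Flink's `SimpleVersionedSerializer`. A reader that knows versions up to 2 can read both v1 and v2 bytes, while a reader that only knows version 1 rejects v2 bytes with an `IOException`, analogous to the error in the stack trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, simplified model of versioned state; not Flink's actual serializer.
public class VersionedStateDemo {

    // Writes the payload prefixed with the serializer version it was written with.
    static byte[] write(int version, String payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeUTF(payload);
        }
        return bytes.toByteArray();
    }

    // Accepts any version up to maxSupportedVersion and rejects newer ones.
    // In this toy format v1 and v2 share one payload layout; a real serializer
    // would branch on the version to decode each layout.
    static String read(byte[] serialized, int maxSupportedVersion) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int version = in.readInt();
            if (version > maxSupportedVersion) {
                throw new IOException(String.format(
                        "The bytes are serialized with version %d, while this deserializer only supports version up to %d",
                        version, maxSupportedVersion));
            }
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] v1State = write(1, "state written by an old connector");
        byte[] v2State = write(2, "state written by a new connector");

        // A new reader (knows v1 and v2) can read both.
        System.out.println(read(v1State, 2));
        System.out.println(read(v2State, 2));

        // An old reader (knows only v1) fails on v2 state.
        try {
            read(v2State, 1);
        } catch (IOException e) {
            System.out.println("Old reader failed: " + e.getMessage());
        }
    }
}
```

The last line printed is `Old reader failed: The bytes are serialized with version 2, while this deserializer only supports version up to 1`, which is why upgrading the reader (or discarding the v2 state) are the only ways out.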

I see two possible resolutions:
* Remove the state data and use the old connector
* Use the new connector, which can read the state correctly

Hope this helps.

BR,
G


On Tue, Sep 3, 2024 at 11:31 AM <dominik.buen...@swisscom.com> wrote:
Did you infer that from the line number in the stack trace matching the 
implementation in 3.0.2?

I'll have to check. I cannot find it anywhere on the classpath, and we're not 
using fat JARs in application mode. But I see where this is heading. Thanks for 
mentioning it!
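One way to run that check is to ask the class loader for every copy of the enumerator serializer's class file; more than one hit means a second JAR (or a fat JAR that shades the connector without relocating it) ships its own copy. The sketch below is a hypothetical helper (`ClassOriginFinder` is not part of Flink) that relies only on `ClassLoader.getResources`. Run it with the same JARs the JobManager loads; the paths in the usage note assume the official image layout with Flink under `/opt/flink` and a job JAR named `job.jar`, both assumptions about your setup. Flink loads job code through its own user-code class loader, so a standalone run only approximates what the cluster sees.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper: lists every classpath location that contains a class.
public class ClassOriginFinder {

    // Converts a fully qualified class name into the resource path used inside JARs,
    // e.g. "a.b.C" becomes "a/b/C.class".
    static String toResourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns all locations of the class file visible to the given class loader.
    static List<URL> findAll(String className, ClassLoader loader) throws IOException {
        return Collections.list(loader.getResources(toResourcePath(className)));
    }

    public static void main(String[] args) throws IOException {
        String className = args.length > 0
                ? args[0]
                : "org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer";
        List<URL> hits = findAll(className, Thread.currentThread().getContextClassLoader());
        if (hits.isEmpty()) {
            System.out.println("Not found on this classpath: " + className);
        }
        // Each URL names the JAR (or directory) that provides a copy of the class.
        for (URL url : hits) {
            System.out.println(url);
        }
    }
}
```

For example, `java -cp "/opt/flink/lib/*:job.jar:." ClassOriginFinder` prints one URL per copy found; the JAR file names in those URLs show which connector versions are actually present.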

Best,

Dominik Bünzli
Data, Analytics & AI Engineer III

From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Date: Tuesday, 3 September 2024 at 11:07
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, 
user@flink.apache.org <user@flink.apache.org>
Subject: RE: Kafka connector exception restarting Flink 1.19 pipeline

… really hard to tell. Your original error message clearly indicated that an 
older JAR (3.0.x or earlier) was involved, presumably somewhere on the 
classpath …
… did you perhaps shade it into your JAR?

Thias


From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Tuesday, September 3, 2024 10:23 AM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>; user@flink.apache.org
Subject: [External] Re: Kafka connector exception restarting Flink 1.19 pipeline

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠

Hi Matthias,

Thank you for your reply!

There should not be a dependency on 3.0.x in my Docker image; I only add 3.2.0 
explicitly. When I connect to the running container, I also can't find any 
reference to 3.0.x. I reverted the dependency to 3.0.0-1.17 and it works again. 
Could it be related to the introduction of DynamicKafkaSource in 3.1.0?

* Factory class for the DynamicKafkaSource components. <a
* href=https://cwiki.apache.org/confluence/x/CBn1D>FLIP-246: DynamicKafkaSource</a>
*
* <p>This source's key difference from {@link KafkaSource} is that it enables users to read
* dynamically, which does not require job restart, from streams (topics that belong to one or more
* clusters). If using {@link KafkaSource}, users need to restart the job by deleting the job and
* reconfiguring the topics and clusters.

We are still using KafkaSource in our pipelines.

Best,

Dominik Bünzli
Data, Analytics & AI Engineer III

From: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Date: Tuesday, 3 September 2024 at 09:59
To: Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>, 
user@flink.apache.org <user@flink.apache.org>
Subject: RE: Kafka connector exception restarting Flink 1.19 pipeline

Hi Dominik,

No clue why this happens, but it looks like, when restarting from the savepoint, 
it uses the flink-connector-kafka version from your Docker image (3.0.x?) 
instead of the newer one you configured.
How did you integrate the newer version?



Thias


From: dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
Sent: Monday, September 2, 2024 1:35 PM
To: user@flink.apache.org
Subject: [External] Kafka connector exception restarting Flink 1.19 pipeline


Dear Flink community,
We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and subsequently 
to 1.19.1). We source events from Kafka and write enriched events back to 
Kafka. I'm currently using flink-connector-kafka (3.2.0-1.19). When initially 
deployed (via the k8s operator), the pipeline starts as expected and generates 
output events that are written to Kafka.

As soon as I deploy a new pipeline and, some time afterwards, restart it via a 
savepoint, the following error is thrown. Has anyone seen this before? Any 
idea how to solve the issue?

11:20:41.934 [jobmanager-io-thread-1] INFO  o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
11:20:41.935 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
11:20:41.936 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
11:20:41.939 [jobmanager-io-thread-1] INFO  o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
        at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
        at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)


Kind regards

Dominik Bünzli
Data, Analytics & AI Engineer III
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.