Hi Dominic,

The issue has nothing to do with DynamicKafkaSource.

The scenario what happens here is clear:
* At some point in time you've used the 3.2 Kafka connector which writes
out it's state with v2 serializer
* Later falled back to a version which is not able to read that (pre 3.1.0
because this feature has been added in FLINK-32019)
* The old connector blows up as normal operation (such state compatibility
is not supported)

I see 2 resolutions:
* You remove the state data and use the old connector
* Use the new connector which can read it correctly

Hope this helps.

BR,
G


On Tue, Sep 3, 2024 at 11:31 AM <dominik.buen...@swisscom.com> wrote:

> Did you get the indication via the line number that matches the
> implementation in 3.0.2?
>
> I’ll have to check, I cannot find it anywhere in the classpath and we’re
> not using fat jars in app mode. But I see where this is heading. Thanks for
> mentioning!
>
>
>
> Best,
>
>
>
>
> *Dominik Bünzli  *Data, Analytics & AI Engineer III
>
>
>
> *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Date: *Tuesday, 3 September 2024 at 11:07
> *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>,
> user@flink.apache.org <user@flink.apache.org>
> *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline
>
> *Be aware:* This is an external email.
>
>
>
> … really hard to tell, your original error message clearly indicated that
> an older JAR (3.0.x, or pre) was involved, assuming it was somewhere in the
> classpath …
>
> … did you maybe shade this into your jar?
>
>
>
> Thias
>
>
>
>
>
> *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
> *Sent:* Tuesday, September 3, 2024 10:23 AM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>;
> user@flink.apache.org
> *Subject:* [External] Re: Kafka connector exception restarting Flink 1.19
> pipeline
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> Hi Matthias,
>
> Thank you for your reply!
>
>
>
> There should not be a dependency for 3.0.x in my docker image, I only add
> 3.2.0 explicitly. When connecting to the running container I also can’t
> find any reference to 3.0.x. I reverted the dependency to 3.0.0-1.17 and it
> works again. Could it be related to the introduction of the
> DynamicKafkaSource in 3.1.0?
>
>
>
> * Factory class for the DynamicKafkaSource components. <a
>
> * href=https://cwiki.apache.org/confluence/x/CBn1D>FLIP-246:
> DynamicKafkaSource</a>
>
> *
>
> * <p>This source's key difference from {@link KafkaSource} is that it
> enables users to read
>
> * dynamically, which does not require job restart, from streams (topics
> that belong to one or more
>
> * clusters). If using {@link KafkaSource}, users need to restart the job
> by deleting the job and
>
> * reconfiguring the topics and clusters.
>
>
>
> We are still using KafkaSource in our pipelines.
>
>
>
> Best,
>
>
>
>
> *Dominik Bünzli  *Data, Analytics & AI Engineer III
>
>
>
> *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Date: *Tuesday, 3 September 2024 at 09:59
> *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>,
> user@flink.apache.org <user@flink.apache.org>
> *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline
>
> *Be aware:* This is an external email.
>
>
>
> Hi Dominik,
>
>
>
> No clue why this happens, but it looks like that
>
> when restarting from the savepoint it uses the flink-connector-kafka
> version from your docker image (3.0.x ?) instead of the newer one you
> configured.
>
> How did you integrate the newer version?
>
>
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
> *Sent:* Monday, September 2, 2024 1:35 PM
> *To:* user@flink.apache.org
> *Subject:* [External] Kafka connector exception restarting Flink 1.19
> pipeline
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> Dear Flink community
>
> We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and
> subsequently to 1.19.1). We are sourcing events from Kafka and write
> enriched events back to Kafka. I’m currently using the
> flink-connector-kafka (3.2.0-1.19). When initially deploying (via k8s
> operator), the pipeline starts as expected and is generating output events
> that are written to Kafka.
>
>
>
> As soon as I deploy a new pipeline and perform a restart via savepoint
> sometime afterwards, the following error is thrown. Has anyone seen this
> before? Any idea how to solve the issue?
>
>
> 11:20:41.934 [jobmanager-io-thread-1] INFO
> o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to
> checkpoint.
>
> 11:20:41.935 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Closing
> SourceCoordinator for source Source: Kafka Source XYZ.
>
> 11:20:41.936 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Source
> coordinator for source Source: Kafka Source XYZ closed.
>
> 11:20:41.939 [jobmanager-io-thread-1] INFO  o.a.f.r.s.c.SourceCoordinator
> - Restoring SplitEnumerator of source Source: Kafka Source XYZ from
> checkpoint.
>
> 11:20:41.942 [jobmanager-io-thread-1] ERROR
> o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the
> coordinator to checkpoint and start.
>
> java.io.IOException: The bytes are serialized with version 2, while this
> deserializer only supports version up to 1
>
>                 at
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
>
>                 at
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
>
>                 at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
>
>                 at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)
>
>
>
>
>
> Kind regards
>
>
>
>
> *Dominik Bünzli  *Data, Analytics & AI Engineer III
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to