Hi Dominik,

There are only a couple of places where such libs can live:
either the application packages the connector itself, or the jar sits under
the Flink distro root directory (lib or plugins).
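For example (illustrative only, names and paths are assumptions to adapt to
your setup), with the official Flink images the connector can be shipped with
the distribution:

    # Dockerfile of the Flink image
    COPY flink-connector-kafka-3.2.0-1.19.jar /opt/flink/lib/

or bundled into the application's (shaded) job jar instead. Whichever way you
choose, make sure exactly one connector version ends up on the classpath.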

BR,
G


On Tue, Sep 3, 2024 at 1:17 PM <dominik.buen...@swisscom.com> wrote:

> Hi Gabor
>
>
>
> There should never have been a dependency on the old connector (or any
> remaining state), as I removed everything before deploying the new version.
> That's where my confusion comes from. It crashes when the same pipeline,
> with the same 3.2.0 dependency, is deployed twice and restored from a
> savepoint / checkpoint. Somehow it tries to reload the state with an old
> dependency (which I cannot even find on the classpath) 😲
>
>
>
>
> *Dominik Bünzli*
> Data, Analytics & AI Engineer III
>
>
>
> *From: *Gabor Somogyi <gabor.g.somo...@gmail.com>
> *Date: *Tuesday, 3 September 2024 at 12:59
> *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>
> *Cc: *matthias.schwa...@viseca.ch <matthias.schwa...@viseca.ch>,
> user@flink.apache.org <user@flink.apache.org>
> *Subject: *Re: Kafka connector exception restarting Flink 1.19 pipeline
>
>
>
>
> Hi Dominik,
>
>
>
> The issue has nothing to do with DynamicKafkaSource.
>
>
>
> The scenario here is clear:
>
> * At some point you've used the 3.2 Kafka connector, which writes out its
> state with the v2 serializer
>
> * Later you fell back to a version which is not able to read that (pre 3.1.0,
> because this feature was added in FLINK-32019)
>
> * The old connector blows up, which is normal operation (such state
> compatibility is not supported; see the sketch below)
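>
> To illustrate, the gate that trips here looks roughly like this (a sketch of
> the idea only, not the actual KafkaSourceEnumStateSerializer code; the class
> and names below are made up):
>
> import java.io.IOException;
>
> // Illustrative only: mimics how a versioned serializer rejects state
> // written by a newer connector version.
> final class VersionedStateReaderSketch {
>     private static final int SUPPORTED_VERSION = 1; // what a pre-3.1.0 connector understands
>
>     static void deserialize(int writtenVersion, byte[] payload) throws IOException {
>         if (writtenVersion > SUPPORTED_VERSION) {
>             // This is the situation behind the message in your stack trace.
>             throw new IOException(
>                     "The bytes are serialized with version " + writtenVersion
>                             + ", while this deserializer only supports version up to "
>                             + SUPPORTED_VERSION);
>         }
>         // ... otherwise decode the enumerator state (topics, partitions, offsets) from payload ...
>     }
> }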
>
>
>
> I see 2 resolutions:
>
> * Remove the state data (start without a savepoint) and use the old connector
>
> * Use the new (3.1.0+) connector, which can read the v2 state correctly
>
>
>
> Hope this helps.
>
>
>
> BR,
>
> G
>
>
>
>
>
> On Tue, Sep 3, 2024 at 11:31 AM <dominik.buen...@swisscom.com> wrote:
>
> Did you get that indication from the stack-trace line number, which matches
> the implementation in 3.0.2?
>
> I'll have to check; I cannot find it anywhere on the classpath, and we're
> not using fat jars in application mode. But I see where this is heading.
> Thanks for mentioning it!
>
>
>
> Best,
>
>
>
>
> *Dominik Bünzli*
> Data, Analytics & AI Engineer III
>
>
>
> *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Date: *Tuesday, 3 September 2024 at 11:07
> *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>,
> user@flink.apache.org <user@flink.apache.org>
> *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline
>
>
>
>
> … really hard to tell; your original error message clearly indicated that
> an older JAR (3.0.x or earlier) was involved, assuming it was somewhere on
> the classpath …
>
> … did you maybe shade it into your jar?
>
>
>
> Thias
>
>
>
>
>
> *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
> *Sent:* Tuesday, September 3, 2024 10:23 AM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>;
> user@flink.apache.org
> *Subject:* [External] Re: Kafka connector exception restarting Flink 1.19
> pipeline
>
>
>
>
>
>
> Hi Matthias,
>
> Thank you for your reply!
>
>
>
> There should not be a dependency on 3.0.x in my Docker image; I only add
> 3.2.0 explicitly. When connecting to the running container I also can't
> find any reference to 3.0.x. I reverted the dependency to 3.0.0-1.17 and it
> works again. Could it be related to the introduction of the
> DynamicKafkaSource in 3.1.0? Its Javadoc reads:
>
>
>
>  * Factory class for the DynamicKafkaSource components. <a
>  * href="https://cwiki.apache.org/confluence/x/CBn1D">FLIP-246: DynamicKafkaSource</a>
>  *
>  * <p>This source's key difference from {@link KafkaSource} is that it enables users to read
>  * dynamically, which does not require job restart, from streams (topics that belong to one or more
>  * clusters). If using {@link KafkaSource}, users need to restart the job by deleting the job and
>  * reconfiguring the topics and clusters.
>
>
>
> We are still using KafkaSource in our pipelines.
>
>
>
> Best,
>
>
>
>
> *Dominik Bünzli*
> Data, Analytics & AI Engineer III
>
>
>
> *From: *Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Date: *Tuesday, 3 September 2024 at 09:59
> *To: *Bünzli Dominik, INI-DNA-INF <dominik.buen...@swisscom.com>,
> user@flink.apache.org <user@flink.apache.org>
> *Subject: *RE: Kafka connector exception restarting Flink 1.19 pipeline
>
>
>
>
> Hi Dominik,
>
>
>
> No clue why this happens, but it looks like,
>
> when restarting from the savepoint, it uses the flink-connector-kafka
> version from your Docker image (3.0.x?) instead of the newer one you
> configured.
>
> How did you integrate the newer version?
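>
> One quick way to verify which jar is actually picked up at runtime (just an
> illustrative diagnostic you could temporarily add to your job, class name is
> made up):
>
> public class WhichKafkaConnector {
>     public static void main(String[] args) {
>         // Prints the jar that provides the serializer from your stack trace.
>         // getCodeSource() can be null for some classloaders, so guard against it.
>         java.security.CodeSource src =
>                 org.apache.flink.connector.kafka.source.enumerator
>                         .KafkaSourceEnumStateSerializer.class
>                         .getProtectionDomain()
>                         .getCodeSource();
>         System.out.println(src != null ? src.getLocation() : "unknown (no code source)");
>     }
> }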
>
>
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* dominik.buen...@swisscom.com <dominik.buen...@swisscom.com>
> *Sent:* Monday, September 2, 2024 1:35 PM
> *To:* user@flink.apache.org
> *Subject:* [External] Kafka connector exception restarting Flink 1.19
> pipeline
>
>
>
>
>
>
> Dear Flink community
>
> We recently migrated our pipelines from Flink 1.17 to 1.19.0 (and
> subsequently to 1.19.1). We source events from Kafka and write enriched
> events back to Kafka, currently using flink-connector-kafka (3.2.0-1.19).
> When initially deploying (via the k8s operator), the pipeline starts as
> expected and generates output events that are written to Kafka (see the
> sketch below for how the source is set up).
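>
> For context, the source is wired roughly like this (a simplified sketch;
> topic names, servers and the schema are placeholders, not our actual
> configuration):
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class PipelineSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers("kafka:9092")           // placeholder
>                 .setTopics("input-topic")                    // placeholder
>                 .setGroupId("enrichment-pipeline")           // placeholder
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
>
>         env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source XYZ")
>                 .print(); // the real job enriches the events and writes them back to Kafka
>
>         env.execute("Kafka enrichment pipeline");
>     }
> }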
>
>
>
> As soon as I deploy a new pipeline and later restart it from a savepoint,
> the following error is thrown. Has anyone seen this before? Any idea how to
> solve the issue?
>
>
> 11:20:41.934 [jobmanager-io-thread-1] INFO  o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Resetting coordinator to checkpoint.
> 11:20:41.935 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Closing SourceCoordinator for source Source: Kafka Source XYZ.
> 11:20:41.936 [Thread-15] INFO  o.a.f.r.s.c.SourceCoordinator - Source coordinator for source Source: Kafka Source XYZ closed.
> 11:20:41.939 [jobmanager-io-thread-1] INFO  o.a.f.r.s.c.SourceCoordinator - Restoring SplitEnumerator of source Source: Kafka Source XYZ from checkpoint.
> 11:20:41.942 [jobmanager-io-thread-1] ERROR o.a.f.r.o.c.RecreateOnResetOperatorCoordinator - Failed to reset the coordinator to checkpoint and start.
> java.io.IOException: The bytes are serialized with version 2, while this deserializer only supports version up to 1
>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:83)
>     at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.deserialize(KafkaSourceEnumStateSerializer.java:43)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:556)
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:451)
>
>
>
>
>
> Kind regards
>
>
>
>
> *Dominik Bünzli*
> Data, Analytics & AI Engineer III
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
>
