[
https://issues.apache.org/jira/browse/FLINK-39674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Rovner updated FLINK-39674:
--------------------------------
Description:
For more than two years now, `flink-avro-confluent-registry` module uses
version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply
problematic for multiple reasons:
* Confluent's official support for this version expired in August 2025
* This version has several known high-severity vulnerabilities
* This version is not aligned with the Kafka client used in the most recent
version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The
included Kafka client has version 3.9.x, while the 7.5.x versions of the schema
registry client are made for Kafka version 3.5.x (see [compatibility
table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])
The last point is especially painful, because the schema registry client
depends on the Kafka client for some of its functionalities. As the versions of
the two clients drift apart, we begin seeing runtime errors due to methods not
existing any more. For example, it is no longer possible to configure the
schema registry to use OAuth authentication for this reason:
{code:java}
Caused by: java.lang.NoSuchMethodError: 'void
org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory,
java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
at
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
{code}
To trigger the above error, it is sufficient to configure a Kafka Sink with the
aforementioned most recent versions of `flink-avro-confluent-registry` and
`flink-connector-kafka`:
{code:java}
Map<String, String> registryConfig = Map.of(
"bearer.auth.credentials.source","OAUTHBEARER",
"bearer.auth.issuer.endpoint.url", "...",
"bearer.auth.client.id","registry-api",
"bearer.auth.client.secret","...",
);
KafkaSink.<...>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
.setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(....class,
subject, schemaRegistryUrl, registryConfig))
.setTopic(topicName)
.setKeySerializationSchema(...)
.build())
.build(); {code}
was:
For more than two years now, `flink-avro-confluent-registry` module uses
version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply
problematic for multiple reasons:
* Confluent's official support for this version expired in August 2025
* This version has several known high-severity vulnerabilities
* This version is not aligned with the Kafka client used in the most recent
version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The
included Kafka client has version 3.9.x, while the 7.5.x versions of the schema
registry client are made for Kafka version 3.5.x (see [compatibility
table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])
The last point is especially painful, because the schema registry client
depends on the Kafka client for some of its functionalities. As the versions of
the two clients drift apart, we begin seeing runtime errors due to methods not
existing any more. For example, it is no longer possible to configure the
schema registry to use OAuth authentication for this reason:
{code:java}
Caused by: java.lang.NoSuchMethodError: 'void
org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory,
java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
at
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
{code}
> flink-avro-confluent-registry uses an obsolete schema registry client
> ---------------------------------------------------------------------
>
> Key: FLINK-39674
> URL: https://issues.apache.org/jira/browse/FLINK-39674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC,
> SequenceFile)
> Affects Versions: 2.2.0, 2.2.1
> Environment: this issue is environment-independent
> Reporter: Alex Rovner
> Priority: Major
> Labels: avro, kafka, oauth, oauth2
>
> For more than two years now, `flink-avro-confluent-registry` module uses
> version 7.5.3 of `io.confluent:kafka-schema-registry-client`. This is deeply
> problematic for multiple reasons:
> * Confluent's official support for this version expired in August 2025
> * This version has several known high-severity vulnerabilities
> * This version is not aligned with the Kafka client used in the most recent
> version of flink-connector-kafka (4.0.1-2.0 at the time of writing). The
> included Kafka client has version 3.9.x, while the 7.5.x versions of the
> schema registry client are made for Kafka version 3.5.x (see [compatibility
> table|https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-ak-compatibility])
> The last point is especially painful, because the schema registry client
> depends on the Kafka client for some of its functionalities. As the versions
> of the two clients drift apart, we begin seeing runtime errors due to methods
> not existing any more. For example, it is no longer possible to configure the
> schema registry to use OAuth authentication for this reason:
>
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 'void
> org.apache.kafka.common.security.oauthbearer.internals.secured.HttpAccessTokenRetriever.<init>(java.lang.String,
> java.lang.String, java.lang.String, javax.net.ssl.SSLSocketFactory,
> java.lang.String, long, long, java.lang.Integer, java.lang.Integer)'
> at
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getTokenRetriever(OauthCredentialProvider.java:106)
> {code}
>
> To trigger the above error, it is sufficient to configure a Kafka Sink with
> the aforementioned most recent versions of `flink-avro-confluent-registry`
> and `flink-connector-kafka`:
> {code:java}
> Map<String, String> registryConfig = Map.of(
> "bearer.auth.credentials.source","OAUTHBEARER",
> "bearer.auth.issuer.endpoint.url", "...",
> "bearer.auth.client.id","registry-api",
> "bearer.auth.client.secret","...",
> );
> KafkaSink.<...>builder()
> .setBootstrapServers("localhost:9092")
> .setRecordSerializer(KafkaRecordSerializationSchema.<...>builder()
>
> .setValueSerializationSchema(ConfluentRegistryAvroSerializationSchema.forSpecific(....class,
> subject, schemaRegistryUrl, registryConfig))
> .setTopic(topicName)
> .setKeySerializationSchema(...)
> .build())
> .build(); {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)