Hello,

I'm running a Flink Session Cluster on K8s and deploy the Flink jobs using the 
Flink REST API. The jobs use Avro for the producers and consumers. They 
consume from and produce to a Kafka cluster secured via TLS and SCRAM-SHA. 
Everything works as expected.

Now I need to introduce the Schema Registry. To accomplish this I use the 
library from Flink: flink-avro-confluent-registry, version 1.11.1. 
Today I found out that this library uses an old version of the 
kafka-schema-registry-client. I checked the META-INF of the Kafka Schema 
Registry Client dependency packaged with the library and found:

#Created by Apache Maven 3.3.9
version=4.1.0
groupId=io.confluent
artifactId=kafka-schema-registry-client
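
For anyone who wants to repeat this check from code rather than by unpacking the jar, here is a minimal sketch. It assumes the bundled client keeps its Maven metadata at the usual META-INF/maven/<groupId>/<artifactId>/pom.properties path. The object and method names (PomVersionCheck, versionOf, bundledVersion) are my own and not part of any library.

```scala
import java.io.StringReader
import java.util.Properties

object PomVersionCheck {
  // Parses the text of a Maven pom.properties file and returns its
  // "version" entry, if there is one. Lines starting with '#' are comments.
  def versionOf(pomProperties: String): Option[String] = {
    val props = new Properties()
    props.load(new StringReader(pomProperties))
    Option(props.getProperty("version"))
  }

  // Looks up pom.properties for the given coordinates on the classpath.
  // Returns None if the resource is missing (assumption: the jar kept
  // the standard Maven metadata layout).
  def bundledVersion(groupId: String, artifactId: String): Option[String] = {
    val path = s"/META-INF/maven/$groupId/$artifactId/pom.properties"
    Option(getClass.getResourceAsStream(path)).flatMap { in =>
      try versionOf(scala.io.Source.fromInputStream(in, "UTF-8").mkString)
      finally in.close()
    }
  }
}
```

Calling PomVersionCheck.bundledVersion("io.confluent", "kafka-schema-registry-client") from inside a job would then report whichever client version is actually on the job's classpath.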
 
I think this outdated client is the main problem, because the Schema Registry 
deployed on my cluster is version 5.5 and there have been considerable 
improvements, especially around SSL support, since version 5.4. See the 
following PR, which was merged into version 5.4: 
https://github.com/confluentinc/schema-registry/pull/957/files

See also: 
https://docs.confluent.io/current/schema-registry/security/index.html#additional-configurations-for-https

So far none of the documented solutions has worked. It is also not possible to 
implement my own serializer with this Flink library, because all the important 
classes have either private or protected constructors.

So to my question: 
Will this library, flink-avro-confluent-registry, be updated to use the latest 
Kafka Schema Registry Client (v5.5) to support SSL? And will it be possible to 
pass the config map with the schema registry properties into the serializer, 
as described in the documentation?

So far I tried all documented options without success and added all the 
properties as described:

val props: Map[String, String] = Map[String, String](
      "schema.registry.ssl.keystore.location" -> "/config/keystore/keystore.jks",
      "schema.registry.ssl.keystore.password" -> kafkaSettings.keystorePass.get,
      "schema.registry.ssl.truststore.location" -> "/config/keystore/keystore.jks",
      "schema.registry.ssl.truststore.password" -> kafkaSettings.keystorePass.get,
      "schema.registry.ssl.key.password" -> kafkaSettings.keystorePass.get
    )
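
To make the intent of that map easier to review, here is a self-contained sketch of how I would build it from one settings object, so each key appears exactly once. SslSettings and RegistrySslProps are hypothetical names of my own. The property keys are the ones listed above, and the separate truststore path in the usage note is only an assumption for illustration.

```scala
object RegistrySslProps {
  // Hypothetical settings holder; the field names are illustrative only.
  final case class SslSettings(
      keystorePath: String,
      keystorePassword: String,
      truststorePath: String,
      truststorePassword: String,
      keyPassword: String)

  // Common prefix of the schema registry client properties used above.
  private val Prefix = "schema.registry."

  // Builds the property map with one entry per SSL setting.
  def build(s: SslSettings): Map[String, String] = Map(
    s"${Prefix}ssl.keystore.location"   -> s.keystorePath,
    s"${Prefix}ssl.keystore.password"   -> s.keystorePassword,
    s"${Prefix}ssl.truststore.location" -> s.truststorePath,
    s"${Prefix}ssl.truststore.password" -> s.truststorePassword,
    s"${Prefix}ssl.key.password"        -> s.keyPassword
  )
}
```

For example, RegistrySslProps.build(SslSettings("/config/keystore/keystore.jks", pass, "/config/keystore/truststore.jks", pass, pass)) yields five entries. Whether the 4.1.0 client bundled with Flink 1.11.1 reads any of them is exactly what I have not been able to confirm.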

I also tried the legacy approach of setting the keystore and truststore as 
environment variables directly on the JVM. Nothing has worked so far.
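
In case it helps to reproduce the JVM-level attempt, this is roughly what it looks like, assuming "legacy approach" means the standard JSSE system properties (javax.net.ssl.keyStore and friends). The helper name JvmSslOpts.render is my own. The rendered string is what I would put into Flink's env.java.opts setting. Note that passwords passed this way are visible in the process arguments.

```scala
object JvmSslOpts {
  // Renders the standard JSSE system properties as -D flags,
  // separated by spaces, suitable for a JVM options string.
  def render(
      keystore: String,
      keystorePassword: String,
      truststore: String,
      truststorePassword: String): String =
    Seq(
      "javax.net.ssl.keyStore"           -> keystore,
      "javax.net.ssl.keyStorePassword"   -> keystorePassword,
      "javax.net.ssl.trustStore"         -> truststore,
      "javax.net.ssl.trustStorePassword" -> truststorePassword
    ).map { case (key, value) => s"-D$key=$value" }.mkString(" ")
}
```

These properties only affect code that falls back to the JVM's default SSL context, so they are a workaround sketch rather than a confirmed fix for the registry client.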

So if someone has found a way to use SSL with Flink 1.11.1 against an 
SSL-secured Confluent Schema Registry, please reach out or advise on the 
development of the library.

Many thanks for your time.

Cheers,
 
Patrick
