Hi!

You'll need to set

props.put("schema.registry.basic.auth.user.info",
"<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");
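
In case it helps, here is a minimal sketch of how that could be wired into your existing properties. The class name SchemaRegistryAuthProps and the method withRegistryBasicAuth are hypothetical names used only for illustration, and the sketch assumes the resulting Properties object is the one that actually reaches the Schema Registry client or serializer in your job. It keeps the basic.auth.credentials.source=USER_INFO setting you already have.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or the Confluent client.
class SchemaRegistryAuthProps {

    // Returns a copy of `base` with Schema Registry basic-auth settings added.
    // The caller's Properties object is left untouched.
    static Properties withRegistryBasicAuth(Properties base, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.putAll(base);
        // Tell the client to read credentials from the user-info property.
        props.put("basic.auth.credentials.source", "USER_INFO");
        // Credentials in "<key>:<secret>" form, under the schema.registry. prefix.
        props.put("schema.registry.basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }
}
```

You would then call something like SchemaRegistryAuthProps.withRegistryBasicAuth(getDefaultProperties(), key, secret) and pass the result on, reading the key and secret from your own configuration rather than hard-coding them.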

On Wed, Jul 21, 2021 at 12:06 AM, tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi,
>
>
> I'm trying to connect to Kafka with a Schema Registry that uses SSL, from
> Flink 1.11.2.
>
> I get this error message when I try to submit the job:
>
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
>
>
> Can anyone help me solve this?
>
> These are my properties:
>
> public static Properties getDefaultProperties() {
>     final Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
>     properties.setProperty("security.protocol", KAFKA_SECURITY_PROTOCOL);
>     properties.setProperty("ssl.truststore.location", KAFKA_SSL_TRUSTSTORE_LOCATION);
>     properties.setProperty("ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD);
>     properties.setProperty("ssl.keystore.location", KAFKA_SSL_KEYSTORE_LOCATION);
>     properties.setProperty("ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD);
>     properties.setProperty("ssl.key.password", KAFKA_SSL_KEY_PASSWORD);
>     // schema registry //
>     properties.setProperty("schema.registry.ssl.truststore.location", KAFKA_SSL_TRUSTSTORE_LOCATION);
>     properties.setProperty("schema.registry.ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD);
>     properties.setProperty("schema.registry.ssl.keystore.location", KAFKA_SSL_KEYSTORE_LOCATION);
>     properties.setProperty("schema.registry.ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD);
>     properties.setProperty("schema.registry.ssl.key.password", KAFKA_SSL_KEY_PASSWORD);
>     // schema registry end //
>     properties.setProperty("ssl.endpoint.identification.algorithm", "");
>     properties.setProperty("sasl.mechanism", "PLAIN");
>     properties.setProperty("basic.auth.credentials.source", "USER_INFO");
>     properties.setProperty("schema.registry.url", SCHEMA_REGISTRY_URL);
>     properties.setProperty("basic.auth.user.info", "user:pass");
>     properties.setProperty("sasl.jaas.config",
>         "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\" ;");
>     properties.setProperty("flink.partition-discovery.interval-millis", "60000");
>     properties.setProperty("request.timeout.ms", "60000");
>     return properties;
> }
>
>
