Hi! You'll need to set
props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");

On Wed, Jul 21, 2021 at 12:06 AM, tkg_cangkul <yuza.ras...@gmail.com> wrote:

> Hi,
>
> i'm trying to connect to kafka with schema registry that using SSL with
> flink 1.11.2
>
> i've got this error message when i try to submit the job.
>
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
>
> Anyone can help me to solve this?
>
> This is my properties :
>
> public static Properties getDefaultProperties() {
>     final Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
>     properties.setProperty("security.protocol", KAFKA_SECURITY_PROTOCOL);
>     properties.setProperty("ssl.truststore.location", KAFKA_SSL_TRUSTSTORE_LOCATION);
>     properties.setProperty("ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD);
>     properties.setProperty("ssl.keystore.location", KAFKA_SSL_KEYSTORE_LOCATION);
>     properties.setProperty("ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD);
>     properties.setProperty("ssl.key.password", KAFKA_SSL_KEY_PASSWORD);
>     // schema registry //
>     properties.setProperty("schema.registry.ssl.truststore.location", KAFKA_SSL_TRUSTSTORE_LOCATION);
>     properties.setProperty("schema.registry.ssl.truststore.password", KAFKA_SSL_TRUSTSTORE_PASSWORD);
>     properties.setProperty("schema.registry.ssl.keystore.location", KAFKA_SSL_KEYSTORE_LOCATION);
>     properties.setProperty("schema.registry.ssl.keystore.password", KAFKA_SSL_KEYSTORE_PASSWORD);
>     properties.setProperty("schema.registry.ssl.key.password", KAFKA_SSL_KEY_PASSWORD);
>     // schema registry end //
>     properties.setProperty("ssl.endpoint.identification.algorithm", "");
>     properties.setProperty("sasl.mechanism", "PLAIN");
>     properties.setProperty("basic.auth.credentials.source", "USER_INFO");
>     properties.setProperty("schema.registry.url", SCHEMA_REGISTRY_URL);
>     properties.setProperty("basic.auth.user.info", "user:pass");
>     properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\" ;");
>     properties.setProperty("flink.partition-discovery.interval-millis", "60000");
>     properties.setProperty("request.timeout.ms", "60000");
>     return properties;
> }
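
In case it helps, here is a rough sketch of how the setting suggested at the top of this reply could be layered onto properties like the ones quoted above. The class name `RegistryAuthProps`, the helper `withRegistryBasicAuth`, and the example URL are made up for illustration. It only builds a `java.util.Properties` object. Whether the registry client actually reads the prefixed key or the unprefixed `basic.auth.user.info` depends on the Confluent client version, and whether these properties reach the registry client at all depends on how the deserialization schema is constructed in your Flink version, so treat both as assumptions to verify.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or the Confluent client.
final class RegistryAuthProps {
    // Key suggested in the reply; some client versions may read the
    // unprefixed "basic.auth.user.info" instead (assumption to verify).
    static final String USER_INFO_KEY = "schema.registry.basic.auth.user.info";
    static final String CREDENTIALS_SOURCE_KEY = "basic.auth.credentials.source";

    // Returns a copy of `base` with Schema Registry basic-auth settings added.
    // The input object is left unmodified.
    static Properties withRegistryBasicAuth(Properties base, String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(CREDENTIALS_SOURCE_KEY, "USER_INFO");
        props.setProperty(USER_INFO_KEY, apiKey + ":" + apiSecret);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        // Placeholder URL for illustration only.
        base.setProperty("schema.registry.url", "https://registry.example.invalid");

        Properties props = withRegistryBasicAuth(
                base, "<SCHEMA_REGISTRY_API_KEY>", "<SCHEMA_REGISTRY_API_SECRET>");

        // Print only the key names so that credentials never end up in logs.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

In `getDefaultProperties()` the equivalent change would be one extra `setProperty` call with the prefixed key, next to the existing `basic.auth.credentials.source` line.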