Hi All,

I have the following setup:
Kafka broker (3.9.0)
Kafka producer (for now, using the producer-console in kafka itself)

This setup works fine for basic TCP, TLS and even tried SASL authentication
using PLAIN, SHA256.
Now, I am trying to setup OAuth2 SASL authentication on this setup and get
an *invalid_token* error from kafka broker while doing *SASL authentication*
;.

This is what my configuration looks like: (included only properties
relevant to SASL oauth)

*server.properties:*

sasl.enabled.mechanisms=OAUTHBEARER
#JWKS URL from the openid-configuration URL for the oauth2 host
sasl.oauthbearer.*jwks.endpoin*t.url=https://<oauth2 host
name>:443/admin/v1/SigningCert/jwk
listener.name.sasl_plaintext.sasl.enabled.mechanisms=OAUTHBEARER
#verifief that this isn sync with the values for aud and iss from the
access token

*sasl.oauthbearer.expected.audience="<aud
value>"sasl.oauthbearer.expected.issuer="<issuer URL>"*
listener.name.sasl_plaintext.oauthbearer.principal.builder.class=org.apache.kafka.common.security.authenticator.
*DefaultKafkaPrincipalBuilder*
sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.
*OAuthBearerValidatorCallbackHandler*

*Server side jaas config file:*

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required;
};

*producer.properties:*

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.*OAuthBearerLoginModule
*required \


*clientId="<client id value shared by the oauth2 admin>"
\clientSecret="<client secret shared by the oauth2 admin>" \scope="<scope
shared by oauth2 admin>";*
sasl.oauthbearer.*token.endpoint*.url=https://<oauth2 host
name>:443/oauth2/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.
*OAuthBearerLoginCallbackHandler*

Note: I am not interested in any custom functionality. I just want to be
able to use oauth2 authentication for my kafka client. Basically, I want
that if I give the provided oauth2 credentials (client id and client
secret), I should be able to login and carry out with the kafka
functionality.

This is what happens with the above configuration:
1. I can see from the logs that the kafka producer is able to login to the
oauth2 server and get the access token. I see logs like this on the
producer console which tell me that the client can authenticate with the
oauth2 server:

 DEBUG getClaim - scope: all
(org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,852] DEBUG getClaim - exp: 1734098884
(org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,853] DEBUG getClaim - sub <client id
value>(org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,853] DEBUG getClaim - iat: 1734095284
(org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,863] DEBUG *Login succeeded*; invoke commit() to
commit it; current committed token count=0
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,864] TRACE Committing my token; current committed
token count = 0
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,865] DEBUG Done committing my token; committed token
count is now 1
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,866] INFO *Successfully logged in.*

2. However, after that, when the producer tries to do SASL authentication
with the kafka broker, it fails with the error:
*{"status":"invalid_token"}.*

I see the following logs on the producer console:
 DEBUG [Producer clientId=console-producer] Set SASL client state to
INITIAL
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2024-12-13 13:08:08,076] DEBUG Setting SASL/OAUTHBEARER client state to
RECEIVE_SERVER_FIRST_MESSAGE
(org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)
[2024-12-13 13:08:08,084] DEBUG [Producer clientId=console-producer] Set
SASL client state to INTERMEDIATE
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2024-12-13 13:08:08,085] TRACE [Producer clientId=console-producer] Found
least loaded connecting node <machine name>:9093 (id: -1 rack: null)
(org.apache.kafka.clients
.NetworkClient)
[2024-12-13 13:08:08,086] TRACE For telemetry state SUBSCRIPTION_NEEDED,
returning the value 0 ms;
 (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2024-12-13 13:08:08,086] TRACE [Producer clientId=console-producer] Found
least loaded connecting node <machine name>:9093 (id: -1 rack: null)
(org.apache.kafka.clients
.NetworkClient)
[2024-12-13 13:08:08,091] DEBUG *Sending %%x01 response to server after
receiving an error: {"status":"invalid_token"} *
(org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)

In the server log file, I see the following lines:

DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during
authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,091] DEBUG Handling Kafka request API_VERSIONS during
authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,093] DEBUG Set SASL server state to HANDSHAKE_REQUEST
during authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,101] DEBUG Handling Kafka request SASL_HANDSHAKE
during authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,138] DEBUG Using SASL mechanism 'OAUTHBEARER' provided
by client
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,142] DEBUG Set SASL server state to AUTHENTICATE
during authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,168] DEBUG *{"status":"invalid_token"}
(org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)*
[2024-12-13 12:28:42,182] DEBUG Received %x01 response from client after it
received our error
(org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)
[2024-12-13 12:28:42,191] DEBUG Set SASL server state to FAILED during
authentication
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator

I searched online for clues and verified the following:

1. The problem is caused by the following file:
https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java
in the function handleValidatorCallback(OAuthBearerValidatorCallback
callback)

For some reason, here, it is trying to validate the token and considers it
invalid.
OAuthBearerToken token;

        try {
            t*oken = accessTokenValidator.validate(callback.tokenValue());*
            callback.token(token);
        } catch (ValidateException e) {
            log.warn(e.getMessage(), e);
            callback.error("invalid_token", null, null);
        }

2. I verified that the JWKS URL configured in the server properties is
accessible. Ran it via curl and postman. (I however do not see anything in
the server.log file to indicate that the kafka broker contacted the JWKS
URL)

3. I even verified that the sub value in the access token is right. I see
the log in producer console: DEBUG getClaim - sub <client id value>

4. The iat and exp values in the access token are also appropriate. I even
synced up the clocks on kafka broker, producer and the oauth2 server (all
now use UTC)

5. I verified that the kid in the access token matches the kid in the JWKS
JSON.

6. I am trying to debug the Kafka code for this issue. I did notice that
first the SASL client moves to state *INITIAL* and then at this point, gets
the access token. But when it transitions to state *INTERMEDIATE*, it
throws the invalid_token error.

Any suggestions on what the issue could be? I have already enabled logging
level to DEBUG on producer and server.

Thanks in advance!
Iyer

Reply via email to