Hi Kirk,

Thanks for checking.

I am trying to set up a Kafka cluster with end-to-end OAuth (i.e. Kafka-to-Kafka
communication within the cluster as well as client-to-broker communication).
With the config below I was able to get my broker started without errors, but
I am now unable to create topics; the error is shown further down.

*Current config*
*jaas.config*

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    clientId="<xxxxxxxx>"
    clientSecret="<xxxxxxxx>"
    audience="https://myprovider.com"
    token.endpoint.uri="https://xxxxxxxx/oauth/token"
    scope="kafka.read kafka.write";
};

*server.properties*
listeners=SASL_PLAINTEXT://:9093
advertised.listeners=SASL_PLAINTEXT://<>:9093
sasl.enabled.mechanisms=OAUTHBEARER
sasl.oauthbearer.expected.audience=https://myprovider.com
oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
inter.broker.listener.name=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://<xxxxxxxx>/oauth/token
sasl.oauthbearer.jwks.endpoint.url=https://<xxxxxxxx>/.well-known/jwks.json

*ERROR WHILE CREATING TOPIC*

This is very strange, because when I fetch the token manually via curl and
inspect it, I clearly see the "sub" field populated with the value
<clientid@clients>.
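
For reference, the manual check amounts to roughly the following sketch. It is only an illustration: the token here is a hypothetical sample built locally from made-up claims (not my real token), the helper names are my own, and the signature is not verified at all. It just base64url-decodes the payload segment and reads the "sub" claim.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Return the claims of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected header.payload.signature")
    payload = parts[1]
    # JWT segments are unpadded base64url; restore padding before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def _b64url(data: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment (sample data only)."""
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Hypothetical sample token; in practice this would be the access_token
# string returned by the provider's token endpoint.
sample_claims = {
    "sub": "clientid@clients",
    "aud": "https://myprovider.com",
    "scope": "kafka.read kafka.write",
}
sample_token = ".".join(
    [_b64url({"alg": "none", "typ": "JWT"}), _b64url(sample_claims), "signature"]
)

claims = decode_jwt_payload(sample_token)
print(claims.get("sub"))  # prints "clientid@clients"
```

With the real token pasted in place of sample_token, the "sub" value printed is the <clientid@clients> one mentioned above.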


ERROR No principal name in JWT claim: sub (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
java.io.IOException: No principal name in JWT claim: sub
        at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
        at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
        at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
        at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
        at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
        at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
        at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204)
        at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:119)
        at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:223)
        at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:189)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:525)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:492)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
        at org.apache.kafka.tools.TopicCommand$TopicService.createAdminClient(TopicCommand.java:437)
        at org.apache.kafka.tools.TopicCommand$TopicService.<init>(TopicCommand.java:426)
        at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:98)
        at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
        at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)





Regards
Ashish Sood

On Thu, Mar 20, 2025 at 12:15 AM Kirk True <k...@kirktrue.pro> wrote:

> Hi Ashish,
>
> Are you using OAuth for client->broker communication, inter-broker
> communication, or both?
>
> Based on the server.properties configuration that was shared, it looks
> like the configuration is attempting to set up inter-broker communication
> using OAuth.
>
> For a broker to *retrieve* tokens, it needs to have this configuration:
>
>
> listener.name.SASL_PLAINTEXT.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
>
> For a broker to *validate* tokens, it needs to have this configuration:
>
>
> listener.name.SASL_PLAINTEXT.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
>
> Then the SASL configs would need to be included too:
>
> listener.name.SASL_PLAINTEXT.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
>     clientId="XXXXXXXXXXXXXXXXXX" \
>     clientSecret="XXXXXXXXXXXXXXXXXX" \
>     audience="https://myprovider.com" \
>     serviceName="kafka" \
>     scope="kafka.read kafka.write";
>
> If possible, please share any non-sensitive logs.
>
> Thanks,
> Kirk
>
> On Wed, Mar 19, 2025, at 3:41 AM, ashish sood wrote:
> > Hi All,
> >
> > I am setting up OAuth for my Kafka broker. I have set up an account on
> > Auth0 for this, along with an application and an API.
> >
> > With the below config in the server.properties and JAAS config files I
> > keep getting an invalid token error, although a token generated manually
> > via curl works fine. The Auth0 logs also show the token being generated
> > successfully, yet Kafka still reports the error. Any suggestions to
> > resolve this issue would be appreciated.
> >
> > *Server.properties*
> > listeners=SASL_PLAINTEXT://:9093
> > advertised.listeners=SASL_PLAINTEXT://<XXXXXX>:9093
> > sasl.enabled.mechanisms=OAUTHBEARER
> > sasl.oauthbearer.jwks.endpoint.url=https://XXXXXXXXX/oauth/token
> > sasl.oauthbearer.expected.audience=https://myprovider.com
> > oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> > listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> > confluent.oauth.groups.claim.name=groups
> > inter.broker.listener.name=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=OAUTHBEARER
> > super.users=User:<ClientID>
> > sasl.oauthbearer.token.endpoint.url=<XXXXXXXXX>/oauth/token
> > sasl.oauthbearer.audience=https://myprovider.com
> > allow.everyone.if.no.acl.found=true
> >
> > *Jaas Config*
> > KafkaServer {
> >     org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
> >     clientId="XXXXXXXXXXXXXXXXXX"
> >     clientSecret="XXXXXXXXXXXXXXXXXX"
> >     audience="https://myprovider.com"
> >     serviceName="kafka"
> >     scope="kafka.read kafka.write";
> > };
> >
> > *Error*
> > [2025-03-19 16:05:43,465] INFO [Controller id=0, targetBrokerId=0] Failed authentication with localhost/127.0.0.1 (channelId=0) ({"status":"invalid_token"}) (org.apache.kafka.common.network.Selector)
> >
> > Thanks & Regards